rocksdb/
db_iterator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    db::{DBAccess, DB},
17    ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
/// Backward-compatible alias for [`DBRawIteratorWithThreadMode`] specialised to [`DB`].
///
/// See [`DBRawIteratorWithThreadMode`] for details.
pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
25/// An iterator over a database or column family, with specifiable
26/// ranges and direction.
27///
/// This iterator is different to the standard ``DBIteratorWithThreadMode`` as it aims to
29/// replicate the underlying iterator API within RocksDB itself. This should
30/// give access to more performance and flexibility but departs from the
31/// widely recognised Rust idioms.
32///
33/// ```
34/// use rocksdb::{DB, Options};
35///
36/// let path = "_path_for_rocksdb_storage4";
37/// {
38///     let db = DB::open_default(path).unwrap();
39///     let mut iter = db.raw_iterator();
40///
41///     // Forwards iteration
42///     iter.seek_to_first();
43///     while iter.valid() {
44///         println!("Saw {:?} {:?}", iter.key(), iter.value());
45///         iter.next();
46///     }
47///
48///     // Reverse iteration
49///     iter.seek_to_last();
50///     while iter.valid() {
51///         println!("Saw {:?} {:?}", iter.key(), iter.value());
52///         iter.prev();
53///     }
54///
55///     // Seeking
56///     iter.seek(b"my key");
57///     while iter.valid() {
58///         println!("Saw {:?} {:?}", iter.key(), iter.value());
59///         iter.next();
60///     }
61///
62///     // Reverse iteration from key
63///     // Note, use seek_for_prev when reversing because if this key doesn't exist,
64///     // this will make the iterator start from the previous key rather than the next.
65///     iter.seek_for_prev(b"my key");
66///     while iter.valid() {
67///         println!("Saw {:?} {:?}", iter.key(), iter.value());
68///         iter.prev();
69///     }
70/// }
71/// let _ = DB::destroy(&Options::default(), path);
72/// ```
pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
    /// Non-null handle to the underlying RocksDB iterator. It is destroyed
    /// when this value is dropped.
    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,

    /// When iterate_lower_bound or iterate_upper_bound are set, the inner
    /// C iterator keeps a pointer to the upper bound inside `_readopts`.
    /// Storing this makes sure the upper bound is always alive when the
    /// iterator is being used.
    ///
    /// And yes, we need to store the entire ReadOptions structure since the C++
    /// ReadOptions keeps a reference to the C rocksdb_readoptions_t wrapper, which
    /// points to vectors we own.  See issue #660.
    _readopts: ReadOptions,

    /// Zero-sized marker carrying the lifetime `'a` of a borrow of `D`; no
    /// reference is actually stored.
    db: PhantomData<&'a D>,
}
88
89impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
90    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
91        let inner = unsafe { db.create_iterator(&readopts) };
92        Self::from_inner(inner, readopts)
93    }
94
95    pub(crate) fn new_cf(
96        db: &'a D,
97        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
98        readopts: ReadOptions,
99    ) -> Self {
100        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
101        Self::from_inner(inner, readopts)
102    }
103
104    fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
105        // This unwrap will never fail since rocksdb_create_iterator and
106        // rocksdb_create_iterator_cf functions always return non-null. They
107        // use new and deference the result so any nulls would end up with SIGSEGV
108        // there and we would have a bigger issue.
109        let inner = std::ptr::NonNull::new(inner).unwrap();
110        Self {
111            inner,
112            _readopts: readopts,
113            db: PhantomData,
114        }
115    }
116
117    /// Returns `true` if the iterator is valid. An iterator is invalidated when
118    /// it reaches the end of its defined range, or when it encounters an error.
119    ///
120    /// To check whether the iterator encountered an error after `valid` has
121    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
122    /// return an error when `valid` is `true`.
123    pub fn valid(&self) -> bool {
124        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
125    }
126
    /// Returns an error `Result` if the iterator has encountered an error
    /// during operation. When an error is encountered, the iterator is
    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
    ///
    /// Performing a seek will discard the current status.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `rocksdb_iter_get_error`, if any.
    pub fn status(&self) -> Result<(), Error> {
        // NOTE(review): `ffi_try!` is defined elsewhere; presumably it returns
        // early with `Err` when the FFI call reports an error — confirm.
        unsafe {
            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
        }
        Ok(())
    }
138
139    /// Seeks to the first key in the database.
140    ///
141    /// # Examples
142    ///
143    /// ```rust
144    /// use rocksdb::{DB, Options};
145    ///
146    /// let path = "_path_for_rocksdb_storage5";
147    /// {
148    ///     let db = DB::open_default(path).unwrap();
149    ///     let mut iter = db.raw_iterator();
150    ///
151    ///     // Iterate all keys from the start in lexicographic order
152    ///     iter.seek_to_first();
153    ///
154    ///     while iter.valid() {
155    ///         println!("{:?} {:?}", iter.key(), iter.value());
156    ///         iter.next();
157    ///     }
158    ///
159    ///     // Read just the first key
160    ///     iter.seek_to_first();
161    ///
162    ///     if iter.valid() {
163    ///         println!("{:?} {:?}", iter.key(), iter.value());
164    ///     } else {
165    ///         // There are no keys in the database
166    ///     }
167    /// }
168    /// let _ = DB::destroy(&Options::default(), path);
169    /// ```
170    pub fn seek_to_first(&mut self) {
171        unsafe {
172            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
173        }
174    }
175
176    /// Seeks to the last key in the database.
177    ///
178    /// # Examples
179    ///
180    /// ```rust
181    /// use rocksdb::{DB, Options};
182    ///
183    /// let path = "_path_for_rocksdb_storage6";
184    /// {
185    ///     let db = DB::open_default(path).unwrap();
186    ///     let mut iter = db.raw_iterator();
187    ///
188    ///     // Iterate all keys from the end in reverse lexicographic order
189    ///     iter.seek_to_last();
190    ///
191    ///     while iter.valid() {
192    ///         println!("{:?} {:?}", iter.key(), iter.value());
193    ///         iter.prev();
194    ///     }
195    ///
196    ///     // Read just the last key
197    ///     iter.seek_to_last();
198    ///
199    ///     if iter.valid() {
200    ///         println!("{:?} {:?}", iter.key(), iter.value());
201    ///     } else {
202    ///         // There are no keys in the database
203    ///     }
204    /// }
205    /// let _ = DB::destroy(&Options::default(), path);
206    /// ```
207    pub fn seek_to_last(&mut self) {
208        unsafe {
209            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
210        }
211    }
212
213    /// Seeks to the specified key or the first key that lexicographically follows it.
214    ///
215    /// This method will attempt to seek to the specified key. If that key does not exist, it will
216    /// find and seek to the key that lexicographically follows it instead.
217    ///
218    /// # Examples
219    ///
220    /// ```rust
221    /// use rocksdb::{DB, Options};
222    ///
223    /// let path = "_path_for_rocksdb_storage7";
224    /// {
225    ///     let db = DB::open_default(path).unwrap();
226    ///     let mut iter = db.raw_iterator();
227    ///
228    ///     // Read the first key that starts with 'a'
229    ///     iter.seek(b"a");
230    ///
231    ///     if iter.valid() {
232    ///         println!("{:?} {:?}", iter.key(), iter.value());
233    ///     } else {
234    ///         // There are no keys in the database
235    ///     }
236    /// }
237    /// let _ = DB::destroy(&Options::default(), path);
238    /// ```
239    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
240        let key = key.as_ref();
241
242        unsafe {
243            ffi::rocksdb_iter_seek(
244                self.inner.as_ptr(),
245                key.as_ptr() as *const c_char,
246                key.len() as size_t,
247            );
248        }
249    }
250
251    /// Seeks to the specified key, or the first key that lexicographically precedes it.
252    ///
253    /// Like ``.seek()`` this method will attempt to seek to the specified key.
    /// The difference with ``.seek()`` is that if the specified key does not exist, this method will
    /// seek to the key that lexicographically precedes it instead.
256    ///
257    /// # Examples
258    ///
259    /// ```rust
260    /// use rocksdb::{DB, Options};
261    ///
262    /// let path = "_path_for_rocksdb_storage8";
263    /// {
264    ///     let db = DB::open_default(path).unwrap();
265    ///     let mut iter = db.raw_iterator();
266    ///
267    ///     // Read the last key that starts with 'a'
268    ///     iter.seek_for_prev(b"b");
269    ///
270    ///     if iter.valid() {
271    ///         println!("{:?} {:?}", iter.key(), iter.value());
272    ///     } else {
273    ///         // There are no keys in the database
274    ///     }
275    /// }
276    /// let _ = DB::destroy(&Options::default(), path);
277    /// ```
278    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
279        let key = key.as_ref();
280
281        unsafe {
282            ffi::rocksdb_iter_seek_for_prev(
283                self.inner.as_ptr(),
284                key.as_ptr() as *const c_char,
285                key.len() as size_t,
286            );
287        }
288    }
289
290    /// Seeks to the next key.
291    pub fn next(&mut self) {
292        if self.valid() {
293            unsafe {
294                ffi::rocksdb_iter_next(self.inner.as_ptr());
295            }
296        }
297    }
298
299    /// Seeks to the previous key.
300    pub fn prev(&mut self) {
301        if self.valid() {
302            unsafe {
303                ffi::rocksdb_iter_prev(self.inner.as_ptr());
304            }
305        }
306    }
307
308    /// Returns a slice of the current key.
309    pub fn key(&self) -> Option<&[u8]> {
310        if self.valid() {
311            Some(self.key_impl())
312        } else {
313            None
314        }
315    }
316
317    /// Returns a slice of the current value.
318    pub fn value(&self) -> Option<&[u8]> {
319        if self.valid() {
320            Some(self.value_impl())
321        } else {
322            None
323        }
324    }
325
326    /// Returns pair with slice of the current key and current value.
327    pub fn item(&self) -> Option<(&[u8], &[u8])> {
328        if self.valid() {
329            Some((self.key_impl(), self.value_impl()))
330        } else {
331            None
332        }
333    }
334
335    /// Returns a slice of the current key; assumes the iterator is valid.
336    fn key_impl(&self) -> &[u8] {
337        // Safety Note: This is safe as all methods that may invalidate the buffer returned
338        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
339        unsafe {
340            let mut key_len: size_t = 0;
341            let key_len_ptr: *mut size_t = &mut key_len;
342            let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
343            slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
344        }
345    }
346
347    /// Returns a slice of the current value; assumes the iterator is valid.
348    fn value_impl(&self) -> &[u8] {
349        // Safety Note: This is safe as all methods that may invalidate the buffer returned
350        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
351        unsafe {
352            let mut val_len: size_t = 0;
353            let val_len_ptr: *mut size_t = &mut val_len;
354            let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
355            slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
356        }
357    }
358}
359
360impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> {
361    fn drop(&mut self) {
362        unsafe {
363            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
364        }
365    }
366}
367
// NOTE(review): these impls assert that the iterator may be moved to, and shared
// between, threads. The justification is not visible in this file — confirm it
// against RocksDB's thread-safety guarantees for iterators.
unsafe impl<'a, D: DBAccess> Send for DBRawIteratorWithThreadMode<'a, D> {}
unsafe impl<'a, D: DBAccess> Sync for DBRawIteratorWithThreadMode<'a, D> {}
370
/// Backward-compatible alias for [`DBIteratorWithThreadMode`] specialised to [`DB`].
///
/// See [`DBIteratorWithThreadMode`] for details.
pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
373
374/// An iterator over a database or column family, with specifiable
375/// ranges and direction.
376///
377/// ```
378/// use rocksdb::{DB, Direction, IteratorMode, Options};
379///
380/// let path = "_path_for_rocksdb_storage2";
381/// {
382///     let db = DB::open_default(path).unwrap();
383///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
384///     for item in iter {
385///         let (key, value) = item.unwrap();
386///         println!("Saw {:?} {:?}", key, value);
387///     }
388///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
389///     for item in iter {
390///         let (key, value) = item.unwrap();
391///         println!("Saw {:?} {:?}", key, value);
392///     }
393///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
394///     for item in iter {
395///         let (key, value) = item.unwrap();
396///         println!("Saw {:?} {:?}", key, value);
397///     }
398///
399///     // You can seek with an existing Iterator instance, too
400///     iter = db.iterator(IteratorMode::Start);
401///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
402///     for item in iter {
403///         let (key, value) = item.unwrap();
404///         println!("Saw {:?} {:?}", key, value);
405///     }
406/// }
407/// let _ = DB::destroy(&Options::default(), path);
408/// ```
pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
    /// The raw iterator that performs the actual seeking and stepping.
    raw: DBRawIteratorWithThreadMode<'a, D>,
    /// Direction in which `Iterator::next` advances `raw`.
    direction: Direction,
    /// Set once iteration has run past the last item or hit an error; cleared
    /// again by `set_mode`.
    done: bool,
}
414
/// Direction in which an iterator advances.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Step towards later keys.
    Forward,
    /// Step towards earlier keys.
    Reverse,
}

/// An owned `(key, value)` pair of byte buffers.
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Starting position, and implied direction, of an iterator.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate backward.
    End,
    /// Start at the given key and iterate in the given direction.
    From(&'a [u8], Direction),
}
429
430impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
431    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
432        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
433    }
434
435    pub(crate) fn new_cf(
436        db: &'a D,
437        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
438        readopts: ReadOptions,
439        mode: IteratorMode,
440    ) -> Self {
441        Self::from_raw(
442            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
443            mode,
444        )
445    }
446
447    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
448        let mut rv = DBIteratorWithThreadMode {
449            raw,
450            direction: Direction::Forward, // blown away by set_mode()
451            done: false,
452        };
453        rv.set_mode(mode);
454        rv
455    }
456
457    pub fn set_mode(&mut self, mode: IteratorMode) {
458        self.done = false;
459        self.direction = match mode {
460            IteratorMode::Start => {
461                self.raw.seek_to_first();
462                Direction::Forward
463            }
464            IteratorMode::End => {
465                self.raw.seek_to_last();
466                Direction::Reverse
467            }
468            IteratorMode::From(key, Direction::Forward) => {
469                self.raw.seek(key);
470                Direction::Forward
471            }
472            IteratorMode::From(key, Direction::Reverse) => {
473                self.raw.seek_for_prev(key);
474                Direction::Reverse
475            }
476        };
477    }
478}
479
480impl<'a, D: DBAccess> Iterator for DBIteratorWithThreadMode<'a, D> {
481    type Item = Result<KVBytes, Error>;
482
483    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
484        if self.done {
485            None
486        } else if let Some((key, value)) = self.raw.item() {
487            let item = (Box::from(key), Box::from(value));
488            match self.direction {
489                Direction::Forward => self.raw.next(),
490                Direction::Reverse => self.raw.prev(),
491            }
492            Some(Ok(item))
493        } else {
494            self.done = true;
495            self.raw.status().err().map(Result::Err)
496        }
497    }
498}
499
// Once `next` has set `done`, it keeps returning `None` until `set_mode` is called.
impl<'a, D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'a, D> {}
501
502impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
503    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
504        self.raw
505    }
506}
507
508/// Iterates the batches of writes since a given sequence number.
509///
510/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
511/// batches of write operations that have occurred since a given sequence number
512/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
513/// the application.
514///
515/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
516/// value is the sequence number of the associated write batch.
517///
pub struct DBWALIterator {
    /// Raw handle to the RocksDB WAL iterator; destroyed on drop.
    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
    /// Batches whose sequence number is less than or equal to this value are
    /// skipped by `next`.
    pub(crate) start_seq_number: u64,
}
522
523impl DBWALIterator {
524    /// Returns `true` if the iterator is valid. An iterator is invalidated when
525    /// it reaches the end of its defined range, or when it encounters an error.
526    ///
527    /// To check whether the iterator encountered an error after `valid` has
528    /// returned `false`, use the [`status`](DBWALIterator::status) method.
529    /// `status` will never return an error when `valid` is `true`.
530    pub fn valid(&self) -> bool {
531        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
532    }
533
    /// Returns an error `Result` if the iterator has encountered an error
    /// during operation. When an error is encountered, the iterator is
    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
    /// called.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `rocksdb_wal_iter_status`, if any.
    pub fn status(&self) -> Result<(), Error> {
        // NOTE(review): `ffi_try!` is defined elsewhere; presumably it returns
        // early with `Err` when the FFI call reports an error — confirm.
        unsafe {
            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
        }
        Ok(())
    }
544}
545
impl Iterator for DBWALIterator {
    type Item = Result<(u64, WriteBatch), Error>;

    /// Yields the next write batch whose sequence number is greater than
    /// `start_seq_number`, together with that sequence number.
    ///
    /// Returns `None` as soon as the underlying iterator is no longer valid.
    fn next(&mut self) -> Option<Self::Item> {
        // NOTE(review): this early return does not consult `status()`, so an
        // error that invalidated the iterator is not surfaced here — confirm
        // this is intended.
        if !self.valid() {
            return None;
        }

        let mut seq: u64 = 0;
        let mut batch = WriteBatch {
            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
        };

        // if the initial sequence number is what was requested we skip it to
        // only provide changes *after* it
        while seq <= self.start_seq_number {
            unsafe {
                ffi::rocksdb_wal_iter_next(self.inner);
            }

            // As above, `status()` is not consulted before giving up.
            if !self.valid() {
                return None;
            }

            // this drops which in turn frees the skipped batch
            batch = WriteBatch {
                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
            };
        }

        // NOTE(review): validity was already checked on every path leading
        // here, so this branch looks unreachable unless fetching the batch can
        // invalidate the iterator — confirm.
        if !self.valid() {
            return self.status().err().map(Result::Err);
        }

        // Seek to the next write batch.
        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
        unsafe {
            ffi::rocksdb_wal_iter_next(self.inner);
        }

        Some(Ok((seq, batch)))
    }
}
589
590impl Drop for DBWALIterator {
591    fn drop(&mut self) {
592        unsafe {
593            ffi::rocksdb_wal_iter_destroy(self.inner);
594        }
595    }
596}