rocksdb/
db_iterator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    db::{DBAccess, DB},
17    ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
22/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
23pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
25/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
26/// and other `raw_iterator_*` methods.
27///
28/// This iterator replicates RocksDB's API. It should provide better
29/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
30/// Rust [`std::iter::Iterator`].
31///
32/// ```
33/// use rocksdb::{DB, Options};
34///
35/// let tempdir = tempfile::Builder::new()
36///     .prefix("_path_for_rocksdb_storage4")
37///     .tempdir()
38///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
39/// let path = tempdir.path();
40/// {
41///     let db = DB::open_default(path).unwrap();
42///     let mut iter = db.raw_iterator();
43///
44///     // Forwards iteration
45///     iter.seek_to_first();
46///     while iter.valid() {
47///         println!("Saw {:?} {:?}", iter.key(), iter.value());
48///         iter.next();
49///     }
50///
51///     // Reverse iteration
52///     iter.seek_to_last();
53///     while iter.valid() {
54///         println!("Saw {:?} {:?}", iter.key(), iter.value());
55///         iter.prev();
56///     }
57///
58///     // Seeking
59///     iter.seek(b"my key");
60///     while iter.valid() {
61///         println!("Saw {:?} {:?}", iter.key(), iter.value());
62///         iter.next();
63///     }
64///
65///     // Reverse iteration from key
66///     // Note, use seek_for_prev when reversing because if this key doesn't exist,
67///     // this will make the iterator start from the previous key rather than the next.
68///     iter.seek_for_prev(b"my key");
69///     while iter.valid() {
70///         println!("Saw {:?} {:?}", iter.key(), iter.value());
71///         iter.prev();
72///     }
73/// }
74/// let _ = DB::destroy(&Options::default(), path);
75/// ```
76pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
77    inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
78
79    /// When iterate_lower_bound or iterate_upper_bound are set, the inner
80    /// C iterator keeps a pointer to the upper bound inside `_readopts`.
81    /// Storing this makes sure the upper bound is always alive when the
82    /// iterator is being used.
83    ///
84    /// And yes, we need to store the entire ReadOptions structure since C++
85    /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
86    /// point to vectors we own.  See issue #660.
87    _readopts: ReadOptions,
88
89    db: PhantomData<&'a D>,
90}
91
92impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
93    pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
94        let inner = unsafe { db.create_iterator(&readopts) };
95        Self::from_inner(inner, readopts)
96    }
97
98    pub(crate) fn new_cf(
99        db: &'a D,
100        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
101        readopts: ReadOptions,
102    ) -> Self {
103        let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
104        Self::from_inner(inner, readopts)
105    }
106
107    fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
108        // This unwrap will never fail since rocksdb_create_iterator and
109        // rocksdb_create_iterator_cf functions always return non-null. They
110        // use new and deference the result so any nulls would end up with SIGSEGV
111        // there and we would have a bigger issue.
112        let inner = std::ptr::NonNull::new(inner).unwrap();
113        Self {
114            inner,
115            _readopts: readopts,
116            db: PhantomData,
117        }
118    }
119
120    /// Returns `true` if the iterator is valid. An iterator is invalidated when
121    /// it reaches the end of its defined range, or when it encounters an error.
122    ///
123    /// To check whether the iterator encountered an error after `valid` has
124    /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
125    /// return an error when `valid` is `true`.
126    pub fn valid(&self) -> bool {
127        unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
128    }
129
130    /// Returns an error `Result` if the iterator has encountered an error
131    /// during operation. When an error is encountered, the iterator is
132    /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
133    ///
134    /// Performing a seek will discard the current status.
135    pub fn status(&self) -> Result<(), Error> {
136        unsafe {
137            ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
138        }
139        Ok(())
140    }
141
142    /// Seeks to the first key in the database.
143    ///
144    /// # Examples
145    ///
146    /// ```rust
147    /// use rocksdb::{DB, Options};
148    ///
149    /// let tempdir = tempfile::Builder::new()
150    ///     .prefix("_path_for_rocksdb_storage5")
151    ///     .tempdir()
152    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
153    /// let path = tempdir.path();
154    /// {
155    ///     let db = DB::open_default(path).unwrap();
156    ///     let mut iter = db.raw_iterator();
157    ///
158    ///     // Iterate all keys from the start in lexicographic order
159    ///     iter.seek_to_first();
160    ///
161    ///     while iter.valid() {
162    ///         println!("{:?} {:?}", iter.key(), iter.value());
163    ///         iter.next();
164    ///     }
165    ///
166    ///     // Read just the first key
167    ///     iter.seek_to_first();
168    ///
169    ///     if iter.valid() {
170    ///         println!("{:?} {:?}", iter.key(), iter.value());
171    ///     } else {
172    ///         // There are no keys in the database
173    ///     }
174    /// }
175    /// let _ = DB::destroy(&Options::default(), path);
176    /// ```
177    pub fn seek_to_first(&mut self) {
178        unsafe {
179            ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
180        }
181    }
182
183    /// Seeks to the last key in the database.
184    ///
185    /// # Examples
186    ///
187    /// ```rust
188    /// use rocksdb::{DB, Options};
189    ///
190    /// let tempdir = tempfile::Builder::new()
191    ///     .prefix("_path_for_rocksdb_storage6")
192    ///     .tempdir()
193    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
194    /// let path = tempdir.path();
195    /// {
196    ///     let db = DB::open_default(path).unwrap();
197    ///     let mut iter = db.raw_iterator();
198    ///
199    ///     // Iterate all keys from the end in reverse lexicographic order
200    ///     iter.seek_to_last();
201    ///
202    ///     while iter.valid() {
203    ///         println!("{:?} {:?}", iter.key(), iter.value());
204    ///         iter.prev();
205    ///     }
206    ///
207    ///     // Read just the last key
208    ///     iter.seek_to_last();
209    ///
210    ///     if iter.valid() {
211    ///         println!("{:?} {:?}", iter.key(), iter.value());
212    ///     } else {
213    ///         // There are no keys in the database
214    ///     }
215    /// }
216    /// let _ = DB::destroy(&Options::default(), path);
217    /// ```
218    pub fn seek_to_last(&mut self) {
219        unsafe {
220            ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
221        }
222    }
223
224    /// Seeks to the specified key or the first key that lexicographically follows it.
225    ///
226    /// This method will attempt to seek to the specified key. If that key does not exist, it will
227    /// find and seek to the key that lexicographically follows it instead.
228    ///
229    /// # Examples
230    ///
231    /// ```rust
232    /// use rocksdb::{DB, Options};
233    ///
234    /// let tempdir = tempfile::Builder::new()
235    ///     .prefix("_path_for_rocksdb_storage7")
236    ///     .tempdir()
237    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
238    /// let path = tempdir.path();
239    /// {
240    ///     let db = DB::open_default(path).unwrap();
241    ///     let mut iter = db.raw_iterator();
242    ///
243    ///     // Read the first key that starts with 'a'
244    ///     iter.seek(b"a");
245    ///
246    ///     if iter.valid() {
247    ///         println!("{:?} {:?}", iter.key(), iter.value());
248    ///     } else {
249    ///         // There are no keys in the database
250    ///     }
251    /// }
252    /// let _ = DB::destroy(&Options::default(), path);
253    /// ```
254    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
255        let key = key.as_ref();
256
257        unsafe {
258            ffi::rocksdb_iter_seek(
259                self.inner.as_ptr(),
260                key.as_ptr() as *const c_char,
261                key.len() as size_t,
262            );
263        }
264    }
265
266    /// Seeks to the specified key, or the first key that lexicographically precedes it.
267    ///
268    /// Like ``.seek()`` this method will attempt to seek to the specified key.
269    /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
270    /// seek to key that lexicographically precedes it instead.
271    ///
272    /// # Examples
273    ///
274    /// ```rust
275    /// use rocksdb::{DB, Options};
276    ///
277    /// let tempdir = tempfile::Builder::new()
278    ///     .prefix("_path_for_rocksdb_storage8")
279    ///     .tempdir()
280    ///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
281    /// let path = tempdir.path();
282    /// {
283    ///     let db = DB::open_default(path).unwrap();
284    ///     let mut iter = db.raw_iterator();
285    ///
286    ///     // Read the last key that starts with 'a'
287    ///     iter.seek_for_prev(b"b");
288    ///
289    ///     if iter.valid() {
290    ///         println!("{:?} {:?}", iter.key(), iter.value());
291    ///     } else {
292    ///         // There are no keys in the database
293    ///     }
294    /// }
295    /// let _ = DB::destroy(&Options::default(), path);
296    /// ```
297    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
298        let key = key.as_ref();
299
300        unsafe {
301            ffi::rocksdb_iter_seek_for_prev(
302                self.inner.as_ptr(),
303                key.as_ptr() as *const c_char,
304                key.len() as size_t,
305            );
306        }
307    }
308
309    /// Seeks to the next key.
310    pub fn next(&mut self) {
311        if self.valid() {
312            unsafe {
313                ffi::rocksdb_iter_next(self.inner.as_ptr());
314            }
315        }
316    }
317
318    /// Seeks to the previous key.
319    pub fn prev(&mut self) {
320        if self.valid() {
321            unsafe {
322                ffi::rocksdb_iter_prev(self.inner.as_ptr());
323            }
324        }
325    }
326
327    /// Returns a slice of the current key.
328    pub fn key(&self) -> Option<&[u8]> {
329        if self.valid() {
330            Some(self.key_impl())
331        } else {
332            None
333        }
334    }
335
336    /// Returns a slice of the current value.
337    pub fn value(&self) -> Option<&[u8]> {
338        if self.valid() {
339            Some(self.value_impl())
340        } else {
341            None
342        }
343    }
344
345    /// Returns pair with slice of the current key and current value.
346    pub fn item(&self) -> Option<(&[u8], &[u8])> {
347        if self.valid() {
348            Some((self.key_impl(), self.value_impl()))
349        } else {
350            None
351        }
352    }
353
354    /// Returns a slice of the current key; assumes the iterator is valid.
355    fn key_impl(&self) -> &[u8] {
356        // Safety Note: This is safe as all methods that may invalidate the buffer returned
357        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
358        unsafe {
359            let mut key_len: size_t = 0;
360            let key_len_ptr: *mut size_t = &mut key_len;
361            let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
362            slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
363        }
364    }
365
366    /// Returns a slice of the current value; assumes the iterator is valid.
367    fn value_impl(&self) -> &[u8] {
368        // Safety Note: This is safe as all methods that may invalidate the buffer returned
369        // take `&mut self`, so borrow checker will prevent use of buffer after seek.
370        unsafe {
371            let mut val_len: size_t = 0;
372            let val_len_ptr: *mut size_t = &mut val_len;
373            let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
374            slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
375        }
376    }
377}
378
379impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
380    fn drop(&mut self) {
381        unsafe {
382            ffi::rocksdb_iter_destroy(self.inner.as_ptr());
383        }
384    }
385}
386
387unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
388unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
389
390/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
391pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
392
393/// A standard Rust [`Iterator`] over a database or column family.
394///
395/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
396/// RocksDB's API, which can provide better performance and more features.
397///
398/// ```
399/// use rocksdb::{DB, Direction, IteratorMode, Options};
400///
401/// let tempdir = tempfile::Builder::new()
402///     .prefix("_path_for_rocksdb_storage2")
403///     .tempdir()
404///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
405/// let path = tempdir.path();
406/// {
407///     let db = DB::open_default(path).unwrap();
408///     let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
409///     for item in iter {
410///         let (key, value) = item.unwrap();
411///         println!("Saw {:?} {:?}", key, value);
412///     }
413///     iter = db.iterator(IteratorMode::End);  // Always iterates backward
414///     for item in iter {
415///         let (key, value) = item.unwrap();
416///         println!("Saw {:?} {:?}", key, value);
417///     }
418///     iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
419///     for item in iter {
420///         let (key, value) = item.unwrap();
421///         println!("Saw {:?} {:?}", key, value);
422///     }
423///
424///     // You can seek with an existing Iterator instance, too
425///     iter = db.iterator(IteratorMode::Start);
426///     iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
427///     for item in iter {
428///         let (key, value) = item.unwrap();
429///         println!("Saw {:?} {:?}", key, value);
430///     }
431/// }
432/// let _ = DB::destroy(&Options::default(), path);
433/// ```
434pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
435    raw: DBRawIteratorWithThreadMode<'a, D>,
436    direction: Direction,
437    done: bool,
438}
439
/// Direction in which a [`DBIteratorWithThreadMode`] walks the key space.
///
/// `Debug`, `PartialEq` and `Eq` are derived so modes can be logged and compared.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Toward later keys: the cursor is advanced with `next` after each item.
    Forward,
    /// Toward earlier keys: the cursor is moved with `prev` after each item.
    Reverse,
}

/// An owned `(key, value)` pair, as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts, and which way it then moves.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and move forward.
    Start,
    /// Start at the last key and move backward.
    End,
    /// Start at the given key and move in the given direction. If the key is absent, start at
    /// its nearest neighbour in that direction.
    From(&'a [u8], Direction),
}
454
455impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
456    pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
457        Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
458    }
459
460    pub(crate) fn new_cf(
461        db: &'a D,
462        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
463        readopts: ReadOptions,
464        mode: IteratorMode,
465    ) -> Self {
466        Self::from_raw(
467            DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
468            mode,
469        )
470    }
471
472    fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
473        let mut rv = DBIteratorWithThreadMode {
474            raw,
475            direction: Direction::Forward, // blown away by set_mode()
476            done: false,
477        };
478        rv.set_mode(mode);
479        rv
480    }
481
482    pub fn set_mode(&mut self, mode: IteratorMode) {
483        self.done = false;
484        self.direction = match mode {
485            IteratorMode::Start => {
486                self.raw.seek_to_first();
487                Direction::Forward
488            }
489            IteratorMode::End => {
490                self.raw.seek_to_last();
491                Direction::Reverse
492            }
493            IteratorMode::From(key, Direction::Forward) => {
494                self.raw.seek(key);
495                Direction::Forward
496            }
497            IteratorMode::From(key, Direction::Reverse) => {
498                self.raw.seek_for_prev(key);
499                Direction::Reverse
500            }
501        };
502    }
503}
504
505impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
506    type Item = Result<KVBytes, Error>;
507
508    fn next(&mut self) -> Option<Result<KVBytes, Error>> {
509        if self.done {
510            None
511        } else if let Some((key, value)) = self.raw.item() {
512            let item = (Box::from(key), Box::from(value));
513            match self.direction {
514                Direction::Forward => self.raw.next(),
515                Direction::Reverse => self.raw.prev(),
516            }
517            Some(Ok(item))
518        } else {
519            self.done = true;
520            self.raw.status().err().map(Result::Err)
521        }
522    }
523}
524
525impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
526
527impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
528    fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
529        self.raw
530    }
531}
532
533/// Iterates the batches of writes since a given sequence number.
534///
535/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
536/// batches of write operations that have occurred since a given sequence number
537/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
538/// the application.
539///
540/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
541/// value is the sequence number of the associated write batch.
542///
543pub struct DBWALIterator {
544    pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
545    pub(crate) start_seq_number: u64,
546}
547
548impl DBWALIterator {
549    /// Returns `true` if the iterator is valid. An iterator is invalidated when
550    /// it reaches the end of its defined range, or when it encounters an error.
551    ///
552    /// To check whether the iterator encountered an error after `valid` has
553    /// returned `false`, use the [`status`](DBWALIterator::status) method.
554    /// `status` will never return an error when `valid` is `true`.
555    pub fn valid(&self) -> bool {
556        unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
557    }
558
559    /// Returns an error `Result` if the iterator has encountered an error
560    /// during operation. When an error is encountered, the iterator is
561    /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
562    /// called.
563    pub fn status(&self) -> Result<(), Error> {
564        unsafe {
565            ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
566        }
567        Ok(())
568    }
569}
570
571impl Iterator for DBWALIterator {
572    type Item = Result<(u64, WriteBatch), Error>;
573
574    fn next(&mut self) -> Option<Self::Item> {
575        if !self.valid() {
576            return None;
577        }
578
579        let mut seq: u64 = 0;
580        let mut batch = WriteBatch {
581            inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
582        };
583
584        // if the initial sequence number is what was requested we skip it to
585        // only provide changes *after* it
586        while seq <= self.start_seq_number {
587            unsafe {
588                ffi::rocksdb_wal_iter_next(self.inner);
589            }
590
591            if !self.valid() {
592                return None;
593            }
594
595            // this drops which in turn frees the skipped batch
596            batch = WriteBatch {
597                inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
598            };
599        }
600
601        if !self.valid() {
602            return self.status().err().map(Result::Err);
603        }
604
605        // Seek to the next write batch.
606        // Note that WriteBatches live independently of the WAL iterator so this is safe to do
607        unsafe {
608            ffi::rocksdb_wal_iter_next(self.inner);
609        }
610
611        Some(Ok((seq, batch)))
612    }
613}
614
615impl Drop for DBWALIterator {
616    fn drop(&mut self) {
617        unsafe {
618            ffi::rocksdb_wal_iter_destroy(self.inner);
619        }
620    }
621}