rocksdb/db_iterator.rs
1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16 db::{DBAccess, DB},
17 ffi, Error, ReadOptions, WriteBatch,
18};
19use libc::{c_char, c_uchar, size_t};
20use std::{marker::PhantomData, slice};
21
22/// A type alias to keep compatibility. See [`DBRawIteratorWithThreadMode`] for details
23pub type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, DB>;
24
25/// A low-level iterator over a database or column family, created by [`DB::raw_iterator`]
26/// and other `raw_iterator_*` methods.
27///
28/// This iterator replicates RocksDB's API. It should provide better
29/// performance and more features than [`DBIteratorWithThreadMode`], which is a standard
30/// Rust [`std::iter::Iterator`].
31///
32/// ```
33/// use rocksdb::{DB, Options};
34///
35/// let tempdir = tempfile::Builder::new()
36/// .prefix("_path_for_rocksdb_storage4")
37/// .tempdir()
38/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage4.");
39/// let path = tempdir.path();
40/// {
41/// let db = DB::open_default(path).unwrap();
42/// let mut iter = db.raw_iterator();
43///
44/// // Forwards iteration
45/// iter.seek_to_first();
46/// while iter.valid() {
47/// println!("Saw {:?} {:?}", iter.key(), iter.value());
48/// iter.next();
49/// }
50///
51/// // Reverse iteration
52/// iter.seek_to_last();
53/// while iter.valid() {
54/// println!("Saw {:?} {:?}", iter.key(), iter.value());
55/// iter.prev();
56/// }
57///
58/// // Seeking
59/// iter.seek(b"my key");
60/// while iter.valid() {
61/// println!("Saw {:?} {:?}", iter.key(), iter.value());
62/// iter.next();
63/// }
64///
65/// // Reverse iteration from key
66/// // Note, use seek_for_prev when reversing because if this key doesn't exist,
67/// // this will make the iterator start from the previous key rather than the next.
68/// iter.seek_for_prev(b"my key");
69/// while iter.valid() {
70/// println!("Saw {:?} {:?}", iter.key(), iter.value());
71/// iter.prev();
72/// }
73/// }
74/// let _ = DB::destroy(&Options::default(), path);
75/// ```
76pub struct DBRawIteratorWithThreadMode<'a, D: DBAccess> {
77 inner: std::ptr::NonNull<ffi::rocksdb_iterator_t>,
78
79 /// When iterate_lower_bound or iterate_upper_bound are set, the inner
80 /// C iterator keeps a pointer to the upper bound inside `_readopts`.
81 /// Storing this makes sure the upper bound is always alive when the
82 /// iterator is being used.
83 ///
84 /// And yes, we need to store the entire ReadOptions structure since C++
85 /// ReadOptions keep reference to C rocksdb_readoptions_t wrapper which
86 /// point to vectors we own. See issue #660.
87 _readopts: ReadOptions,
88
89 db: PhantomData<&'a D>,
90}
91
92impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
93 pub(crate) fn new(db: &D, readopts: ReadOptions) -> Self {
94 let inner = unsafe { db.create_iterator(&readopts) };
95 Self::from_inner(inner, readopts)
96 }
97
98 pub(crate) fn new_cf(
99 db: &'a D,
100 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
101 readopts: ReadOptions,
102 ) -> Self {
103 let inner = unsafe { db.create_iterator_cf(cf_handle, &readopts) };
104 Self::from_inner(inner, readopts)
105 }
106
107 fn from_inner(inner: *mut ffi::rocksdb_iterator_t, readopts: ReadOptions) -> Self {
108 // This unwrap will never fail since rocksdb_create_iterator and
109 // rocksdb_create_iterator_cf functions always return non-null. They
110 // use new and deference the result so any nulls would end up with SIGSEGV
111 // there and we would have a bigger issue.
112 let inner = std::ptr::NonNull::new(inner).unwrap();
113 Self {
114 inner,
115 _readopts: readopts,
116 db: PhantomData,
117 }
118 }
119
120 /// Returns `true` if the iterator is valid. An iterator is invalidated when
121 /// it reaches the end of its defined range, or when it encounters an error.
122 ///
123 /// To check whether the iterator encountered an error after `valid` has
124 /// returned `false`, use the [`status`](DBRawIteratorWithThreadMode::status) method. `status` will never
125 /// return an error when `valid` is `true`.
126 pub fn valid(&self) -> bool {
127 unsafe { ffi::rocksdb_iter_valid(self.inner.as_ptr()) != 0 }
128 }
129
130 /// Returns an error `Result` if the iterator has encountered an error
131 /// during operation. When an error is encountered, the iterator is
132 /// invalidated and [`valid`](DBRawIteratorWithThreadMode::valid) will return `false` when called.
133 ///
134 /// Performing a seek will discard the current status.
135 pub fn status(&self) -> Result<(), Error> {
136 unsafe {
137 ffi_try!(ffi::rocksdb_iter_get_error(self.inner.as_ptr()));
138 }
139 Ok(())
140 }
141
142 /// Seeks to the first key in the database.
143 ///
144 /// # Examples
145 ///
146 /// ```rust
147 /// use rocksdb::{DB, Options};
148 ///
149 /// let tempdir = tempfile::Builder::new()
150 /// .prefix("_path_for_rocksdb_storage5")
151 /// .tempdir()
152 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage5.");
153 /// let path = tempdir.path();
154 /// {
155 /// let db = DB::open_default(path).unwrap();
156 /// let mut iter = db.raw_iterator();
157 ///
158 /// // Iterate all keys from the start in lexicographic order
159 /// iter.seek_to_first();
160 ///
161 /// while iter.valid() {
162 /// println!("{:?} {:?}", iter.key(), iter.value());
163 /// iter.next();
164 /// }
165 ///
166 /// // Read just the first key
167 /// iter.seek_to_first();
168 ///
169 /// if iter.valid() {
170 /// println!("{:?} {:?}", iter.key(), iter.value());
171 /// } else {
172 /// // There are no keys in the database
173 /// }
174 /// }
175 /// let _ = DB::destroy(&Options::default(), path);
176 /// ```
177 pub fn seek_to_first(&mut self) {
178 unsafe {
179 ffi::rocksdb_iter_seek_to_first(self.inner.as_ptr());
180 }
181 }
182
183 /// Seeks to the last key in the database.
184 ///
185 /// # Examples
186 ///
187 /// ```rust
188 /// use rocksdb::{DB, Options};
189 ///
190 /// let tempdir = tempfile::Builder::new()
191 /// .prefix("_path_for_rocksdb_storage6")
192 /// .tempdir()
193 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage6.");
194 /// let path = tempdir.path();
195 /// {
196 /// let db = DB::open_default(path).unwrap();
197 /// let mut iter = db.raw_iterator();
198 ///
199 /// // Iterate all keys from the end in reverse lexicographic order
200 /// iter.seek_to_last();
201 ///
202 /// while iter.valid() {
203 /// println!("{:?} {:?}", iter.key(), iter.value());
204 /// iter.prev();
205 /// }
206 ///
207 /// // Read just the last key
208 /// iter.seek_to_last();
209 ///
210 /// if iter.valid() {
211 /// println!("{:?} {:?}", iter.key(), iter.value());
212 /// } else {
213 /// // There are no keys in the database
214 /// }
215 /// }
216 /// let _ = DB::destroy(&Options::default(), path);
217 /// ```
218 pub fn seek_to_last(&mut self) {
219 unsafe {
220 ffi::rocksdb_iter_seek_to_last(self.inner.as_ptr());
221 }
222 }
223
224 /// Seeks to the specified key or the first key that lexicographically follows it.
225 ///
226 /// This method will attempt to seek to the specified key. If that key does not exist, it will
227 /// find and seek to the key that lexicographically follows it instead.
228 ///
229 /// # Examples
230 ///
231 /// ```rust
232 /// use rocksdb::{DB, Options};
233 ///
234 /// let tempdir = tempfile::Builder::new()
235 /// .prefix("_path_for_rocksdb_storage7")
236 /// .tempdir()
237 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage7.");
238 /// let path = tempdir.path();
239 /// {
240 /// let db = DB::open_default(path).unwrap();
241 /// let mut iter = db.raw_iterator();
242 ///
243 /// // Read the first key that starts with 'a'
244 /// iter.seek(b"a");
245 ///
246 /// if iter.valid() {
247 /// println!("{:?} {:?}", iter.key(), iter.value());
248 /// } else {
249 /// // There are no keys in the database
250 /// }
251 /// }
252 /// let _ = DB::destroy(&Options::default(), path);
253 /// ```
254 pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
255 let key = key.as_ref();
256
257 unsafe {
258 ffi::rocksdb_iter_seek(
259 self.inner.as_ptr(),
260 key.as_ptr() as *const c_char,
261 key.len() as size_t,
262 );
263 }
264 }
265
266 /// Seeks to the specified key, or the first key that lexicographically precedes it.
267 ///
268 /// Like ``.seek()`` this method will attempt to seek to the specified key.
269 /// The difference with ``.seek()`` is that if the specified key do not exist, this method will
270 /// seek to key that lexicographically precedes it instead.
271 ///
272 /// # Examples
273 ///
274 /// ```rust
275 /// use rocksdb::{DB, Options};
276 ///
277 /// let tempdir = tempfile::Builder::new()
278 /// .prefix("_path_for_rocksdb_storage8")
279 /// .tempdir()
280 /// .expect("Failed to create temporary path for the _path_for_rocksdb_storage8.");
281 /// let path = tempdir.path();
282 /// {
283 /// let db = DB::open_default(path).unwrap();
284 /// let mut iter = db.raw_iterator();
285 ///
286 /// // Read the last key that starts with 'a'
287 /// iter.seek_for_prev(b"b");
288 ///
289 /// if iter.valid() {
290 /// println!("{:?} {:?}", iter.key(), iter.value());
291 /// } else {
292 /// // There are no keys in the database
293 /// }
294 /// }
295 /// let _ = DB::destroy(&Options::default(), path);
296 /// ```
297 pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
298 let key = key.as_ref();
299
300 unsafe {
301 ffi::rocksdb_iter_seek_for_prev(
302 self.inner.as_ptr(),
303 key.as_ptr() as *const c_char,
304 key.len() as size_t,
305 );
306 }
307 }
308
309 /// Seeks to the next key.
310 pub fn next(&mut self) {
311 if self.valid() {
312 unsafe {
313 ffi::rocksdb_iter_next(self.inner.as_ptr());
314 }
315 }
316 }
317
318 /// Seeks to the previous key.
319 pub fn prev(&mut self) {
320 if self.valid() {
321 unsafe {
322 ffi::rocksdb_iter_prev(self.inner.as_ptr());
323 }
324 }
325 }
326
327 /// Returns a slice of the current key.
328 pub fn key(&self) -> Option<&[u8]> {
329 if self.valid() {
330 Some(self.key_impl())
331 } else {
332 None
333 }
334 }
335
336 /// Returns a slice of the current value.
337 pub fn value(&self) -> Option<&[u8]> {
338 if self.valid() {
339 Some(self.value_impl())
340 } else {
341 None
342 }
343 }
344
345 /// Returns pair with slice of the current key and current value.
346 pub fn item(&self) -> Option<(&[u8], &[u8])> {
347 if self.valid() {
348 Some((self.key_impl(), self.value_impl()))
349 } else {
350 None
351 }
352 }
353
354 /// Returns a slice of the current key; assumes the iterator is valid.
355 fn key_impl(&self) -> &[u8] {
356 // Safety Note: This is safe as all methods that may invalidate the buffer returned
357 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
358 unsafe {
359 let mut key_len: size_t = 0;
360 let key_len_ptr: *mut size_t = &mut key_len;
361 let key_ptr = ffi::rocksdb_iter_key(self.inner.as_ptr(), key_len_ptr);
362 slice::from_raw_parts(key_ptr as *const c_uchar, key_len)
363 }
364 }
365
366 /// Returns a slice of the current value; assumes the iterator is valid.
367 fn value_impl(&self) -> &[u8] {
368 // Safety Note: This is safe as all methods that may invalidate the buffer returned
369 // take `&mut self`, so borrow checker will prevent use of buffer after seek.
370 unsafe {
371 let mut val_len: size_t = 0;
372 let val_len_ptr: *mut size_t = &mut val_len;
373 let val_ptr = ffi::rocksdb_iter_value(self.inner.as_ptr(), val_len_ptr);
374 slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
375 }
376 }
377}
378
379impl<D: DBAccess> Drop for DBRawIteratorWithThreadMode<'_, D> {
380 fn drop(&mut self) {
381 unsafe {
382 ffi::rocksdb_iter_destroy(self.inner.as_ptr());
383 }
384 }
385}
386
387unsafe impl<D: DBAccess> Send for DBRawIteratorWithThreadMode<'_, D> {}
388unsafe impl<D: DBAccess> Sync for DBRawIteratorWithThreadMode<'_, D> {}
389
390/// A type alias to keep compatibility. See [`DBIteratorWithThreadMode`] for details
391pub type DBIterator<'a> = DBIteratorWithThreadMode<'a, DB>;
392
393/// A standard Rust [`Iterator`] over a database or column family.
394///
395/// As an alternative, [`DBRawIteratorWithThreadMode`] is a low level wrapper around
396/// RocksDB's API, which can provide better performance and more features.
397///
398/// ```
399/// use rocksdb::{DB, Direction, IteratorMode, Options};
400///
401/// let tempdir = tempfile::Builder::new()
402/// .prefix("_path_for_rocksdb_storage2")
403/// .tempdir()
404/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage2.");
405/// let path = tempdir.path();
406/// {
407/// let db = DB::open_default(path).unwrap();
408/// let mut iter = db.iterator(IteratorMode::Start); // Always iterates forward
409/// for item in iter {
410/// let (key, value) = item.unwrap();
411/// println!("Saw {:?} {:?}", key, value);
412/// }
413/// iter = db.iterator(IteratorMode::End); // Always iterates backward
414/// for item in iter {
415/// let (key, value) = item.unwrap();
416/// println!("Saw {:?} {:?}", key, value);
417/// }
418/// iter = db.iterator(IteratorMode::From(b"my key", Direction::Forward)); // From a key in Direction::{forward,reverse}
419/// for item in iter {
420/// let (key, value) = item.unwrap();
421/// println!("Saw {:?} {:?}", key, value);
422/// }
423///
424/// // You can seek with an existing Iterator instance, too
425/// iter = db.iterator(IteratorMode::Start);
426/// iter.set_mode(IteratorMode::From(b"another key", Direction::Reverse));
427/// for item in iter {
428/// let (key, value) = item.unwrap();
429/// println!("Saw {:?} {:?}", key, value);
430/// }
431/// }
432/// let _ = DB::destroy(&Options::default(), path);
433/// ```
434pub struct DBIteratorWithThreadMode<'a, D: DBAccess> {
435 raw: DBRawIteratorWithThreadMode<'a, D>,
436 direction: Direction,
437 done: bool,
438}
439
/// The direction in which a [`DBIteratorWithThreadMode`] walks through the keys.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Towards lexicographically larger keys.
    Forward,
    /// Towards lexicographically smaller keys.
    Reverse,
}

/// An owned key/value pair, as yielded by [`DBIteratorWithThreadMode`].
pub type KVBytes = (Box<[u8]>, Box<[u8]>);

/// Where a [`DBIteratorWithThreadMode`] starts, and which way it then moves.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum IteratorMode<'a> {
    /// Start at the first key and iterate forward.
    Start,
    /// Start at the last key and iterate backward.
    End,
    /// Start from the given key and iterate in the given direction.
    ///
    /// With [`Direction::Forward`], iteration begins at the first key at or after the given
    /// key. With [`Direction::Reverse`], it begins at the last key at or before it.
    From(&'a [u8], Direction),
}
454
455impl<'a, D: DBAccess> DBIteratorWithThreadMode<'a, D> {
456 pub(crate) fn new(db: &D, readopts: ReadOptions, mode: IteratorMode) -> Self {
457 Self::from_raw(DBRawIteratorWithThreadMode::new(db, readopts), mode)
458 }
459
460 pub(crate) fn new_cf(
461 db: &'a D,
462 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
463 readopts: ReadOptions,
464 mode: IteratorMode,
465 ) -> Self {
466 Self::from_raw(
467 DBRawIteratorWithThreadMode::new_cf(db, cf_handle, readopts),
468 mode,
469 )
470 }
471
472 fn from_raw(raw: DBRawIteratorWithThreadMode<'a, D>, mode: IteratorMode) -> Self {
473 let mut rv = DBIteratorWithThreadMode {
474 raw,
475 direction: Direction::Forward, // blown away by set_mode()
476 done: false,
477 };
478 rv.set_mode(mode);
479 rv
480 }
481
482 pub fn set_mode(&mut self, mode: IteratorMode) {
483 self.done = false;
484 self.direction = match mode {
485 IteratorMode::Start => {
486 self.raw.seek_to_first();
487 Direction::Forward
488 }
489 IteratorMode::End => {
490 self.raw.seek_to_last();
491 Direction::Reverse
492 }
493 IteratorMode::From(key, Direction::Forward) => {
494 self.raw.seek(key);
495 Direction::Forward
496 }
497 IteratorMode::From(key, Direction::Reverse) => {
498 self.raw.seek_for_prev(key);
499 Direction::Reverse
500 }
501 };
502 }
503}
504
505impl<D: DBAccess> Iterator for DBIteratorWithThreadMode<'_, D> {
506 type Item = Result<KVBytes, Error>;
507
508 fn next(&mut self) -> Option<Result<KVBytes, Error>> {
509 if self.done {
510 None
511 } else if let Some((key, value)) = self.raw.item() {
512 let item = (Box::from(key), Box::from(value));
513 match self.direction {
514 Direction::Forward => self.raw.next(),
515 Direction::Reverse => self.raw.prev(),
516 }
517 Some(Ok(item))
518 } else {
519 self.done = true;
520 self.raw.status().err().map(Result::Err)
521 }
522 }
523}
524
525impl<D: DBAccess> std::iter::FusedIterator for DBIteratorWithThreadMode<'_, D> {}
526
527impl<'a, D: DBAccess> Into<DBRawIteratorWithThreadMode<'a, D>> for DBIteratorWithThreadMode<'a, D> {
528 fn into(self) -> DBRawIteratorWithThreadMode<'a, D> {
529 self.raw
530 }
531}
532
533/// Iterates the batches of writes since a given sequence number.
534///
535/// `DBWALIterator` is returned by `DB::get_updates_since()` and will return the
536/// batches of write operations that have occurred since a given sequence number
537/// (see `DB::latest_sequence_number()`). This iterator cannot be constructed by
538/// the application.
539///
540/// The iterator item type is a tuple of (`u64`, `WriteBatch`) where the first
541/// value is the sequence number of the associated write batch.
542///
543pub struct DBWALIterator {
544 pub(crate) inner: *mut ffi::rocksdb_wal_iterator_t,
545 pub(crate) start_seq_number: u64,
546}
547
548impl DBWALIterator {
549 /// Returns `true` if the iterator is valid. An iterator is invalidated when
550 /// it reaches the end of its defined range, or when it encounters an error.
551 ///
552 /// To check whether the iterator encountered an error after `valid` has
553 /// returned `false`, use the [`status`](DBWALIterator::status) method.
554 /// `status` will never return an error when `valid` is `true`.
555 pub fn valid(&self) -> bool {
556 unsafe { ffi::rocksdb_wal_iter_valid(self.inner) != 0 }
557 }
558
559 /// Returns an error `Result` if the iterator has encountered an error
560 /// during operation. When an error is encountered, the iterator is
561 /// invalidated and [`valid`](DBWALIterator::valid) will return `false` when
562 /// called.
563 pub fn status(&self) -> Result<(), Error> {
564 unsafe {
565 ffi_try!(ffi::rocksdb_wal_iter_status(self.inner));
566 }
567 Ok(())
568 }
569}
570
571impl Iterator for DBWALIterator {
572 type Item = Result<(u64, WriteBatch), Error>;
573
574 fn next(&mut self) -> Option<Self::Item> {
575 if !self.valid() {
576 return None;
577 }
578
579 let mut seq: u64 = 0;
580 let mut batch = WriteBatch {
581 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
582 };
583
584 // if the initial sequence number is what was requested we skip it to
585 // only provide changes *after* it
586 while seq <= self.start_seq_number {
587 unsafe {
588 ffi::rocksdb_wal_iter_next(self.inner);
589 }
590
591 if !self.valid() {
592 return None;
593 }
594
595 // this drops which in turn frees the skipped batch
596 batch = WriteBatch {
597 inner: unsafe { ffi::rocksdb_wal_iter_get_batch(self.inner, &mut seq) },
598 };
599 }
600
601 if !self.valid() {
602 return self.status().err().map(Result::Err);
603 }
604
605 // Seek to the next write batch.
606 // Note that WriteBatches live independently of the WAL iterator so this is safe to do
607 unsafe {
608 ffi::rocksdb_wal_iter_next(self.inner);
609 }
610
611 Some(Ok((seq, batch)))
612 }
613}
614
615impl Drop for DBWALIterator {
616 fn drop(&mut self) {
617 unsafe {
618 ffi::rocksdb_wal_iter_destroy(self.inner);
619 }
620 }
621}