rocksdb/
db.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use crate::{
17    column_family::AsColumnFamilyRef,
18    column_family::BoundColumnFamily,
19    column_family::UnboundColumnFamily,
20    db_options::OptionsMustOutliveDB,
21    ffi,
22    ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23    ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24    DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25    IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26    WaitForCompactOptions, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
27};
28
29use crate::ffi_util::CSlice;
30use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
31use std::collections::BTreeMap;
32use std::ffi::{CStr, CString};
33use std::fmt;
34use std::fs;
35use std::iter;
36use std::path::Path;
37use std::path::PathBuf;
38use std::ptr;
39use std::slice;
40use std::str;
41use std::sync::Arc;
42use std::sync::RwLock;
43use std::time::Duration;
44
/// Marker trait to specify single or multi threaded column family alternations for
/// [`DBWithThreadMode<T>`]
///
/// This arrangement makes differences in self mutability and return type in
/// some of `DBWithThreadMode` methods.
///
/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
/// also has a minimum set of not-encapsulated internal methods between
/// [`SingleThreaded`] and [`MultiThreaded`].  These methods aren't expected to be
/// called and defined externally.
pub trait ThreadMode {
    /// Internal implementation for storing column family handles.
    ///
    /// Builds the mode-specific container from a map of column family names to
    /// raw column family handles.
    fn new_cf_map_internal(
        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
    ) -> Self;
    /// Internal implementation for dropping column family handles.
    ///
    /// Both implementations in this file empty the underlying map so that the
    /// stored column family wrappers are dropped.
    fn drop_all_cfs_internal(&mut self);
}
63
/// Actual marker type for the marker trait `ThreadMode`, which holds
/// a collection of column families without synchronization primitive, providing
/// no overhead for the single-threaded column family alternations. The other
/// mode is [`MultiThreaded`].
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct SingleThreaded {
    // Column families keyed by name; a plain map with no lock around it.
    pub(crate) cfs: BTreeMap<String, ColumnFamily>,
}
73
/// Actual marker type for the marker trait `ThreadMode`, which holds
/// a collection of column families wrapped in a RwLock to be mutated
/// concurrently. The other mode is [`SingleThreaded`].
///
/// See [`DB`] for more details, including performance implications for each mode
pub struct MultiThreaded {
    // Column families keyed by name; each handle is reference-counted and the
    // whole map sits behind a `RwLock`.
    pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
}
82
83impl ThreadMode for SingleThreaded {
84    fn new_cf_map_internal(
85        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
86    ) -> Self {
87        Self {
88            cfs: cfs
89                .into_iter()
90                .map(|(n, c)| (n, ColumnFamily { inner: c }))
91                .collect(),
92        }
93    }
94
95    fn drop_all_cfs_internal(&mut self) {
96        // Cause all ColumnFamily objects to be Drop::drop()-ed.
97        self.cfs.clear();
98    }
99}
100
101impl ThreadMode for MultiThreaded {
102    fn new_cf_map_internal(
103        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
104    ) -> Self {
105        Self {
106            cfs: RwLock::new(
107                cfs.into_iter()
108                    .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
109                    .collect(),
110            ),
111        }
112    }
113
114    fn drop_all_cfs_internal(&mut self) {
115        // Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
116        self.cfs.write().unwrap().clear();
117    }
118}
119
/// Get underlying `rocksdb_t`.
///
/// Implemented by the handle types that [`DBCommon`] is generic over.
pub trait DBInner {
    /// Returns the raw `rocksdb_t` pointer.
    fn inner(&self) -> *mut ffi::rocksdb_t;
}
124
/// A helper type to implement some common methods for [`DBWithThreadMode`]
/// and [`OptimisticTransactionDB`].
///
/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
pub struct DBCommon<T: ThreadMode, D: DBInner> {
    // Owner of the raw database handle.
    pub(crate) inner: D,
    cfs: T, // Column families are held differently depending on thread mode
    // Filesystem path the database was opened with.
    path: PathBuf,
    // Never read; held only so the values stay alive as long as this struct.
    // Presumably these are resources referenced by the `Options` used at open
    // time — confirm against `OptionsMustOutliveDB`.
    _outlive: Vec<OptionsMustOutliveDB>,
}
135
/// Minimal set of DB-related methods, intended to be generic over
/// `DBWithThreadMode<T>`. Mainly used internally
pub trait DBAccess {
    /// Creates a raw snapshot handle.
    ///
    /// # Safety
    ///
    /// Returns a raw pointer; presumably the caller must hand it back through
    /// [`release_snapshot`](Self::release_snapshot) — confirm against callers.
    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;

    /// Releases a raw snapshot handle.
    ///
    /// # Safety
    ///
    /// `snapshot` is a raw pointer; presumably it must come from
    /// [`create_snapshot`](Self::create_snapshot) on the same database.
    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);

    /// Creates a raw iterator using `readopts`.
    ///
    /// # Safety
    ///
    /// Returns a raw pointer whose lifetime the compiler does not track.
    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;

    /// Creates a raw iterator over the column family `cf_handle` using `readopts`.
    ///
    /// # Safety
    ///
    /// `cf_handle` is a raw pointer and the returned pointer is not
    /// lifetime-tracked by the compiler.
    unsafe fn create_iterator_cf(
        &self,
        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
        readopts: &ReadOptions,
    ) -> *mut ffi::rocksdb_iterator_t;

    /// Looks up `key` with `readopts`, returning the value as an owned vector
    /// if present.
    fn get_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error>;

    /// Looks up `key` in column family `cf` with `readopts`, returning the
    /// value as an owned vector if present.
    fn get_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error>;

    /// Looks up `key` with `readopts`, returning a [`DBPinnableSlice`] if present.
    fn get_pinned_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error>;

    /// Looks up `key` in column family `cf` with `readopts`, returning a
    /// [`DBPinnableSlice`] if present.
    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error>;

    /// Looks up several keys with `readopts`, yielding one result per key.
    fn multi_get_opt<K, I>(
        &self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = K>;

    /// Looks up several `(column family, key)` pairs with `readopts`, yielding
    /// one result per pair.
    fn multi_get_cf_opt<'b, K, I, W>(
        &self,
        keys_cf: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: AsColumnFamilyRef + 'b;
}
196
// `DBAccess` for every `DBCommon`. The raw snapshot/iterator methods call the
// FFI directly on the underlying `rocksdb_t`. The lookup methods use
// method-call syntax on `self`, which prefers an inherent method over the trait
// method of the same name, so they forward to the inherent implementations
// (`get_opt` is defined further down in this file; the others are presumably
// defined in the part of the inherent impl not shown here).
impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
    /// Creates a snapshot through `rocksdb_create_snapshot`.
    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
        ffi::rocksdb_create_snapshot(self.inner.inner())
    }

    /// Releases `snapshot` through `rocksdb_release_snapshot`.
    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
        ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
    }

    /// Creates an iterator through `rocksdb_create_iterator`.
    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
        ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
    }

    /// Creates a column family iterator through `rocksdb_create_iterator_cf`.
    unsafe fn create_iterator_cf(
        &self,
        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
        readopts: &ReadOptions,
    ) -> *mut ffi::rocksdb_iterator_t {
        ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
    }

    /// Forwards to the inherent `get_opt`.
    fn get_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error> {
        self.get_opt(key, readopts)
    }

    /// Forwards to the inherent `get_cf_opt`.
    fn get_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error> {
        self.get_cf_opt(cf, key, readopts)
    }

    /// Forwards to the inherent `get_pinned_opt`.
    fn get_pinned_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error> {
        self.get_pinned_opt(key, readopts)
    }

    /// Forwards to the inherent `get_pinned_cf_opt`.
    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error> {
        self.get_pinned_cf_opt(cf, key, readopts)
    }

    /// Forwards to the inherent `multi_get_opt`.
    fn multi_get_opt<K, Iter>(
        &self,
        keys: Iter,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        Iter: IntoIterator<Item = K>,
    {
        self.multi_get_opt(keys, readopts)
    }

    /// Forwards to the inherent `multi_get_cf_opt`.
    fn multi_get_cf_opt<'b, K, Iter, W>(
        &self,
        keys_cf: Iter,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        Iter: IntoIterator<Item = (&'b W, K)>,
        W: AsColumnFamilyRef + 'b,
    {
        self.multi_get_cf_opt(keys_cf, readopts)
    }
}
277
/// Owner of a raw `rocksdb_t` pointer; the pointer is closed with
/// `rocksdb_close` when this value is dropped.
pub struct DBWithThreadModeInner {
    // Raw database handle.
    inner: *mut ffi::rocksdb_t,
}
281
282impl DBInner for DBWithThreadModeInner {
283    fn inner(&self) -> *mut ffi::rocksdb_t {
284        self.inner
285    }
286}
287
288impl Drop for DBWithThreadModeInner {
289    fn drop(&mut self) {
290        unsafe {
291            ffi::rocksdb_close(self.inner);
292        }
293    }
294}
295
/// A type alias to RocksDB database.
///
/// This is [`DBCommon`] with its handle type fixed to [`DBWithThreadModeInner`];
/// `T` selects how column families are held ([`SingleThreaded`] or
/// [`MultiThreaded`]).
///
/// See crate level documentation for a simple usage example.
/// See [`DBCommon`] for full list of methods.
pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
301
/// A type alias to DB instance type with the single-threaded column family
/// creations/deletions
///
/// # Compatibility and multi-threaded mode
///
/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
/// compatibility. Use `DBWithThreadMode<MultiThreaded>` for multi-threaded
/// column family alternations.
///
/// # Limited performance implication for single-threaded mode
///
/// Even with [`SingleThreaded`], almost all RocksDB operations are
/// multi-threaded unless the underlying RocksDB instance is
/// specifically configured otherwise. `SingleThreaded` only forces
/// serialization of column family alternations by requiring `&mut self` of DB
/// instance due to its wrapper implementation details.
///
/// # Multi-threaded mode
///
/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
/// workload including multi-threaded column family alternations, costing the
/// RwLock overhead inside `DB`.
#[cfg(not(feature = "multi-threaded-cf"))]
pub type DB = DBWithThreadMode<SingleThreaded>;

/// A type alias to DB instance type with [`MultiThreaded`] column family
/// handling, selected when the `multi-threaded-cf` feature is enabled.
#[cfg(feature = "multi-threaded-cf")]
pub type DB = DBWithThreadMode<MultiThreaded>;
329
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}

// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
// NOTE(review): unlike the `Send` impl above, this impl puts no `Sync` bound on `T` — confirm
// that this is intended for every `ThreadMode` implementor.
unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
338
/// Selects which open routine is used for the database (see `open_raw` and
/// `open_cf_raw`).
enum AccessType<'a> {
    /// Regular open (`rocksdb_open` / `rocksdb_open_column_families`).
    ReadWrite,
    /// Read-only open; the flag is forwarded to the read-only open call.
    ReadOnly { error_if_log_file_exist: bool },
    /// Open as a secondary instance, passing `secondary_path` alongside the
    /// primary path.
    Secondary { secondary_path: &'a Path },
    /// Open with a time-to-live; passed to RocksDB as a whole number of seconds.
    WithTTL { ttl: Duration },
}
346
347/// Methods of `DBWithThreadMode`.
348impl<T: ThreadMode> DBWithThreadMode<T> {
349    /// Opens a database with default options.
350    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
351        let mut opts = Options::default();
352        opts.create_if_missing(true);
353        Self::open(&opts, path)
354    }
355
356    /// Opens the database with the specified options.
357    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
358        Self::open_cf(opts, path, None::<&str>)
359    }
360
361    /// Opens the database for read only with the specified options.
362    pub fn open_for_read_only<P: AsRef<Path>>(
363        opts: &Options,
364        path: P,
365        error_if_log_file_exist: bool,
366    ) -> Result<Self, Error> {
367        Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
368    }
369
370    /// Opens the database as a secondary.
371    pub fn open_as_secondary<P: AsRef<Path>>(
372        opts: &Options,
373        primary_path: P,
374        secondary_path: P,
375    ) -> Result<Self, Error> {
376        Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
377    }
378
379    /// Opens the database with a Time to Live compaction filter.
380    pub fn open_with_ttl<P: AsRef<Path>>(
381        opts: &Options,
382        path: P,
383        ttl: Duration,
384    ) -> Result<Self, Error> {
385        Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
386    }
387
388    /// Opens the database with a Time to Live compaction filter and column family names.
389    ///
390    /// Column families opened using this function will be created with default `Options`.
391    pub fn open_cf_with_ttl<P, I, N>(
392        opts: &Options,
393        path: P,
394        cfs: I,
395        ttl: Duration,
396    ) -> Result<Self, Error>
397    where
398        P: AsRef<Path>,
399        I: IntoIterator<Item = N>,
400        N: AsRef<str>,
401    {
402        let cfs = cfs
403            .into_iter()
404            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
405
406        Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
407    }
408
409    /// Opens a database with the given database with a Time to Live compaction filter and
410    /// column family descriptors.
411    pub fn open_cf_descriptors_with_ttl<P, I>(
412        opts: &Options,
413        path: P,
414        cfs: I,
415        ttl: Duration,
416    ) -> Result<Self, Error>
417    where
418        P: AsRef<Path>,
419        I: IntoIterator<Item = ColumnFamilyDescriptor>,
420    {
421        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
422    }
423
424    /// Opens a database with the given database options and column family names.
425    ///
426    /// Column families opened using this function will be created with default `Options`.
427    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
428    where
429        P: AsRef<Path>,
430        I: IntoIterator<Item = N>,
431        N: AsRef<str>,
432    {
433        let cfs = cfs
434            .into_iter()
435            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
436
437        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
438    }
439
440    /// Opens a database with the given database options and column family names.
441    ///
442    /// Column families opened using given `Options`.
443    pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
444    where
445        P: AsRef<Path>,
446        I: IntoIterator<Item = (N, Options)>,
447        N: AsRef<str>,
448    {
449        let cfs = cfs
450            .into_iter()
451            .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
452
453        Self::open_cf_descriptors(opts, path, cfs)
454    }
455
456    /// Opens a database for read only with the given database options and column family names.
457    pub fn open_cf_for_read_only<P, I, N>(
458        opts: &Options,
459        path: P,
460        cfs: I,
461        error_if_log_file_exist: bool,
462    ) -> Result<Self, Error>
463    where
464        P: AsRef<Path>,
465        I: IntoIterator<Item = N>,
466        N: AsRef<str>,
467    {
468        let cfs = cfs
469            .into_iter()
470            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
471
472        Self::open_cf_descriptors_internal(
473            opts,
474            path,
475            cfs,
476            &AccessType::ReadOnly {
477                error_if_log_file_exist,
478            },
479        )
480    }
481
482    /// Opens a database for read only with the given database options and column family names.
483    pub fn open_cf_with_opts_for_read_only<P, I, N>(
484        db_opts: &Options,
485        path: P,
486        cfs: I,
487        error_if_log_file_exist: bool,
488    ) -> Result<Self, Error>
489    where
490        P: AsRef<Path>,
491        I: IntoIterator<Item = (N, Options)>,
492        N: AsRef<str>,
493    {
494        let cfs = cfs
495            .into_iter()
496            .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
497
498        Self::open_cf_descriptors_internal(
499            db_opts,
500            path,
501            cfs,
502            &AccessType::ReadOnly {
503                error_if_log_file_exist,
504            },
505        )
506    }
507
508    /// Opens a database for ready only with the given database options and
509    /// column family descriptors.
510    pub fn open_cf_descriptors_read_only<P, I>(
511        opts: &Options,
512        path: P,
513        cfs: I,
514        error_if_log_file_exist: bool,
515    ) -> Result<Self, Error>
516    where
517        P: AsRef<Path>,
518        I: IntoIterator<Item = ColumnFamilyDescriptor>,
519    {
520        Self::open_cf_descriptors_internal(
521            opts,
522            path,
523            cfs,
524            &AccessType::ReadOnly {
525                error_if_log_file_exist,
526            },
527        )
528    }
529
530    /// Opens the database as a secondary with the given database options and column family names.
531    pub fn open_cf_as_secondary<P, I, N>(
532        opts: &Options,
533        primary_path: P,
534        secondary_path: P,
535        cfs: I,
536    ) -> Result<Self, Error>
537    where
538        P: AsRef<Path>,
539        I: IntoIterator<Item = N>,
540        N: AsRef<str>,
541    {
542        let cfs = cfs
543            .into_iter()
544            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
545
546        Self::open_cf_descriptors_internal(
547            opts,
548            primary_path,
549            cfs,
550            &AccessType::Secondary {
551                secondary_path: secondary_path.as_ref(),
552            },
553        )
554    }
555
556    /// Opens the database as a secondary with the given database options and
557    /// column family descriptors.
558    pub fn open_cf_descriptors_as_secondary<P, I>(
559        opts: &Options,
560        path: P,
561        secondary_path: P,
562        cfs: I,
563    ) -> Result<Self, Error>
564    where
565        P: AsRef<Path>,
566        I: IntoIterator<Item = ColumnFamilyDescriptor>,
567    {
568        Self::open_cf_descriptors_internal(
569            opts,
570            path,
571            cfs,
572            &AccessType::Secondary {
573                secondary_path: secondary_path.as_ref(),
574            },
575        )
576    }
577
578    /// Opens a database with the given database options and column family descriptors.
579    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
580    where
581        P: AsRef<Path>,
582        I: IntoIterator<Item = ColumnFamilyDescriptor>,
583    {
584        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
585    }
586
587    /// Internal implementation for opening RocksDB.
588    fn open_cf_descriptors_internal<P, I>(
589        opts: &Options,
590        path: P,
591        cfs: I,
592        access_type: &AccessType,
593    ) -> Result<Self, Error>
594    where
595        P: AsRef<Path>,
596        I: IntoIterator<Item = ColumnFamilyDescriptor>,
597    {
598        let cfs: Vec<_> = cfs.into_iter().collect();
599        let outlive = iter::once(opts.outlive.clone())
600            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
601            .collect();
602
603        let cpath = to_cpath(&path)?;
604
605        if let Err(e) = fs::create_dir_all(&path) {
606            return Err(Error::new(format!(
607                "Failed to create RocksDB directory: `{e:?}`."
608            )));
609        }
610
611        let db: *mut ffi::rocksdb_t;
612        let mut cf_map = BTreeMap::new();
613
614        if cfs.is_empty() {
615            db = Self::open_raw(opts, &cpath, access_type)?;
616        } else {
617            let mut cfs_v = cfs;
618            // Always open the default column family.
619            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
620                cfs_v.push(ColumnFamilyDescriptor {
621                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
622                    options: Options::default(),
623                });
624            }
625            // We need to store our CStrings in an intermediate vector
626            // so that their pointers remain valid.
627            let c_cfs: Vec<CString> = cfs_v
628                .iter()
629                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
630                .collect();
631
632            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
633
634            // These handles will be populated by DB.
635            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
636
637            let cfopts: Vec<_> = cfs_v
638                .iter()
639                .map(|cf| cf.options.inner as *const _)
640                .collect();
641
642            db = Self::open_cf_raw(
643                opts,
644                &cpath,
645                &cfs_v,
646                &cfnames,
647                &cfopts,
648                &mut cfhandles,
649                access_type,
650            )?;
651            for handle in &cfhandles {
652                if handle.is_null() {
653                    return Err(Error::new(
654                        "Received null column family handle from DB.".to_owned(),
655                    ));
656                }
657            }
658
659            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
660                cf_map.insert(cf_desc.name.clone(), inner);
661            }
662        }
663
664        if db.is_null() {
665            return Err(Error::new("Could not initialize database.".to_owned()));
666        }
667
668        Ok(Self {
669            inner: DBWithThreadModeInner { inner: db },
670            path: path.as_ref().to_path_buf(),
671            cfs: T::new_cf_map_internal(cf_map),
672            _outlive: outlive,
673        })
674    }
675
676    fn open_raw(
677        opts: &Options,
678        cpath: &CString,
679        access_type: &AccessType,
680    ) -> Result<*mut ffi::rocksdb_t, Error> {
681        let db = unsafe {
682            match *access_type {
683                AccessType::ReadOnly {
684                    error_if_log_file_exist,
685                } => ffi_try!(ffi::rocksdb_open_for_read_only(
686                    opts.inner,
687                    cpath.as_ptr(),
688                    c_uchar::from(error_if_log_file_exist),
689                )),
690                AccessType::ReadWrite => {
691                    ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
692                }
693                AccessType::Secondary { secondary_path } => {
694                    ffi_try!(ffi::rocksdb_open_as_secondary(
695                        opts.inner,
696                        cpath.as_ptr(),
697                        to_cpath(secondary_path)?.as_ptr(),
698                    ))
699                }
700                AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
701                    opts.inner,
702                    cpath.as_ptr(),
703                    ttl.as_secs() as c_int,
704                )),
705            }
706        };
707        Ok(db)
708    }
709
710    #[allow(clippy::pedantic)]
711    fn open_cf_raw(
712        opts: &Options,
713        cpath: &CString,
714        cfs_v: &[ColumnFamilyDescriptor],
715        cfnames: &[*const c_char],
716        cfopts: &[*const ffi::rocksdb_options_t],
717        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
718        access_type: &AccessType,
719    ) -> Result<*mut ffi::rocksdb_t, Error> {
720        let db = unsafe {
721            match *access_type {
722                AccessType::ReadOnly {
723                    error_if_log_file_exist,
724                } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
725                    opts.inner,
726                    cpath.as_ptr(),
727                    cfs_v.len() as c_int,
728                    cfnames.as_ptr(),
729                    cfopts.as_ptr(),
730                    cfhandles.as_mut_ptr(),
731                    c_uchar::from(error_if_log_file_exist),
732                )),
733                AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
734                    opts.inner,
735                    cpath.as_ptr(),
736                    cfs_v.len() as c_int,
737                    cfnames.as_ptr(),
738                    cfopts.as_ptr(),
739                    cfhandles.as_mut_ptr(),
740                )),
741                AccessType::Secondary { secondary_path } => {
742                    ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
743                        opts.inner,
744                        cpath.as_ptr(),
745                        to_cpath(secondary_path)?.as_ptr(),
746                        cfs_v.len() as c_int,
747                        cfnames.as_ptr(),
748                        cfopts.as_ptr(),
749                        cfhandles.as_mut_ptr(),
750                    ))
751                }
752                AccessType::WithTTL { ttl } => {
753                    let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
754                    ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
755                        opts.inner,
756                        cpath.as_ptr(),
757                        cfs_v.len() as c_int,
758                        cfnames.as_ptr(),
759                        cfopts.as_ptr(),
760                        cfhandles.as_mut_ptr(),
761                        ttls_v.as_ptr(),
762                    ))
763                }
764            }
765        };
766        Ok(db)
767    }
768
769    /// Removes the database entries in the range `["from", "to")` using given write options.
770    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
771        &self,
772        cf: &impl AsColumnFamilyRef,
773        from: K,
774        to: K,
775        writeopts: &WriteOptions,
776    ) -> Result<(), Error> {
777        let from = from.as_ref();
778        let to = to.as_ref();
779
780        unsafe {
781            ffi_try!(ffi::rocksdb_delete_range_cf(
782                self.inner.inner(),
783                writeopts.inner,
784                cf.inner(),
785                from.as_ptr() as *const c_char,
786                from.len() as size_t,
787                to.as_ptr() as *const c_char,
788                to.len() as size_t,
789            ));
790            Ok(())
791        }
792    }
793
794    /// Removes the database entries in the range `["from", "to")` using default write options.
795    pub fn delete_range_cf<K: AsRef<[u8]>>(
796        &self,
797        cf: &impl AsColumnFamilyRef,
798        from: K,
799        to: K,
800    ) -> Result<(), Error> {
801        self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
802    }
803
804    pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
805        unsafe {
806            ffi_try!(ffi::rocksdb_write(
807                self.inner.inner(),
808                writeopts.inner,
809                batch.inner
810            ));
811        }
812        Ok(())
813    }
814
815    pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
816        self.write_opt(batch, &WriteOptions::default())
817    }
818
819    pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
820        let mut wo = WriteOptions::new();
821        wo.disable_wal(true);
822        self.write_opt(batch, &wo)
823    }
824}
825
826/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
827impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
828    pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
829        Self {
830            inner,
831            cfs,
832            path,
833            _outlive: outlive,
834        }
835    }
836
837    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
838        let cpath = to_cpath(path)?;
839        let mut length = 0;
840
841        unsafe {
842            let ptr = ffi_try!(ffi::rocksdb_list_column_families(
843                opts.inner,
844                cpath.as_ptr(),
845                &mut length,
846            ));
847
848            let vec = slice::from_raw_parts(ptr, length)
849                .iter()
850                .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
851                .collect();
852            ffi::rocksdb_list_column_families_destroy(ptr, length);
853            Ok(vec)
854        }
855    }
856
857    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
858        let cpath = to_cpath(path)?;
859        unsafe {
860            ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
861        }
862        Ok(())
863    }
864
865    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
866        let cpath = to_cpath(path)?;
867        unsafe {
868            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
869        }
870        Ok(())
871    }
872
873    pub fn path(&self) -> &Path {
874        self.path.as_path()
875    }
876
877    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
878    /// the data to disk.
879    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
880        unsafe {
881            ffi_try!(ffi::rocksdb_flush_wal(
882                self.inner.inner(),
883                c_uchar::from(sync)
884            ));
885        }
886        Ok(())
887    }
888
889    /// Flushes database memtables to SST files on the disk.
890    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
891        unsafe {
892            ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
893        }
894        Ok(())
895    }
896
897    /// Flushes database memtables to SST files on the disk using default options.
898    pub fn flush(&self) -> Result<(), Error> {
899        self.flush_opt(&FlushOptions::default())
900    }
901
902    /// Flushes database memtables to SST files on the disk for a given column family.
903    pub fn flush_cf_opt(
904        &self,
905        cf: &impl AsColumnFamilyRef,
906        flushopts: &FlushOptions,
907    ) -> Result<(), Error> {
908        unsafe {
909            ffi_try!(ffi::rocksdb_flush_cf(
910                self.inner.inner(),
911                flushopts.inner,
912                cf.inner()
913            ));
914        }
915        Ok(())
916    }
917
918    /// Flushes multiple column families.
919    ///
920    /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
921    /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
922    /// number at the time when flush is requested.
923    pub fn flush_cfs_opt(
924        &self,
925        cfs: &[&impl AsColumnFamilyRef],
926        opts: &FlushOptions,
927    ) -> Result<(), Error> {
928        let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
929        unsafe {
930            ffi_try!(ffi::rocksdb_flush_cfs(
931                self.inner.inner(),
932                opts.inner,
933                cfs.as_mut_ptr(),
934                cfs.len() as libc::c_int,
935            ));
936        }
937        Ok(())
938    }
939
940    /// Flushes database memtables to SST files on the disk for a given column family using default
941    /// options.
942    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
943        self.flush_cf_opt(cf, &FlushOptions::default())
944    }
945
946    /// Return the bytes associated with a key value with read options. If you only intend to use
947    /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
948    /// to avoid unnecessary memory copy.
949    pub fn get_opt<K: AsRef<[u8]>>(
950        &self,
951        key: K,
952        readopts: &ReadOptions,
953    ) -> Result<Option<Vec<u8>>, Error> {
954        self.get_pinned_opt(key, readopts)
955            .map(|x| x.map(|v| v.as_ref().to_vec()))
956    }
957
958    /// Return the bytes associated with a key value. If you only intend to use the vector returned
959    /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
960    /// copy.
961    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
962        self.get_opt(key.as_ref(), &ReadOptions::default())
963    }
964
965    /// Return the bytes associated with a key value and the given column family with read options.
966    /// If you only intend to use the vector returned temporarily, consider using
967    /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
968    pub fn get_cf_opt<K: AsRef<[u8]>>(
969        &self,
970        cf: &impl AsColumnFamilyRef,
971        key: K,
972        readopts: &ReadOptions,
973    ) -> Result<Option<Vec<u8>>, Error> {
974        self.get_pinned_cf_opt(cf, key, readopts)
975            .map(|x| x.map(|v| v.as_ref().to_vec()))
976    }
977
978    /// Return the bytes associated with a key value and the given column family. If you only
979    /// intend to use the vector returned temporarily, consider using
980    /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
981    pub fn get_cf<K: AsRef<[u8]>>(
982        &self,
983        cf: &impl AsColumnFamilyRef,
984        key: K,
985    ) -> Result<Option<Vec<u8>>, Error> {
986        self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
987    }
988
989    /// Return the value associated with a key using RocksDB's PinnableSlice
990    /// so as to avoid unnecessary memory copy.
991    pub fn get_pinned_opt<K: AsRef<[u8]>>(
992        &self,
993        key: K,
994        readopts: &ReadOptions,
995    ) -> Result<Option<DBPinnableSlice>, Error> {
996        if readopts.inner.is_null() {
997            return Err(Error::new(
998                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
999                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1000                    .to_owned(),
1001            ));
1002        }
1003
1004        let key = key.as_ref();
1005        unsafe {
1006            let val = ffi_try!(ffi::rocksdb_get_pinned(
1007                self.inner.inner(),
1008                readopts.inner,
1009                key.as_ptr() as *const c_char,
1010                key.len() as size_t,
1011            ));
1012            if val.is_null() {
1013                Ok(None)
1014            } else {
1015                Ok(Some(DBPinnableSlice::from_c(val)))
1016            }
1017        }
1018    }
1019
1020    /// Return the value associated with a key using RocksDB's PinnableSlice
1021    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1022    /// leverages default options.
1023    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1024        self.get_pinned_opt(key, &ReadOptions::default())
1025    }
1026
1027    /// Return the value associated with a key using RocksDB's PinnableSlice
1028    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1029    /// allows specifying ColumnFamily
1030    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1031        &self,
1032        cf: &impl AsColumnFamilyRef,
1033        key: K,
1034        readopts: &ReadOptions,
1035    ) -> Result<Option<DBPinnableSlice>, Error> {
1036        if readopts.inner.is_null() {
1037            return Err(Error::new(
1038                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1039                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1040                    .to_owned(),
1041            ));
1042        }
1043
1044        let key = key.as_ref();
1045        unsafe {
1046            let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1047                self.inner.inner(),
1048                readopts.inner,
1049                cf.inner(),
1050                key.as_ptr() as *const c_char,
1051                key.len() as size_t,
1052            ));
1053            if val.is_null() {
1054                Ok(None)
1055            } else {
1056                Ok(Some(DBPinnableSlice::from_c(val)))
1057            }
1058        }
1059    }
1060
1061    /// Return the value associated with a key using RocksDB's PinnableSlice
1062    /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
1063    /// leverages default options.
1064    pub fn get_pinned_cf<K: AsRef<[u8]>>(
1065        &self,
1066        cf: &impl AsColumnFamilyRef,
1067        key: K,
1068    ) -> Result<Option<DBPinnableSlice>, Error> {
1069        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1070    }
1071
1072    /// Return the values associated with the given keys.
1073    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1074    where
1075        K: AsRef<[u8]>,
1076        I: IntoIterator<Item = K>,
1077    {
1078        self.multi_get_opt(keys, &ReadOptions::default())
1079    }
1080
1081    /// Return the values associated with the given keys using read options.
1082    pub fn multi_get_opt<K, I>(
1083        &self,
1084        keys: I,
1085        readopts: &ReadOptions,
1086    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1087    where
1088        K: AsRef<[u8]>,
1089        I: IntoIterator<Item = K>,
1090    {
1091        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1092            .into_iter()
1093            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1094            .unzip();
1095        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1096
1097        let mut values = vec![ptr::null_mut(); keys.len()];
1098        let mut values_sizes = vec![0_usize; keys.len()];
1099        let mut errors = vec![ptr::null_mut(); keys.len()];
1100        unsafe {
1101            ffi::rocksdb_multi_get(
1102                self.inner.inner(),
1103                readopts.inner,
1104                ptr_keys.len(),
1105                ptr_keys.as_ptr(),
1106                keys_sizes.as_ptr(),
1107                values.as_mut_ptr(),
1108                values_sizes.as_mut_ptr(),
1109                errors.as_mut_ptr(),
1110            );
1111        }
1112
1113        convert_values(values, values_sizes, errors)
1114    }
1115
1116    /// Return the values associated with the given keys and column families.
1117    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1118        &'a self,
1119        keys: I,
1120    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1121    where
1122        K: AsRef<[u8]>,
1123        I: IntoIterator<Item = (&'b W, K)>,
1124        W: 'b + AsColumnFamilyRef,
1125    {
1126        self.multi_get_cf_opt(keys, &ReadOptions::default())
1127    }
1128
1129    /// Return the values associated with the given keys and column families using read options.
1130    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1131        &'a self,
1132        keys: I,
1133        readopts: &ReadOptions,
1134    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1135    where
1136        K: AsRef<[u8]>,
1137        I: IntoIterator<Item = (&'b W, K)>,
1138        W: 'b + AsColumnFamilyRef,
1139    {
1140        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1141            .into_iter()
1142            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1143            .unzip();
1144        let ptr_keys: Vec<_> = cfs_and_keys
1145            .iter()
1146            .map(|(_, k)| k.as_ptr() as *const c_char)
1147            .collect();
1148        let ptr_cfs: Vec<_> = cfs_and_keys
1149            .iter()
1150            .map(|(c, _)| c.inner() as *const _)
1151            .collect();
1152
1153        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1154        let mut values_sizes = vec![0_usize; ptr_keys.len()];
1155        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1156        unsafe {
1157            ffi::rocksdb_multi_get_cf(
1158                self.inner.inner(),
1159                readopts.inner,
1160                ptr_cfs.as_ptr(),
1161                ptr_keys.len(),
1162                ptr_keys.as_ptr(),
1163                keys_sizes.as_ptr(),
1164                values.as_mut_ptr(),
1165                values_sizes.as_mut_ptr(),
1166                errors.as_mut_ptr(),
1167            );
1168        }
1169
1170        convert_values(values, values_sizes, errors)
1171    }
1172
1173    /// Return the values associated with the given keys and the specified column family
1174    /// where internally the read requests are processed in batch if block-based table
1175    /// SST format is used.  It is a more optimized version of multi_get_cf.
1176    pub fn batched_multi_get_cf<'a, K, I>(
1177        &self,
1178        cf: &impl AsColumnFamilyRef,
1179        keys: I,
1180        sorted_input: bool,
1181    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1182    where
1183        K: AsRef<[u8]> + 'a + ?Sized,
1184        I: IntoIterator<Item = &'a K>,
1185    {
1186        self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1187    }
1188
1189    /// Return the values associated with the given keys and the specified column family
1190    /// where internally the read requests are processed in batch if block-based table
1191    /// SST format is used. It is a more optimized version of multi_get_cf_opt.
1192    pub fn batched_multi_get_cf_opt<'a, K, I>(
1193        &self,
1194        cf: &impl AsColumnFamilyRef,
1195        keys: I,
1196        sorted_input: bool,
1197        readopts: &ReadOptions,
1198    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1199    where
1200        K: AsRef<[u8]> + 'a + ?Sized,
1201        I: IntoIterator<Item = &'a K>,
1202    {
1203        let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1204            .into_iter()
1205            .map(|k| {
1206                let k = k.as_ref();
1207                (k.as_ptr() as *const c_char, k.len())
1208            })
1209            .unzip();
1210
1211        let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1212        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1213
1214        unsafe {
1215            ffi::rocksdb_batched_multi_get_cf(
1216                self.inner.inner(),
1217                readopts.inner,
1218                cf.inner(),
1219                ptr_keys.len(),
1220                ptr_keys.as_ptr(),
1221                keys_sizes.as_ptr(),
1222                pinned_values.as_mut_ptr(),
1223                errors.as_mut_ptr(),
1224                sorted_input,
1225            );
1226            pinned_values
1227                .into_iter()
1228                .zip(errors.into_iter())
1229                .map(|(v, e)| {
1230                    if e.is_null() {
1231                        if v.is_null() {
1232                            Ok(None)
1233                        } else {
1234                            Ok(Some(DBPinnableSlice::from_c(v)))
1235                        }
1236                    } else {
1237                        Err(Error::new(crate::ffi_util::error_message(e)))
1238                    }
1239                })
1240                .collect()
1241        }
1242    }
1243
1244    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1245    /// `true`. This function uses default `ReadOptions`.
1246    pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1247        self.key_may_exist_opt(key, &ReadOptions::default())
1248    }
1249
1250    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1251    /// `true`.
1252    pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1253        let key = key.as_ref();
1254        unsafe {
1255            0 != ffi::rocksdb_key_may_exist(
1256                self.inner.inner(),
1257                readopts.inner,
1258                key.as_ptr() as *const c_char,
1259                key.len() as size_t,
1260                ptr::null_mut(), /*value*/
1261                ptr::null_mut(), /*val_len*/
1262                ptr::null(),     /*timestamp*/
1263                0,               /*timestamp_len*/
1264                ptr::null_mut(), /*value_found*/
1265            )
1266        }
1267    }
1268
1269    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1270    /// otherwise returns `true`. This function uses default `ReadOptions`.
1271    pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1272        self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1273    }
1274
1275    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1276    /// otherwise returns `true`.
1277    pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1278        &self,
1279        cf: &impl AsColumnFamilyRef,
1280        key: K,
1281        readopts: &ReadOptions,
1282    ) -> bool {
1283        let key = key.as_ref();
1284        0 != unsafe {
1285            ffi::rocksdb_key_may_exist_cf(
1286                self.inner.inner(),
1287                readopts.inner,
1288                cf.inner(),
1289                key.as_ptr() as *const c_char,
1290                key.len() as size_t,
1291                ptr::null_mut(), /*value*/
1292                ptr::null_mut(), /*val_len*/
1293                ptr::null(),     /*timestamp*/
1294                0,               /*timestamp_len*/
1295                ptr::null_mut(), /*value_found*/
1296            )
1297        }
1298    }
1299
1300    /// If the key definitely does not exist in the database, then this method
1301    /// returns `(false, None)`, else `(true, None)` if it may.
1302    /// If the key is found in memory, then it returns `(true, Some<CSlice>)`.
1303    ///
1304    /// This check is potentially lighter-weight than calling `get()`. One way
1305    /// to make this lighter weight is to avoid doing any IOs.
1306    pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1307        &self,
1308        cf: &impl AsColumnFamilyRef,
1309        key: K,
1310        readopts: &ReadOptions,
1311    ) -> (bool, Option<CSlice>) {
1312        let key = key.as_ref();
1313        let mut val: *mut c_char = ptr::null_mut();
1314        let mut val_len: usize = 0;
1315        let mut value_found: c_uchar = 0;
1316        let may_exists = 0
1317            != unsafe {
1318                ffi::rocksdb_key_may_exist_cf(
1319                    self.inner.inner(),
1320                    readopts.inner,
1321                    cf.inner(),
1322                    key.as_ptr() as *const c_char,
1323                    key.len() as size_t,
1324                    &mut val,         /*value*/
1325                    &mut val_len,     /*val_len*/
1326                    ptr::null(),      /*timestamp*/
1327                    0,                /*timestamp_len*/
1328                    &mut value_found, /*value_found*/
1329                )
1330            };
1331        // The value is only allocated (using malloc) and returned if it is found and
1332        // value_found isn't NULL. In that case the user is responsible for freeing it.
1333        if may_exists && value_found != 0 {
1334            (
1335                may_exists,
1336                Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1337            )
1338        } else {
1339            (may_exists, None)
1340        }
1341    }
1342
1343    fn create_inner_cf_handle(
1344        &self,
1345        name: impl CStrLike,
1346        opts: &Options,
1347    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1348        let cf_name = name.bake().map_err(|err| {
1349            Error::new(format!(
1350                "Failed to convert path to CString when creating cf: {err}"
1351            ))
1352        })?;
1353        Ok(unsafe {
1354            ffi_try!(ffi::rocksdb_create_column_family(
1355                self.inner.inner(),
1356                opts.inner,
1357                cf_name.as_ptr(),
1358            ))
1359        })
1360    }
1361
1362    pub fn iterator<'a: 'b, 'b>(
1363        &'a self,
1364        mode: IteratorMode,
1365    ) -> DBIteratorWithThreadMode<'b, Self> {
1366        let readopts = ReadOptions::default();
1367        self.iterator_opt(mode, readopts)
1368    }
1369
1370    pub fn iterator_opt<'a: 'b, 'b>(
1371        &'a self,
1372        mode: IteratorMode,
1373        readopts: ReadOptions,
1374    ) -> DBIteratorWithThreadMode<'b, Self> {
1375        DBIteratorWithThreadMode::new(self, readopts, mode)
1376    }
1377
1378    /// Opens an iterator using the provided ReadOptions.
1379    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
1380    pub fn iterator_cf_opt<'a: 'b, 'b>(
1381        &'a self,
1382        cf_handle: &impl AsColumnFamilyRef,
1383        readopts: ReadOptions,
1384        mode: IteratorMode,
1385    ) -> DBIteratorWithThreadMode<'b, Self> {
1386        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1387    }
1388
1389    /// Opens an iterator with `set_total_order_seek` enabled.
1390    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
1391    /// with a Hash-based implementation.
1392    pub fn full_iterator<'a: 'b, 'b>(
1393        &'a self,
1394        mode: IteratorMode,
1395    ) -> DBIteratorWithThreadMode<'b, Self> {
1396        let mut opts = ReadOptions::default();
1397        opts.set_total_order_seek(true);
1398        DBIteratorWithThreadMode::new(self, opts, mode)
1399    }
1400
1401    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1402        &'a self,
1403        prefix: P,
1404    ) -> DBIteratorWithThreadMode<'b, Self> {
1405        let mut opts = ReadOptions::default();
1406        opts.set_prefix_same_as_start(true);
1407        DBIteratorWithThreadMode::new(
1408            self,
1409            opts,
1410            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1411        )
1412    }
1413
1414    pub fn iterator_cf<'a: 'b, 'b>(
1415        &'a self,
1416        cf_handle: &impl AsColumnFamilyRef,
1417        mode: IteratorMode,
1418    ) -> DBIteratorWithThreadMode<'b, Self> {
1419        let opts = ReadOptions::default();
1420        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1421    }
1422
1423    pub fn full_iterator_cf<'a: 'b, 'b>(
1424        &'a self,
1425        cf_handle: &impl AsColumnFamilyRef,
1426        mode: IteratorMode,
1427    ) -> DBIteratorWithThreadMode<'b, Self> {
1428        let mut opts = ReadOptions::default();
1429        opts.set_total_order_seek(true);
1430        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1431    }
1432
1433    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1434        &'a self,
1435        cf_handle: &impl AsColumnFamilyRef,
1436        prefix: P,
1437    ) -> DBIteratorWithThreadMode<'a, Self> {
1438        let mut opts = ReadOptions::default();
1439        opts.set_prefix_same_as_start(true);
1440        DBIteratorWithThreadMode::<'a, Self>::new_cf(
1441            self,
1442            cf_handle.inner(),
1443            opts,
1444            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1445        )
1446    }
1447
1448    /// Opens a raw iterator over the database, using the default read options
1449    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1450        let opts = ReadOptions::default();
1451        DBRawIteratorWithThreadMode::new(self, opts)
1452    }
1453
1454    /// Opens a raw iterator over the given column family, using the default read options
1455    pub fn raw_iterator_cf<'a: 'b, 'b>(
1456        &'a self,
1457        cf_handle: &impl AsColumnFamilyRef,
1458    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1459        let opts = ReadOptions::default();
1460        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1461    }
1462
1463    /// Opens a raw iterator over the database, using the given read options
1464    pub fn raw_iterator_opt<'a: 'b, 'b>(
1465        &'a self,
1466        readopts: ReadOptions,
1467    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1468        DBRawIteratorWithThreadMode::new(self, readopts)
1469    }
1470
1471    /// Opens a raw iterator over the given column family, using the given read options
1472    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1473        &'a self,
1474        cf_handle: &impl AsColumnFamilyRef,
1475        readopts: ReadOptions,
1476    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1477        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1478    }
1479
1480    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1481        SnapshotWithThreadMode::<Self>::new(self)
1482    }
1483
1484    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1485    where
1486        K: AsRef<[u8]>,
1487        V: AsRef<[u8]>,
1488    {
1489        let key = key.as_ref();
1490        let value = value.as_ref();
1491
1492        unsafe {
1493            ffi_try!(ffi::rocksdb_put(
1494                self.inner.inner(),
1495                writeopts.inner,
1496                key.as_ptr() as *const c_char,
1497                key.len() as size_t,
1498                value.as_ptr() as *const c_char,
1499                value.len() as size_t,
1500            ));
1501            Ok(())
1502        }
1503    }
1504
1505    pub fn put_cf_opt<K, V>(
1506        &self,
1507        cf: &impl AsColumnFamilyRef,
1508        key: K,
1509        value: V,
1510        writeopts: &WriteOptions,
1511    ) -> Result<(), Error>
1512    where
1513        K: AsRef<[u8]>,
1514        V: AsRef<[u8]>,
1515    {
1516        let key = key.as_ref();
1517        let value = value.as_ref();
1518
1519        unsafe {
1520            ffi_try!(ffi::rocksdb_put_cf(
1521                self.inner.inner(),
1522                writeopts.inner,
1523                cf.inner(),
1524                key.as_ptr() as *const c_char,
1525                key.len() as size_t,
1526                value.as_ptr() as *const c_char,
1527                value.len() as size_t,
1528            ));
1529            Ok(())
1530        }
1531    }
1532
1533    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1534    where
1535        K: AsRef<[u8]>,
1536        V: AsRef<[u8]>,
1537    {
1538        let key = key.as_ref();
1539        let value = value.as_ref();
1540
1541        unsafe {
1542            ffi_try!(ffi::rocksdb_merge(
1543                self.inner.inner(),
1544                writeopts.inner,
1545                key.as_ptr() as *const c_char,
1546                key.len() as size_t,
1547                value.as_ptr() as *const c_char,
1548                value.len() as size_t,
1549            ));
1550            Ok(())
1551        }
1552    }
1553
1554    pub fn merge_cf_opt<K, V>(
1555        &self,
1556        cf: &impl AsColumnFamilyRef,
1557        key: K,
1558        value: V,
1559        writeopts: &WriteOptions,
1560    ) -> Result<(), Error>
1561    where
1562        K: AsRef<[u8]>,
1563        V: AsRef<[u8]>,
1564    {
1565        let key = key.as_ref();
1566        let value = value.as_ref();
1567
1568        unsafe {
1569            ffi_try!(ffi::rocksdb_merge_cf(
1570                self.inner.inner(),
1571                writeopts.inner,
1572                cf.inner(),
1573                key.as_ptr() as *const c_char,
1574                key.len() as size_t,
1575                value.as_ptr() as *const c_char,
1576                value.len() as size_t,
1577            ));
1578            Ok(())
1579        }
1580    }
1581
1582    pub fn delete_opt<K: AsRef<[u8]>>(
1583        &self,
1584        key: K,
1585        writeopts: &WriteOptions,
1586    ) -> Result<(), Error> {
1587        let key = key.as_ref();
1588
1589        unsafe {
1590            ffi_try!(ffi::rocksdb_delete(
1591                self.inner.inner(),
1592                writeopts.inner,
1593                key.as_ptr() as *const c_char,
1594                key.len() as size_t,
1595            ));
1596            Ok(())
1597        }
1598    }
1599
1600    pub fn delete_cf_opt<K: AsRef<[u8]>>(
1601        &self,
1602        cf: &impl AsColumnFamilyRef,
1603        key: K,
1604        writeopts: &WriteOptions,
1605    ) -> Result<(), Error> {
1606        let key = key.as_ref();
1607
1608        unsafe {
1609            ffi_try!(ffi::rocksdb_delete_cf(
1610                self.inner.inner(),
1611                writeopts.inner,
1612                cf.inner(),
1613                key.as_ptr() as *const c_char,
1614                key.len() as size_t,
1615            ));
1616            Ok(())
1617        }
1618    }
1619
1620    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1621    where
1622        K: AsRef<[u8]>,
1623        V: AsRef<[u8]>,
1624    {
1625        self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1626    }
1627
1628    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1629    where
1630        K: AsRef<[u8]>,
1631        V: AsRef<[u8]>,
1632    {
1633        self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1634    }
1635
1636    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1637    where
1638        K: AsRef<[u8]>,
1639        V: AsRef<[u8]>,
1640    {
1641        self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1642    }
1643
1644    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1645    where
1646        K: AsRef<[u8]>,
1647        V: AsRef<[u8]>,
1648    {
1649        self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1650    }
1651
1652    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1653        self.delete_opt(key.as_ref(), &WriteOptions::default())
1654    }
1655
1656    pub fn delete_cf<K: AsRef<[u8]>>(
1657        &self,
1658        cf: &impl AsColumnFamilyRef,
1659        key: K,
1660    ) -> Result<(), Error> {
1661        self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1662    }
1663
1664    /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
1665    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1666        unsafe {
1667            let start = start.as_ref().map(AsRef::as_ref);
1668            let end = end.as_ref().map(AsRef::as_ref);
1669
1670            ffi::rocksdb_compact_range(
1671                self.inner.inner(),
1672                opt_bytes_to_ptr(start),
1673                start.map_or(0, <[u8]>::len) as size_t,
1674                opt_bytes_to_ptr(end),
1675                end.map_or(0, <[u8]>::len) as size_t,
1676            );
1677        }
1678    }
1679
1680    /// Same as `compact_range` but with custom options.
1681    pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1682        &self,
1683        start: Option<S>,
1684        end: Option<E>,
1685        opts: &CompactOptions,
1686    ) {
1687        unsafe {
1688            let start = start.as_ref().map(AsRef::as_ref);
1689            let end = end.as_ref().map(AsRef::as_ref);
1690
1691            ffi::rocksdb_compact_range_opt(
1692                self.inner.inner(),
1693                opts.inner,
1694                opt_bytes_to_ptr(start),
1695                start.map_or(0, <[u8]>::len) as size_t,
1696                opt_bytes_to_ptr(end),
1697                end.map_or(0, <[u8]>::len) as size_t,
1698            );
1699        }
1700    }
1701
1702    /// Runs a manual compaction on the Range of keys given on the
1703    /// given column family. This is not likely to be needed for typical usage.
1704    pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1705        &self,
1706        cf: &impl AsColumnFamilyRef,
1707        start: Option<S>,
1708        end: Option<E>,
1709    ) {
1710        unsafe {
1711            let start = start.as_ref().map(AsRef::as_ref);
1712            let end = end.as_ref().map(AsRef::as_ref);
1713
1714            ffi::rocksdb_compact_range_cf(
1715                self.inner.inner(),
1716                cf.inner(),
1717                opt_bytes_to_ptr(start),
1718                start.map_or(0, <[u8]>::len) as size_t,
1719                opt_bytes_to_ptr(end),
1720                end.map_or(0, <[u8]>::len) as size_t,
1721            );
1722        }
1723    }
1724
1725    /// Same as `compact_range_cf` but with custom options.
1726    pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1727        &self,
1728        cf: &impl AsColumnFamilyRef,
1729        start: Option<S>,
1730        end: Option<E>,
1731        opts: &CompactOptions,
1732    ) {
1733        unsafe {
1734            let start = start.as_ref().map(AsRef::as_ref);
1735            let end = end.as_ref().map(AsRef::as_ref);
1736
1737            ffi::rocksdb_compact_range_cf_opt(
1738                self.inner.inner(),
1739                cf.inner(),
1740                opts.inner,
1741                opt_bytes_to_ptr(start),
1742                start.map_or(0, <[u8]>::len) as size_t,
1743                opt_bytes_to_ptr(end),
1744                end.map_or(0, <[u8]>::len) as size_t,
1745            );
1746        }
1747    }
1748
1749    /// Wait for all flush and compactions jobs to finish. Jobs to wait include the
1750    /// unscheduled (queued, but not scheduled yet).
1751    ///
1752    /// NOTE: This may also never return if there's sufficient ongoing writes that
1753    /// keeps flush and compaction going without stopping. The user would have to
1754    /// cease all the writes to DB to make this eventually return in a stable
1755    /// state. The user may also use timeout option in WaitForCompactOptions to
1756    /// make this stop waiting and return when timeout expires.
1757    pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
1758        unsafe {
1759            ffi_try!(ffi::rocksdb_wait_for_compact(
1760                self.inner.inner(),
1761                opts.inner
1762            ));
1763        }
1764        Ok(())
1765    }
1766
1767    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1768        let copts = convert_options(opts)?;
1769        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1770        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1771        let count = opts.len() as i32;
1772        unsafe {
1773            ffi_try!(ffi::rocksdb_set_options(
1774                self.inner.inner(),
1775                count,
1776                cnames.as_ptr(),
1777                cvalues.as_ptr(),
1778            ));
1779        }
1780        Ok(())
1781    }
1782
1783    pub fn set_options_cf(
1784        &self,
1785        cf: &impl AsColumnFamilyRef,
1786        opts: &[(&str, &str)],
1787    ) -> Result<(), Error> {
1788        let copts = convert_options(opts)?;
1789        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1790        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1791        let count = opts.len() as i32;
1792        unsafe {
1793            ffi_try!(ffi::rocksdb_set_options_cf(
1794                self.inner.inner(),
1795                cf.inner(),
1796                count,
1797                cnames.as_ptr(),
1798                cvalues.as_ptr(),
1799            ));
1800        }
1801        Ok(())
1802    }
1803
1804    /// Implementation for property_value et al methods.
1805    ///
1806    /// `name` is the name of the property.  It will be converted into a CString
1807    /// and passed to `get_property` as argument.  `get_property` reads the
1808    /// specified property and either returns NULL or a pointer to a C allocated
1809    /// string; this method takes ownership of that string and will free it at
1810    /// the end. That string is parsed using `parse` callback which produces
1811    /// the returned result.
1812    fn property_value_impl<R>(
1813        name: impl CStrLike,
1814        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1815        parse: impl FnOnce(&str) -> Result<R, Error>,
1816    ) -> Result<Option<R>, Error> {
1817        let value = match name.bake() {
1818            Ok(prop_name) => get_property(prop_name.as_ptr()),
1819            Err(e) => {
1820                return Err(Error::new(format!(
1821                    "Failed to convert property name to CString: {e}"
1822                )));
1823            }
1824        };
1825        if value.is_null() {
1826            return Ok(None);
1827        }
1828        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1829            Ok(s) => parse(s).map(|value| Some(value)),
1830            Err(e) => Err(Error::new(format!(
1831                "Failed to convert property value to string: {e}"
1832            ))),
1833        };
1834        unsafe {
1835            ffi::rocksdb_free(value as *mut c_void);
1836        }
1837        result
1838    }
1839
1840    /// Retrieves a RocksDB property by name.
1841    ///
1842    /// Full list of properties could be find
1843    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1844    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1845        Self::property_value_impl(
1846            name,
1847            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1848            |str_value| Ok(str_value.to_owned()),
1849        )
1850    }
1851
1852    /// Retrieves a RocksDB property by name, for a specific column family.
1853    ///
1854    /// Full list of properties could be find
1855    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1856    pub fn property_value_cf(
1857        &self,
1858        cf: &impl AsColumnFamilyRef,
1859        name: impl CStrLike,
1860    ) -> Result<Option<String>, Error> {
1861        Self::property_value_impl(
1862            name,
1863            |prop_name| unsafe {
1864                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1865            },
1866            |str_value| Ok(str_value.to_owned()),
1867        )
1868    }
1869
1870    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1871        value.parse::<u64>().map_err(|err| {
1872            Error::new(format!(
1873                "Failed to convert property value {value} to int: {err}"
1874            ))
1875        })
1876    }
1877
1878    /// Retrieves a RocksDB property and casts it to an integer.
1879    ///
1880    /// Full list of properties that return int values could be find
1881    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1882    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1883        Self::property_value_impl(
1884            name,
1885            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1886            Self::parse_property_int_value,
1887        )
1888    }
1889
1890    /// Retrieves a RocksDB property for a specific column family and casts it to an integer.
1891    ///
1892    /// Full list of properties that return int values could be find
1893    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1894    pub fn property_int_value_cf(
1895        &self,
1896        cf: &impl AsColumnFamilyRef,
1897        name: impl CStrLike,
1898    ) -> Result<Option<u64>, Error> {
1899        Self::property_value_impl(
1900            name,
1901            |prop_name| unsafe {
1902                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1903            },
1904            Self::parse_property_int_value,
1905        )
1906    }
1907
1908    /// The sequence number of the most recent transaction.
1909    pub fn latest_sequence_number(&self) -> u64 {
1910        unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1911    }
1912
1913    /// Iterate over batches of write operations since a given sequence.
1914    ///
1915    /// Produce an iterator that will provide the batches of write operations
1916    /// that have occurred since the given sequence (see
1917    /// `latest_sequence_number()`). Use the provided iterator to retrieve each
1918    /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
1919    /// deletes using the `WriteBatch::iterate()` function.
1920    ///
1921    /// Calling `get_updates_since()` with a sequence number that is out of
1922    /// bounds will return an error.
1923    pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1924        unsafe {
1925            // rocksdb_wal_readoptions_t does not appear to have any functions
1926            // for creating and destroying it; fortunately we can pass a nullptr
1927            // here to get the default behavior
1928            let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1929            let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1930                self.inner.inner(),
1931                seq_number,
1932                opts
1933            ));
1934            Ok(DBWALIterator {
1935                inner: iter,
1936                start_seq_number: seq_number,
1937            })
1938        }
1939    }
1940
1941    /// Tries to catch up with the primary by reading as much as possible from the
1942    /// log files.
1943    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1944        unsafe {
1945            ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1946        }
1947        Ok(())
1948    }
1949
1950    /// Loads a list of external SST files created with SstFileWriter into the DB with default opts
1951    pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1952        let opts = IngestExternalFileOptions::default();
1953        self.ingest_external_file_opts(&opts, paths)
1954    }
1955
1956    /// Loads a list of external SST files created with SstFileWriter into the DB
1957    pub fn ingest_external_file_opts<P: AsRef<Path>>(
1958        &self,
1959        opts: &IngestExternalFileOptions,
1960        paths: Vec<P>,
1961    ) -> Result<(), Error> {
1962        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1963        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1964
1965        self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1966    }
1967
1968    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1969    /// with default opts
1970    pub fn ingest_external_file_cf<P: AsRef<Path>>(
1971        &self,
1972        cf: &impl AsColumnFamilyRef,
1973        paths: Vec<P>,
1974    ) -> Result<(), Error> {
1975        let opts = IngestExternalFileOptions::default();
1976        self.ingest_external_file_cf_opts(cf, &opts, paths)
1977    }
1978
1979    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1980    pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1981        &self,
1982        cf: &impl AsColumnFamilyRef,
1983        opts: &IngestExternalFileOptions,
1984        paths: Vec<P>,
1985    ) -> Result<(), Error> {
1986        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1987        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1988
1989        self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1990    }
1991
1992    fn ingest_external_file_raw(
1993        &self,
1994        opts: &IngestExternalFileOptions,
1995        paths_v: &[CString],
1996        cpaths: &[*const c_char],
1997    ) -> Result<(), Error> {
1998        unsafe {
1999            ffi_try!(ffi::rocksdb_ingest_external_file(
2000                self.inner.inner(),
2001                cpaths.as_ptr(),
2002                paths_v.len(),
2003                opts.inner as *const _
2004            ));
2005            Ok(())
2006        }
2007    }
2008
2009    fn ingest_external_file_raw_cf(
2010        &self,
2011        cf: &impl AsColumnFamilyRef,
2012        opts: &IngestExternalFileOptions,
2013        paths_v: &[CString],
2014        cpaths: &[*const c_char],
2015    ) -> Result<(), Error> {
2016        unsafe {
2017            ffi_try!(ffi::rocksdb_ingest_external_file_cf(
2018                self.inner.inner(),
2019                cf.inner(),
2020                cpaths.as_ptr(),
2021                paths_v.len(),
2022                opts.inner as *const _
2023            ));
2024            Ok(())
2025        }
2026    }
2027
2028    /// Obtains the LSM-tree meta data of the default column family of the DB
2029    pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
2030        unsafe {
2031            let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
2032
2033            let metadata = ColumnFamilyMetaData {
2034                size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2035                name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2036                file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2037            };
2038
2039            // destroy
2040            ffi::rocksdb_column_family_metadata_destroy(ptr);
2041
2042            // return
2043            metadata
2044        }
2045    }
2046
2047    /// Obtains the LSM-tree meta data of the specified column family of the DB
2048    pub fn get_column_family_metadata_cf(
2049        &self,
2050        cf: &impl AsColumnFamilyRef,
2051    ) -> ColumnFamilyMetaData {
2052        unsafe {
2053            let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
2054
2055            let metadata = ColumnFamilyMetaData {
2056                size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2057                name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2058                file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2059            };
2060
2061            // destroy
2062            ffi::rocksdb_column_family_metadata_destroy(ptr);
2063
2064            // return
2065            metadata
2066        }
2067    }
2068
2069    /// Returns a list of all table files with their level, start key
2070    /// and end key
2071    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
2072        unsafe {
2073            let files = ffi::rocksdb_livefiles(self.inner.inner());
2074            if files.is_null() {
2075                Err(Error::new("Could not get live files".to_owned()))
2076            } else {
2077                let n = ffi::rocksdb_livefiles_count(files);
2078
2079                let mut livefiles = Vec::with_capacity(n as usize);
2080                let mut key_size: usize = 0;
2081
2082                for i in 0..n {
2083                    let column_family_name =
2084                        from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2085                    let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2086                    let size = ffi::rocksdb_livefiles_size(files, i);
2087                    let level = ffi::rocksdb_livefiles_level(files, i);
2088
2089                    // get smallest key inside file
2090                    let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2091                    let smallest_key = raw_data(smallest_key, key_size);
2092
2093                    // get largest key inside file
2094                    let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2095                    let largest_key = raw_data(largest_key, key_size);
2096
2097                    livefiles.push(LiveFile {
2098                        column_family_name,
2099                        name,
2100                        size,
2101                        level,
2102                        start_key: smallest_key,
2103                        end_key: largest_key,
2104                        num_entries: ffi::rocksdb_livefiles_entries(files, i),
2105                        num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2106                    });
2107                }
2108
2109                // destroy livefiles metadata(s)
2110                ffi::rocksdb_livefiles_destroy(files);
2111
2112                // return
2113                Ok(livefiles)
2114            }
2115        }
2116    }
2117
2118    /// Delete sst files whose keys are entirely in the given range.
2119    ///
2120    /// Could leave some keys in the range which are in files which are not
2121    /// entirely in the range.
2122    ///
2123    /// Note: L0 files are left regardless of whether they're in the range.
2124    ///
2125    /// SnapshotWithThreadModes before the delete might not see the data in the given range.
2126    pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2127        let from = from.as_ref();
2128        let to = to.as_ref();
2129        unsafe {
2130            ffi_try!(ffi::rocksdb_delete_file_in_range(
2131                self.inner.inner(),
2132                from.as_ptr() as *const c_char,
2133                from.len() as size_t,
2134                to.as_ptr() as *const c_char,
2135                to.len() as size_t,
2136            ));
2137            Ok(())
2138        }
2139    }
2140
2141    /// Same as `delete_file_in_range` but only for specific column family
2142    pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2143        &self,
2144        cf: &impl AsColumnFamilyRef,
2145        from: K,
2146        to: K,
2147    ) -> Result<(), Error> {
2148        let from = from.as_ref();
2149        let to = to.as_ref();
2150        unsafe {
2151            ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2152                self.inner.inner(),
2153                cf.inner(),
2154                from.as_ptr() as *const c_char,
2155                from.len() as size_t,
2156                to.as_ptr() as *const c_char,
2157                to.len() as size_t,
2158            ));
2159            Ok(())
2160        }
2161    }
2162
2163    /// Request stopping background work, if wait is true wait until it's done.
2164    pub fn cancel_all_background_work(&self, wait: bool) {
2165        unsafe {
2166            ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2167        }
2168    }
2169
2170    fn drop_column_family<C>(
2171        &self,
2172        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2173        cf: C,
2174    ) -> Result<(), Error> {
2175        unsafe {
2176            // first mark the column family as dropped
2177            ffi_try!(ffi::rocksdb_drop_column_family(
2178                self.inner.inner(),
2179                cf_inner
2180            ));
2181        }
2182        // then finally reclaim any resources (mem, files) by destroying the only single column
2183        // family handle by drop()-ing it
2184        drop(cf);
2185        Ok(())
2186    }
2187}
2188
2189impl<I: DBInner> DBCommon<SingleThreaded, I> {
2190    /// Creates column family with given name and options
2191    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2192        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2193        self.cfs
2194            .cfs
2195            .insert(name.as_ref().to_string(), ColumnFamily { inner });
2196        Ok(())
2197    }
2198
2199    /// Drops the column family with the given name
2200    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2201        if let Some(cf) = self.cfs.cfs.remove(name) {
2202            self.drop_column_family(cf.inner, cf)
2203        } else {
2204            Err(Error::new(format!("Invalid column family: {name}")))
2205        }
2206    }
2207
2208    /// Returns the underlying column family handle
2209    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2210        self.cfs.cfs.get(name)
2211    }
2212}
2213
2214impl<I: DBInner> DBCommon<MultiThreaded, I> {
2215    /// Creates column family with given name and options
2216    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2217        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2218        self.cfs.cfs.write().unwrap().insert(
2219            name.as_ref().to_string(),
2220            Arc::new(UnboundColumnFamily { inner }),
2221        );
2222        Ok(())
2223    }
2224
2225    /// Drops the column family with the given name by internally locking the inner column
2226    /// family map. This avoids needing `&mut self` reference
2227    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2228        if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2229            self.drop_column_family(cf.inner, cf)
2230        } else {
2231            Err(Error::new(format!("Invalid column family: {name}")))
2232        }
2233    }
2234
2235    /// Returns the underlying column family handle
2236    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2237        self.cfs
2238            .cfs
2239            .read()
2240            .unwrap()
2241            .get(name)
2242            .cloned()
2243            .map(UnboundColumnFamily::bound_column_family)
2244    }
2245}
2246
2247impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2248    fn drop(&mut self) {
2249        self.cfs.drop_all_cfs_internal();
2250    }
2251}
2252
2253impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2254    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2255        write!(f, "RocksDB {{ path: {:?} }}", self.path())
2256    }
2257}
2258
/// The metadata that describes a column family.
#[derive(Debug, Clone)]
pub struct ColumnFamilyMetaData {
    /// The size of this column family in bytes, which is equal to the sum of
    /// the file size of its "levels".
    pub size: u64,
    /// The name of the column family.
    pub name: String,
    /// The number of files in this column family.
    pub file_count: usize,
}
2270
/// Metadata describing a single live SST file, as reported by `live_files`.
#[derive(Clone, Debug)]
pub struct LiveFile {
    /// Column family this file belongs to.
    pub column_family_name: String,
    /// File name.
    pub name: String,
    /// File size.
    pub size: usize,
    /// Level on which the file resides.
    pub level: i32,
    /// Smallest user-defined key stored in the file.
    pub start_key: Option<Vec<u8>>,
    /// Largest user-defined key stored in the file.
    pub end_key: Option<Vec<u8>>,
    /// Number of entries (alive keys) in the file.
    pub num_entries: u64,
    /// Number of deletions (tombstone keys) in the file.
    pub num_deletions: u64,
}
2291
2292fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2293    opts.iter()
2294        .map(|(name, value)| {
2295            let cname = match CString::new(name.as_bytes()) {
2296                Ok(cname) => cname,
2297                Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2298            };
2299            let cvalue = match CString::new(value.as_bytes()) {
2300                Ok(cvalue) => cvalue,
2301                Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2302            };
2303            Ok((cname, cvalue))
2304        })
2305        .collect()
2306}
2307
2308pub(crate) fn convert_values(
2309    values: Vec<*mut c_char>,
2310    values_sizes: Vec<usize>,
2311    errors: Vec<*mut c_char>,
2312) -> Vec<Result<Option<Vec<u8>>, Error>> {
2313    values
2314        .into_iter()
2315        .zip(values_sizes.into_iter())
2316        .zip(errors.into_iter())
2317        .map(|((v, s), e)| {
2318            if e.is_null() {
2319                let value = unsafe { crate::ffi_util::raw_data(v, s) };
2320                unsafe {
2321                    ffi::rocksdb_free(v as *mut c_void);
2322                }
2323                Ok(value)
2324            } else {
2325                Err(Error::new(crate::ffi_util::error_message(e)))
2326            }
2327        })
2328        .collect()
2329}