rocksdb/transactions/
transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{
17    collections::BTreeMap,
18    ffi::CString,
19    fs, iter,
20    marker::PhantomData,
21    path::{Path, PathBuf},
22    ptr,
23    sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31    column_family::UnboundColumnFamily,
32    db::{convert_values, DBAccess},
33    db_options::OptionsMustOutliveDB,
34    ffi,
35    ffi_util::to_cpath,
36    AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
37    DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
38    IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
39    ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
40    WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_void, size_t};
44
45#[cfg(not(feature = "multi-threaded-cf"))]
46type DefaultThreadMode = crate::SingleThreaded;
47#[cfg(feature = "multi-threaded-cf")]
48type DefaultThreadMode = crate::MultiThreaded;
49
50/// RocksDB TransactionDB.
51///
52/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
53/// to learn more about RocksDB TransactionDB.
54///
55/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
56/// if feature `multi-threaded-cf` is not enabled.
57///
58/// ```
59/// use rocksdb::{DB, Options, TransactionDB, SingleThreaded};
60/// let tempdir = tempfile::Builder::new()
61///     .prefix("_path_for_transaction_db")
62///     .tempdir()
63///     .expect("Failed to create temporary path for the _path_for_transaction_db");
64/// let path = tempdir.path();
65/// {
66///     let db: TransactionDB = TransactionDB::open_default(path).unwrap();
67///     db.put(b"my key", b"my value").unwrap();
68///
69///     // create transaction
70///     let txn = db.transaction();
71///     txn.put(b"key2", b"value2");
72///     txn.put(b"key3", b"value3");
73///     txn.commit().unwrap();
74/// }
75/// let _ = DB::destroy(&Options::default(), path);
76/// ```
77///
78/// [`SingleThreaded`]: crate::SingleThreaded
pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
    /// Raw handle to the underlying transaction database, as returned by the
    /// `rocksdb_transactiondb_open*` FFI calls.
    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
    /// Column family bookkeeping, built from the name -> handle map collected at open time.
    /// Its concrete shape depends on the thread mode `T`.
    cfs: T,
    /// Filesystem path the database was opened at.
    path: PathBuf,
    // Prepared 2PC transactions reported by the database when it was opened.
    // They are handed out (and removed from this list) by `prepared_transactions`.
    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
    /// Clones of the `outlive` guards taken from the database options and from every
    /// column family's options at open time; held only so they live as long as `self`.
    _outlive: Vec<OptionsMustOutliveDB>,
}

// NOTE(review): these impls assert that the raw handles stored above may be moved to and
// shared between threads; this presumably relies on the thread-safety of the underlying
// RocksDB TransactionDB — confirm against the RocksDB documentation.
unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
90
91impl<T: ThreadMode> DBAccess for TransactionDB<T> {
92    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
93        ffi::rocksdb_transactiondb_create_snapshot(self.inner)
94    }
95
96    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
97        ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
98    }
99
100    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
101        ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
102    }
103
104    unsafe fn create_iterator_cf(
105        &self,
106        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
107        readopts: &ReadOptions,
108    ) -> *mut ffi::rocksdb_iterator_t {
109        ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
110    }
111
112    fn get_opt<K: AsRef<[u8]>>(
113        &self,
114        key: K,
115        readopts: &ReadOptions,
116    ) -> Result<Option<Vec<u8>>, Error> {
117        self.get_opt(key, readopts)
118    }
119
120    fn get_cf_opt<K: AsRef<[u8]>>(
121        &self,
122        cf: &impl AsColumnFamilyRef,
123        key: K,
124        readopts: &ReadOptions,
125    ) -> Result<Option<Vec<u8>>, Error> {
126        self.get_cf_opt(cf, key, readopts)
127    }
128
129    fn get_pinned_opt<K: AsRef<[u8]>>(
130        &self,
131        key: K,
132        readopts: &ReadOptions,
133    ) -> Result<Option<DBPinnableSlice>, Error> {
134        self.get_pinned_opt(key, readopts)
135    }
136
137    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
138        &self,
139        cf: &impl AsColumnFamilyRef,
140        key: K,
141        readopts: &ReadOptions,
142    ) -> Result<Option<DBPinnableSlice>, Error> {
143        self.get_pinned_cf_opt(cf, key, readopts)
144    }
145
146    fn multi_get_opt<K, I>(
147        &self,
148        keys: I,
149        readopts: &ReadOptions,
150    ) -> Vec<Result<Option<Vec<u8>>, Error>>
151    where
152        K: AsRef<[u8]>,
153        I: IntoIterator<Item = K>,
154    {
155        self.multi_get_opt(keys, readopts)
156    }
157
158    fn multi_get_cf_opt<'b, K, I, W>(
159        &self,
160        keys_cf: I,
161        readopts: &ReadOptions,
162    ) -> Vec<Result<Option<Vec<u8>>, Error>>
163    where
164        K: AsRef<[u8]>,
165        I: IntoIterator<Item = (&'b W, K)>,
166        W: AsColumnFamilyRef + 'b,
167    {
168        self.multi_get_cf_opt(keys_cf, readopts)
169    }
170}
171
172impl<T: ThreadMode> TransactionDB<T> {
173    /// Opens a database with default options.
174    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
175        let mut opts = Options::default();
176        opts.create_if_missing(true);
177        let txn_db_opts = TransactionDBOptions::default();
178        Self::open(&opts, &txn_db_opts, path)
179    }
180
181    /// Opens the database with the specified options.
182    pub fn open<P: AsRef<Path>>(
183        opts: &Options,
184        txn_db_opts: &TransactionDBOptions,
185        path: P,
186    ) -> Result<Self, Error> {
187        Self::open_cf(opts, txn_db_opts, path, None::<&str>)
188    }
189
190    /// Opens a database with the given database options and column family names.
191    ///
192    /// Column families opened using this function will be created with default `Options`.
193    pub fn open_cf<P, I, N>(
194        opts: &Options,
195        txn_db_opts: &TransactionDBOptions,
196        path: P,
197        cfs: I,
198    ) -> Result<Self, Error>
199    where
200        P: AsRef<Path>,
201        I: IntoIterator<Item = N>,
202        N: AsRef<str>,
203    {
204        let cfs = cfs
205            .into_iter()
206            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
207
208        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
209    }
210
211    /// Opens a database with the given database options and column family descriptors.
212    pub fn open_cf_descriptors<P, I>(
213        opts: &Options,
214        txn_db_opts: &TransactionDBOptions,
215        path: P,
216        cfs: I,
217    ) -> Result<Self, Error>
218    where
219        P: AsRef<Path>,
220        I: IntoIterator<Item = ColumnFamilyDescriptor>,
221    {
222        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
223    }
224
225    /// Internal implementation for opening RocksDB.
226    fn open_cf_descriptors_internal<P, I>(
227        opts: &Options,
228        txn_db_opts: &TransactionDBOptions,
229        path: P,
230        cfs: I,
231    ) -> Result<Self, Error>
232    where
233        P: AsRef<Path>,
234        I: IntoIterator<Item = ColumnFamilyDescriptor>,
235    {
236        let cfs: Vec<_> = cfs.into_iter().collect();
237        let outlive = iter::once(opts.outlive.clone())
238            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
239            .collect();
240
241        let cpath = to_cpath(&path)?;
242
243        if let Err(e) = fs::create_dir_all(&path) {
244            return Err(Error::new(format!(
245                "Failed to create RocksDB directory: `{e:?}`."
246            )));
247        }
248
249        let db: *mut ffi::rocksdb_transactiondb_t;
250        let mut cf_map = BTreeMap::new();
251
252        if cfs.is_empty() {
253            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
254        } else {
255            let mut cfs_v = cfs;
256            // Always open the default column family.
257            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
258                cfs_v.push(ColumnFamilyDescriptor {
259                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
260                    options: Options::default(),
261                    ttl: ColumnFamilyTtl::SameAsDb, // it will have ttl specified in `DBWithThreadMode::open_with_ttl`
262                });
263            }
264            // We need to store our CStrings in an intermediate vector
265            // so that their pointers remain valid.
266            let c_cfs: Vec<CString> = cfs_v
267                .iter()
268                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
269                .collect();
270
271            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
272
273            // These handles will be populated by DB.
274            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
275
276            let cfopts: Vec<_> = cfs_v
277                .iter()
278                .map(|cf| cf.options.inner.cast_const())
279                .collect();
280
281            db = Self::open_cf_raw(
282                opts,
283                txn_db_opts,
284                &cpath,
285                &cfs_v,
286                &cfnames,
287                &cfopts,
288                &mut cfhandles,
289            )?;
290
291            for handle in &cfhandles {
292                if handle.is_null() {
293                    return Err(Error::new(
294                        "Received null column family handle from DB.".to_owned(),
295                    ));
296                }
297            }
298
299            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
300                cf_map.insert(cf_desc.name.clone(), inner);
301            }
302        }
303
304        if db.is_null() {
305            return Err(Error::new("Could not initialize database.".to_owned()));
306        }
307
308        let prepared = unsafe {
309            let mut cnt = 0;
310            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
311            let mut vec = vec![std::ptr::null_mut(); cnt];
312            if !ptr.is_null() {
313                std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
314                ffi::rocksdb_free(ptr as *mut c_void);
315            }
316            vec
317        };
318
319        Ok(TransactionDB {
320            inner: db,
321            cfs: T::new_cf_map_internal(cf_map),
322            path: path.as_ref().to_path_buf(),
323            prepared: Mutex::new(prepared),
324            _outlive: outlive,
325        })
326    }
327
328    fn open_raw(
329        opts: &Options,
330        txn_db_opts: &TransactionDBOptions,
331        cpath: &CString,
332    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
333        unsafe {
334            let db = ffi_try!(ffi::rocksdb_transactiondb_open(
335                opts.inner,
336                txn_db_opts.inner,
337                cpath.as_ptr()
338            ));
339            Ok(db)
340        }
341    }
342
343    fn open_cf_raw(
344        opts: &Options,
345        txn_db_opts: &TransactionDBOptions,
346        cpath: &CString,
347        cfs_v: &[ColumnFamilyDescriptor],
348        cfnames: &[*const c_char],
349        cfopts: &[*const ffi::rocksdb_options_t],
350        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
351    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
352        unsafe {
353            let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
354                opts.inner,
355                txn_db_opts.inner,
356                cpath.as_ptr(),
357                cfs_v.len() as c_int,
358                cfnames.as_ptr(),
359                cfopts.as_ptr(),
360                cfhandles.as_mut_ptr(),
361            ));
362            Ok(db)
363        }
364    }
365
366    fn create_inner_cf_handle(
367        &self,
368        name: &str,
369        opts: &Options,
370    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
371        let cf_name = CString::new(name.as_bytes()).map_err(|_| {
372            Error::new("Failed to convert path to CString when creating cf".to_owned())
373        })?;
374
375        Ok(unsafe {
376            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
377                self.inner,
378                opts.inner,
379                cf_name.as_ptr(),
380            ))
381        })
382    }
383
384    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
385        DB::list_cf(opts, path)
386    }
387
388    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
389        DB::destroy(opts, path)
390    }
391
392    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
393        DB::repair(opts, path)
394    }
395
396    pub fn path(&self) -> &Path {
397        self.path.as_path()
398    }
399
400    /// Creates a transaction with default options.
401    pub fn transaction(&self) -> Transaction<Self> {
402        self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
403    }
404
405    /// Creates a transaction with options.
406    pub fn transaction_opt<'a>(
407        &'a self,
408        write_opts: &WriteOptions,
409        txn_opts: &TransactionOptions,
410    ) -> Transaction<'a, Self> {
411        Transaction {
412            inner: unsafe {
413                ffi::rocksdb_transaction_begin(
414                    self.inner,
415                    write_opts.inner,
416                    txn_opts.inner,
417                    std::ptr::null_mut(),
418                )
419            },
420            _marker: PhantomData,
421        }
422    }
423
424    /// Get all prepared transactions for recovery.
425    ///
426    /// This function is expected to call once after open database.
427    /// User should commit or rollback all transactions before start other transactions.
428    pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
429        self.prepared
430            .lock()
431            .unwrap()
432            .drain(0..)
433            .map(|inner| Transaction {
434                inner,
435                _marker: PhantomData,
436            })
437            .collect()
438    }
439
440    /// Returns the bytes associated with a key value.
441    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
442        self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
443    }
444
445    /// Returns the bytes associated with a key value and the given column family.
446    pub fn get_cf<K: AsRef<[u8]>>(
447        &self,
448        cf: &impl AsColumnFamilyRef,
449        key: K,
450    ) -> Result<Option<Vec<u8>>, Error> {
451        self.get_pinned_cf(cf, key)
452            .map(|x| x.map(|v| v.as_ref().to_vec()))
453    }
454
455    /// Returns the bytes associated with a key value with read options.
456    pub fn get_opt<K: AsRef<[u8]>>(
457        &self,
458        key: K,
459        readopts: &ReadOptions,
460    ) -> Result<Option<Vec<u8>>, Error> {
461        self.get_pinned_opt(key, readopts)
462            .map(|x| x.map(|v| v.as_ref().to_vec()))
463    }
464
465    /// Returns the bytes associated with a key value and the given column family with read options.
466    pub fn get_cf_opt<K: AsRef<[u8]>>(
467        &self,
468        cf: &impl AsColumnFamilyRef,
469        key: K,
470        readopts: &ReadOptions,
471    ) -> Result<Option<Vec<u8>>, Error> {
472        self.get_pinned_cf_opt(cf, key, readopts)
473            .map(|x| x.map(|v| v.as_ref().to_vec()))
474    }
475
476    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
477        self.get_pinned_opt(key, &ReadOptions::default())
478    }
479
480    /// Returns the bytes associated with a key value and the given column family.
481    pub fn get_pinned_cf<K: AsRef<[u8]>>(
482        &self,
483        cf: &impl AsColumnFamilyRef,
484        key: K,
485    ) -> Result<Option<DBPinnableSlice>, Error> {
486        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
487    }
488
489    /// Returns the bytes associated with a key value with read options.
490    pub fn get_pinned_opt<K: AsRef<[u8]>>(
491        &self,
492        key: K,
493        readopts: &ReadOptions,
494    ) -> Result<Option<DBPinnableSlice>, Error> {
495        let key = key.as_ref();
496        unsafe {
497            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
498                self.inner,
499                readopts.inner,
500                key.as_ptr() as *const c_char,
501                key.len() as size_t,
502            ));
503            if val.is_null() {
504                Ok(None)
505            } else {
506                Ok(Some(DBPinnableSlice::from_c(val)))
507            }
508        }
509    }
510
511    /// Returns the bytes associated with a key value and the given column family with read options.
512    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
513        &self,
514        cf: &impl AsColumnFamilyRef,
515        key: K,
516        readopts: &ReadOptions,
517    ) -> Result<Option<DBPinnableSlice>, Error> {
518        let key = key.as_ref();
519        unsafe {
520            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
521                self.inner,
522                readopts.inner,
523                cf.inner(),
524                key.as_ptr() as *const c_char,
525                key.len() as size_t,
526            ));
527            if val.is_null() {
528                Ok(None)
529            } else {
530                Ok(Some(DBPinnableSlice::from_c(val)))
531            }
532        }
533    }
534
535    /// Return the values associated with the given keys.
536    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
537    where
538        K: AsRef<[u8]>,
539        I: IntoIterator<Item = K>,
540    {
541        self.multi_get_opt(keys, &ReadOptions::default())
542    }
543
544    /// Return the values associated with the given keys using read options.
545    pub fn multi_get_opt<K, I>(
546        &self,
547        keys: I,
548        readopts: &ReadOptions,
549    ) -> Vec<Result<Option<Vec<u8>>, Error>>
550    where
551        K: AsRef<[u8]>,
552        I: IntoIterator<Item = K>,
553    {
554        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
555            .into_iter()
556            .map(|key| {
557                let key = key.as_ref();
558                (Box::from(key), key.len())
559            })
560            .unzip();
561        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
562
563        let mut values = vec![ptr::null_mut(); keys.len()];
564        let mut values_sizes = vec![0_usize; keys.len()];
565        let mut errors = vec![ptr::null_mut(); keys.len()];
566        unsafe {
567            ffi::rocksdb_transactiondb_multi_get(
568                self.inner,
569                readopts.inner,
570                ptr_keys.len(),
571                ptr_keys.as_ptr(),
572                keys_sizes.as_ptr(),
573                values.as_mut_ptr(),
574                values_sizes.as_mut_ptr(),
575                errors.as_mut_ptr(),
576            );
577        }
578
579        convert_values(values, values_sizes, errors)
580    }
581
582    /// Return the values associated with the given keys and column families.
583    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
584        &'a self,
585        keys: I,
586    ) -> Vec<Result<Option<Vec<u8>>, Error>>
587    where
588        K: AsRef<[u8]>,
589        I: IntoIterator<Item = (&'b W, K)>,
590        W: 'b + AsColumnFamilyRef,
591    {
592        self.multi_get_cf_opt(keys, &ReadOptions::default())
593    }
594
595    /// Return the values associated with the given keys and column families using read options.
596    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
597        &'a self,
598        keys: I,
599        readopts: &ReadOptions,
600    ) -> Vec<Result<Option<Vec<u8>>, Error>>
601    where
602        K: AsRef<[u8]>,
603        I: IntoIterator<Item = (&'b W, K)>,
604        W: 'b + AsColumnFamilyRef,
605    {
606        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
607            .into_iter()
608            .map(|(cf, key)| {
609                let key = key.as_ref();
610                ((cf, Box::from(key)), key.len())
611            })
612            .unzip();
613        let ptr_keys: Vec<_> = cfs_and_keys
614            .iter()
615            .map(|(_, k)| k.as_ptr() as *const c_char)
616            .collect();
617        let ptr_cfs: Vec<_> = cfs_and_keys
618            .iter()
619            .map(|(c, _)| c.inner().cast_const())
620            .collect();
621
622        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
623        let mut values_sizes = vec![0_usize; ptr_keys.len()];
624        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
625        unsafe {
626            ffi::rocksdb_transactiondb_multi_get_cf(
627                self.inner,
628                readopts.inner,
629                ptr_cfs.as_ptr(),
630                ptr_keys.len(),
631                ptr_keys.as_ptr(),
632                keys_sizes.as_ptr(),
633                values.as_mut_ptr(),
634                values_sizes.as_mut_ptr(),
635                errors.as_mut_ptr(),
636            );
637        }
638
639        convert_values(values, values_sizes, errors)
640    }
641
642    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
643    where
644        K: AsRef<[u8]>,
645        V: AsRef<[u8]>,
646    {
647        self.put_opt(key, value, &WriteOptions::default())
648    }
649
650    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
651    where
652        K: AsRef<[u8]>,
653        V: AsRef<[u8]>,
654    {
655        self.put_cf_opt(cf, key, value, &WriteOptions::default())
656    }
657
658    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
659    where
660        K: AsRef<[u8]>,
661        V: AsRef<[u8]>,
662    {
663        let key = key.as_ref();
664        let value = value.as_ref();
665        unsafe {
666            ffi_try!(ffi::rocksdb_transactiondb_put(
667                self.inner,
668                writeopts.inner,
669                key.as_ptr() as *const c_char,
670                key.len() as size_t,
671                value.as_ptr() as *const c_char,
672                value.len() as size_t
673            ));
674        }
675        Ok(())
676    }
677
678    pub fn put_cf_opt<K, V>(
679        &self,
680        cf: &impl AsColumnFamilyRef,
681        key: K,
682        value: V,
683        writeopts: &WriteOptions,
684    ) -> Result<(), Error>
685    where
686        K: AsRef<[u8]>,
687        V: AsRef<[u8]>,
688    {
689        let key = key.as_ref();
690        let value = value.as_ref();
691        unsafe {
692            ffi_try!(ffi::rocksdb_transactiondb_put_cf(
693                self.inner,
694                writeopts.inner,
695                cf.inner(),
696                key.as_ptr() as *const c_char,
697                key.len() as size_t,
698                value.as_ptr() as *const c_char,
699                value.len() as size_t
700            ));
701        }
702        Ok(())
703    }
704
705    pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
706        self.write_opt(batch, &WriteOptions::default())
707    }
708
709    pub fn write_opt(
710        &self,
711        batch: WriteBatchWithTransaction<true>,
712        writeopts: &WriteOptions,
713    ) -> Result<(), Error> {
714        unsafe {
715            ffi_try!(ffi::rocksdb_transactiondb_write(
716                self.inner,
717                writeopts.inner,
718                batch.inner
719            ));
720        }
721        Ok(())
722    }
723
724    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
725    where
726        K: AsRef<[u8]>,
727        V: AsRef<[u8]>,
728    {
729        self.merge_opt(key, value, &WriteOptions::default())
730    }
731
732    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
733    where
734        K: AsRef<[u8]>,
735        V: AsRef<[u8]>,
736    {
737        self.merge_cf_opt(cf, key, value, &WriteOptions::default())
738    }
739
740    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
741    where
742        K: AsRef<[u8]>,
743        V: AsRef<[u8]>,
744    {
745        let key = key.as_ref();
746        let value = value.as_ref();
747        unsafe {
748            ffi_try!(ffi::rocksdb_transactiondb_merge(
749                self.inner,
750                writeopts.inner,
751                key.as_ptr() as *const c_char,
752                key.len() as size_t,
753                value.as_ptr() as *const c_char,
754                value.len() as size_t,
755            ));
756            Ok(())
757        }
758    }
759
760    pub fn merge_cf_opt<K, V>(
761        &self,
762        cf: &impl AsColumnFamilyRef,
763        key: K,
764        value: V,
765        writeopts: &WriteOptions,
766    ) -> Result<(), Error>
767    where
768        K: AsRef<[u8]>,
769        V: AsRef<[u8]>,
770    {
771        let key = key.as_ref();
772        let value = value.as_ref();
773        unsafe {
774            ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
775                self.inner,
776                writeopts.inner,
777                cf.inner(),
778                key.as_ptr() as *const c_char,
779                key.len() as size_t,
780                value.as_ptr() as *const c_char,
781                value.len() as size_t,
782            ));
783            Ok(())
784        }
785    }
786
787    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
788        self.delete_opt(key, &WriteOptions::default())
789    }
790
791    pub fn delete_cf<K: AsRef<[u8]>>(
792        &self,
793        cf: &impl AsColumnFamilyRef,
794        key: K,
795    ) -> Result<(), Error> {
796        self.delete_cf_opt(cf, key, &WriteOptions::default())
797    }
798
799    pub fn delete_opt<K: AsRef<[u8]>>(
800        &self,
801        key: K,
802        writeopts: &WriteOptions,
803    ) -> Result<(), Error> {
804        let key = key.as_ref();
805        unsafe {
806            ffi_try!(ffi::rocksdb_transactiondb_delete(
807                self.inner,
808                writeopts.inner,
809                key.as_ptr() as *const c_char,
810                key.len() as size_t,
811            ));
812        }
813        Ok(())
814    }
815
816    pub fn delete_cf_opt<K: AsRef<[u8]>>(
817        &self,
818        cf: &impl AsColumnFamilyRef,
819        key: K,
820        writeopts: &WriteOptions,
821    ) -> Result<(), Error> {
822        let key = key.as_ref();
823        unsafe {
824            ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
825                self.inner,
826                writeopts.inner,
827                cf.inner(),
828                key.as_ptr() as *const c_char,
829                key.len() as size_t,
830            ));
831        }
832        Ok(())
833    }
834
835    pub fn iterator<'a: 'b, 'b>(
836        &'a self,
837        mode: IteratorMode,
838    ) -> DBIteratorWithThreadMode<'b, Self> {
839        let readopts = ReadOptions::default();
840        self.iterator_opt(mode, readopts)
841    }
842
843    pub fn iterator_opt<'a: 'b, 'b>(
844        &'a self,
845        mode: IteratorMode,
846        readopts: ReadOptions,
847    ) -> DBIteratorWithThreadMode<'b, Self> {
848        DBIteratorWithThreadMode::new(self, readopts, mode)
849    }
850
851    /// Opens an iterator using the provided ReadOptions.
852    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
853    pub fn iterator_cf_opt<'a: 'b, 'b>(
854        &'a self,
855        cf_handle: &impl AsColumnFamilyRef,
856        readopts: ReadOptions,
857        mode: IteratorMode,
858    ) -> DBIteratorWithThreadMode<'b, Self> {
859        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
860    }
861
862    /// Opens an iterator with `set_total_order_seek` enabled.
863    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
864    /// with a Hash-based implementation.
865    pub fn full_iterator<'a: 'b, 'b>(
866        &'a self,
867        mode: IteratorMode,
868    ) -> DBIteratorWithThreadMode<'b, Self> {
869        let mut opts = ReadOptions::default();
870        opts.set_total_order_seek(true);
871        DBIteratorWithThreadMode::new(self, opts, mode)
872    }
873
874    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
875        &'a self,
876        prefix: P,
877    ) -> DBIteratorWithThreadMode<'b, Self> {
878        let mut opts = ReadOptions::default();
879        opts.set_prefix_same_as_start(true);
880        DBIteratorWithThreadMode::new(
881            self,
882            opts,
883            IteratorMode::From(prefix.as_ref(), Direction::Forward),
884        )
885    }
886
887    pub fn iterator_cf<'a: 'b, 'b>(
888        &'a self,
889        cf_handle: &impl AsColumnFamilyRef,
890        mode: IteratorMode,
891    ) -> DBIteratorWithThreadMode<'b, Self> {
892        let opts = ReadOptions::default();
893        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
894    }
895
896    pub fn full_iterator_cf<'a: 'b, 'b>(
897        &'a self,
898        cf_handle: &impl AsColumnFamilyRef,
899        mode: IteratorMode,
900    ) -> DBIteratorWithThreadMode<'b, Self> {
901        let mut opts = ReadOptions::default();
902        opts.set_total_order_seek(true);
903        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
904    }
905
906    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
907        &'a self,
908        cf_handle: &impl AsColumnFamilyRef,
909        prefix: P,
910    ) -> DBIteratorWithThreadMode<'a, Self> {
911        let mut opts = ReadOptions::default();
912        opts.set_prefix_same_as_start(true);
913        DBIteratorWithThreadMode::<'a, Self>::new_cf(
914            self,
915            cf_handle.inner(),
916            opts,
917            IteratorMode::From(prefix.as_ref(), Direction::Forward),
918        )
919    }
920
921    /// Opens a raw iterator over the database, using the default read options
922    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
923        let opts = ReadOptions::default();
924        DBRawIteratorWithThreadMode::new(self, opts)
925    }
926
927    /// Opens a raw iterator over the given column family, using the default read options
928    pub fn raw_iterator_cf<'a: 'b, 'b>(
929        &'a self,
930        cf_handle: &impl AsColumnFamilyRef,
931    ) -> DBRawIteratorWithThreadMode<'b, Self> {
932        let opts = ReadOptions::default();
933        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
934    }
935
936    /// Opens a raw iterator over the database, using the given read options
937    pub fn raw_iterator_opt<'a: 'b, 'b>(
938        &'a self,
939        readopts: ReadOptions,
940    ) -> DBRawIteratorWithThreadMode<'b, Self> {
941        DBRawIteratorWithThreadMode::new(self, readopts)
942    }
943
944    /// Opens a raw iterator over the given column family, using the given read options
945    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
946        &'a self,
947        cf_handle: &impl AsColumnFamilyRef,
948        readopts: ReadOptions,
949    ) -> DBRawIteratorWithThreadMode<'b, Self> {
950        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
951    }
952
953    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
954        SnapshotWithThreadMode::<Self>::new(self)
955    }
956
957    fn drop_column_family<C>(
958        &self,
959        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
960        _cf: C,
961    ) -> Result<(), Error> {
962        unsafe {
963            // first mark the column family as dropped
964            ffi_try!(ffi::rocksdb_drop_column_family(
965                self.inner as *mut ffi::rocksdb_t,
966                cf_inner
967            ));
968        }
969        // Since `_cf` is dropped here, the column family handle is destroyed
970        // and any resources (mem, files) are reclaimed.
971        Ok(())
972    }
973}
974
975impl TransactionDB<SingleThreaded> {
976    /// Creates column family with given name and options.
977    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
978        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
979        self.cfs
980            .cfs
981            .insert(name.as_ref().to_string(), ColumnFamily { inner });
982        Ok(())
983    }
984
985    /// Returns the underlying column family handle.
986    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
987        self.cfs.cfs.get(name)
988    }
989
990    /// Drops the column family with the given name
991    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
992        if let Some(cf) = self.cfs.cfs.remove(name) {
993            self.drop_column_family(cf.inner, cf)
994        } else {
995            Err(Error::new(format!("Invalid column family: {name}")))
996        }
997    }
998}
999
1000impl TransactionDB<MultiThreaded> {
1001    /// Creates column family with given name and options.
1002    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1003        // Note that we acquire the cfs lock before inserting: otherwise we might race
1004        // another caller who observed the handle as missing.
1005        let mut cfs = self.cfs.cfs.write().unwrap();
1006        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1007        cfs.insert(
1008            name.as_ref().to_string(),
1009            Arc::new(UnboundColumnFamily { inner }),
1010        );
1011        Ok(())
1012    }
1013
1014    /// Returns the underlying column family handle.
1015    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
1016        self.cfs
1017            .cfs
1018            .read()
1019            .unwrap()
1020            .get(name)
1021            .cloned()
1022            .map(UnboundColumnFamily::bound_column_family)
1023    }
1024
1025    /// Drops the column family with the given name by internally locking the inner column
1026    /// family map. This avoids needing `&mut self` reference
1027    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1028        if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
1029            self.drop_column_family(cf.inner, cf)
1030        } else {
1031            Err(Error::new(format!("Invalid column family: {name}")))
1032        }
1033    }
1034
1035    /// Implementation for property_value et al methods.
1036    ///
1037    /// `name` is the name of the property.  It will be converted into a CString
1038    /// and passed to `get_property` as argument.  `get_property` reads the
1039    /// specified property and either returns NULL or a pointer to a C allocated
1040    /// string; this method takes ownership of that string and will free it at
1041    /// the end. That string is parsed using `parse` callback which produces
1042    /// the returned result.
1043    fn property_value_impl<R>(
1044        name: impl CStrLike,
1045        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1046        parse: impl FnOnce(&str) -> Result<R, Error>,
1047    ) -> Result<Option<R>, Error> {
1048        let value = match name.bake() {
1049            Ok(prop_name) => get_property(prop_name.as_ptr()),
1050            Err(e) => {
1051                return Err(Error::new(format!(
1052                    "Failed to convert property name to CString: {e}"
1053                )));
1054            }
1055        };
1056        if value.is_null() {
1057            return Ok(None);
1058        }
1059        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1060            Ok(s) => parse(s).map(|value| Some(value)),
1061            Err(e) => Err(Error::new(format!(
1062                "Failed to convert property value to string: {e}"
1063            ))),
1064        };
1065        unsafe {
1066            ffi::rocksdb_free(value as *mut c_void);
1067        }
1068        result
1069    }
1070
1071    /// Retrieves a RocksDB property by name.
1072    ///
1073    /// Full list of properties could be find
1074    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1075    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1076        Self::property_value_impl(
1077            name,
1078            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1079            |str_value| Ok(str_value.to_owned()),
1080        )
1081    }
1082
1083    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1084        value.parse::<u64>().map_err(|err| {
1085            Error::new(format!(
1086                "Failed to convert property value {value} to int: {err}"
1087            ))
1088        })
1089    }
1090
1091    /// Retrieves a RocksDB property and casts it to an integer.
1092    ///
1093    /// Full list of properties that return int values could be find
1094    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1095    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1096        Self::property_value_impl(
1097            name,
1098            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1099            Self::parse_property_int_value,
1100        )
1101    }
1102}
1103
// Tears down a `TransactionDB` when it goes out of scope. The three steps below run in
// a fixed order and the database itself is closed last.
impl<T: ThreadMode> Drop for TransactionDB<T> {
    fn drop(&mut self) {
        unsafe {
            // NOTE(review): presumably this releases any still-prepared transactions
            // before the database goes away; confirm against `prepared_transactions`.
            self.prepared_transactions().clear();
            // Column family handles are dropped before the database is closed.
            self.cfs.drop_all_cfs_internal();
            // Finally close the underlying RocksDB transaction database.
            ffi::rocksdb_transactiondb_close(self.inner);
        }
    }
}