rocksdb/transactions/
transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{
17    collections::BTreeMap,
18    ffi::CString,
19    fs, iter,
20    marker::PhantomData,
21    path::{Path, PathBuf},
22    ptr,
23    sync::{Arc, Mutex},
24};
25
26use crate::{
27    column_family::UnboundColumnFamily,
28    db::{convert_values, DBAccess},
29    db_options::OptionsMustOutliveDB,
30    ffi,
31    ffi_util::to_cpath,
32    AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
33    DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
34    IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
35    ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
36    WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
37};
38use ffi::rocksdb_transaction_t;
39use libc::{c_char, c_int, c_void, size_t};
40
41#[cfg(not(feature = "multi-threaded-cf"))]
42type DefaultThreadMode = crate::SingleThreaded;
43#[cfg(feature = "multi-threaded-cf")]
44type DefaultThreadMode = crate::MultiThreaded;
45
46/// RocksDB TransactionDB.
47///
48/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
49/// to learn more about RocksDB TransactionDB.
50///
51/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
52/// if feature `multi-threaded-cf` is not enabled.
53///
54/// ```
55/// use rocksdb::{DB, Options, TransactionDB, SingleThreaded};
56/// let path = "_path_for_transaction_db";
57/// {
58///     let db: TransactionDB = TransactionDB::open_default(path).unwrap();
59///     db.put(b"my key", b"my value").unwrap();
60///
61///     // create transaction
62///     let txn = db.transaction();
63///     txn.put(b"key2", b"value2");
64///     txn.put(b"key3", b"value3");
65///     txn.commit().unwrap();
66/// }
67/// let _ = DB::destroy(&Options::default(), path);
68/// ```
69///
70/// [`SingleThreaded`]: crate::SingleThreaded
71pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
72    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
73    cfs: T,
74    path: PathBuf,
75    // prepared 2pc transactions.
76    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
77    _outlive: Vec<OptionsMustOutliveDB>,
78}
79
80unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
81unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
82
83impl<T: ThreadMode> DBAccess for TransactionDB<T> {
84    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
85        ffi::rocksdb_transactiondb_create_snapshot(self.inner)
86    }
87
88    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
89        ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
90    }
91
92    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
93        ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
94    }
95
96    unsafe fn create_iterator_cf(
97        &self,
98        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
99        readopts: &ReadOptions,
100    ) -> *mut ffi::rocksdb_iterator_t {
101        ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
102    }
103
104    fn get_opt<K: AsRef<[u8]>>(
105        &self,
106        key: K,
107        readopts: &ReadOptions,
108    ) -> Result<Option<Vec<u8>>, Error> {
109        self.get_opt(key, readopts)
110    }
111
112    fn get_cf_opt<K: AsRef<[u8]>>(
113        &self,
114        cf: &impl AsColumnFamilyRef,
115        key: K,
116        readopts: &ReadOptions,
117    ) -> Result<Option<Vec<u8>>, Error> {
118        self.get_cf_opt(cf, key, readopts)
119    }
120
121    fn get_pinned_opt<K: AsRef<[u8]>>(
122        &self,
123        key: K,
124        readopts: &ReadOptions,
125    ) -> Result<Option<DBPinnableSlice>, Error> {
126        self.get_pinned_opt(key, readopts)
127    }
128
129    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
130        &self,
131        cf: &impl AsColumnFamilyRef,
132        key: K,
133        readopts: &ReadOptions,
134    ) -> Result<Option<DBPinnableSlice>, Error> {
135        self.get_pinned_cf_opt(cf, key, readopts)
136    }
137
138    fn multi_get_opt<K, I>(
139        &self,
140        keys: I,
141        readopts: &ReadOptions,
142    ) -> Vec<Result<Option<Vec<u8>>, Error>>
143    where
144        K: AsRef<[u8]>,
145        I: IntoIterator<Item = K>,
146    {
147        self.multi_get_opt(keys, readopts)
148    }
149
150    fn multi_get_cf_opt<'b, K, I, W>(
151        &self,
152        keys_cf: I,
153        readopts: &ReadOptions,
154    ) -> Vec<Result<Option<Vec<u8>>, Error>>
155    where
156        K: AsRef<[u8]>,
157        I: IntoIterator<Item = (&'b W, K)>,
158        W: AsColumnFamilyRef + 'b,
159    {
160        self.multi_get_cf_opt(keys_cf, readopts)
161    }
162}
163
164impl<T: ThreadMode> TransactionDB<T> {
165    /// Opens a database with default options.
166    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
167        let mut opts = Options::default();
168        opts.create_if_missing(true);
169        let txn_db_opts = TransactionDBOptions::default();
170        Self::open(&opts, &txn_db_opts, path)
171    }
172
173    /// Opens the database with the specified options.
174    pub fn open<P: AsRef<Path>>(
175        opts: &Options,
176        txn_db_opts: &TransactionDBOptions,
177        path: P,
178    ) -> Result<Self, Error> {
179        Self::open_cf(opts, txn_db_opts, path, None::<&str>)
180    }
181
182    /// Opens a database with the given database options and column family names.
183    ///
184    /// Column families opened using this function will be created with default `Options`.
185    pub fn open_cf<P, I, N>(
186        opts: &Options,
187        txn_db_opts: &TransactionDBOptions,
188        path: P,
189        cfs: I,
190    ) -> Result<Self, Error>
191    where
192        P: AsRef<Path>,
193        I: IntoIterator<Item = N>,
194        N: AsRef<str>,
195    {
196        let cfs = cfs
197            .into_iter()
198            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
199
200        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
201    }
202
203    /// Opens a database with the given database options and column family descriptors.
204    pub fn open_cf_descriptors<P, I>(
205        opts: &Options,
206        txn_db_opts: &TransactionDBOptions,
207        path: P,
208        cfs: I,
209    ) -> Result<Self, Error>
210    where
211        P: AsRef<Path>,
212        I: IntoIterator<Item = ColumnFamilyDescriptor>,
213    {
214        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
215    }
216
217    /// Internal implementation for opening RocksDB.
218    fn open_cf_descriptors_internal<P, I>(
219        opts: &Options,
220        txn_db_opts: &TransactionDBOptions,
221        path: P,
222        cfs: I,
223    ) -> Result<Self, Error>
224    where
225        P: AsRef<Path>,
226        I: IntoIterator<Item = ColumnFamilyDescriptor>,
227    {
228        let cfs: Vec<_> = cfs.into_iter().collect();
229        let outlive = iter::once(opts.outlive.clone())
230            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
231            .collect();
232
233        let cpath = to_cpath(&path)?;
234
235        if let Err(e) = fs::create_dir_all(&path) {
236            return Err(Error::new(format!(
237                "Failed to create RocksDB directory: `{e:?}`."
238            )));
239        }
240
241        let db: *mut ffi::rocksdb_transactiondb_t;
242        let mut cf_map = BTreeMap::new();
243
244        if cfs.is_empty() {
245            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
246        } else {
247            let mut cfs_v = cfs;
248            // Always open the default column family.
249            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
250                cfs_v.push(ColumnFamilyDescriptor {
251                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
252                    options: Options::default(),
253                });
254            }
255            // We need to store our CStrings in an intermediate vector
256            // so that their pointers remain valid.
257            let c_cfs: Vec<CString> = cfs_v
258                .iter()
259                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
260                .collect();
261
262            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
263
264            // These handles will be populated by DB.
265            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
266
267            let cfopts: Vec<_> = cfs_v
268                .iter()
269                .map(|cf| cf.options.inner as *const _)
270                .collect();
271
272            db = Self::open_cf_raw(
273                opts,
274                txn_db_opts,
275                &cpath,
276                &cfs_v,
277                &cfnames,
278                &cfopts,
279                &mut cfhandles,
280            )?;
281
282            for handle in &cfhandles {
283                if handle.is_null() {
284                    return Err(Error::new(
285                        "Received null column family handle from DB.".to_owned(),
286                    ));
287                }
288            }
289
290            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
291                cf_map.insert(cf_desc.name.clone(), inner);
292            }
293        }
294
295        if db.is_null() {
296            return Err(Error::new("Could not initialize database.".to_owned()));
297        }
298
299        let prepared = unsafe {
300            let mut cnt = 0;
301            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
302            let mut vec = vec![std::ptr::null_mut(); cnt];
303            std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
304            if !ptr.is_null() {
305                ffi::rocksdb_free(ptr as *mut c_void);
306            }
307            vec
308        };
309
310        Ok(TransactionDB {
311            inner: db,
312            cfs: T::new_cf_map_internal(cf_map),
313            path: path.as_ref().to_path_buf(),
314            prepared: Mutex::new(prepared),
315            _outlive: outlive,
316        })
317    }
318
319    fn open_raw(
320        opts: &Options,
321        txn_db_opts: &TransactionDBOptions,
322        cpath: &CString,
323    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
324        unsafe {
325            let db = ffi_try!(ffi::rocksdb_transactiondb_open(
326                opts.inner,
327                txn_db_opts.inner,
328                cpath.as_ptr()
329            ));
330            Ok(db)
331        }
332    }
333
334    fn open_cf_raw(
335        opts: &Options,
336        txn_db_opts: &TransactionDBOptions,
337        cpath: &CString,
338        cfs_v: &[ColumnFamilyDescriptor],
339        cfnames: &[*const c_char],
340        cfopts: &[*const ffi::rocksdb_options_t],
341        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
342    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
343        unsafe {
344            let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
345                opts.inner,
346                txn_db_opts.inner,
347                cpath.as_ptr(),
348                cfs_v.len() as c_int,
349                cfnames.as_ptr(),
350                cfopts.as_ptr(),
351                cfhandles.as_mut_ptr(),
352            ));
353            Ok(db)
354        }
355    }
356
357    fn create_inner_cf_handle(
358        &self,
359        name: &str,
360        opts: &Options,
361    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
362        let cf_name = CString::new(name.as_bytes()).map_err(|_| {
363            Error::new("Failed to convert path to CString when creating cf".to_owned())
364        })?;
365
366        Ok(unsafe {
367            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
368                self.inner,
369                opts.inner,
370                cf_name.as_ptr(),
371            ))
372        })
373    }
374
375    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
376        DB::list_cf(opts, path)
377    }
378
379    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
380        DB::destroy(opts, path)
381    }
382
383    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
384        DB::repair(opts, path)
385    }
386
387    pub fn path(&self) -> &Path {
388        self.path.as_path()
389    }
390
391    /// Creates a transaction with default options.
392    pub fn transaction(&self) -> Transaction<Self> {
393        self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
394    }
395
396    /// Creates a transaction with options.
397    pub fn transaction_opt<'a>(
398        &'a self,
399        write_opts: &WriteOptions,
400        txn_opts: &TransactionOptions,
401    ) -> Transaction<'a, Self> {
402        Transaction {
403            inner: unsafe {
404                ffi::rocksdb_transaction_begin(
405                    self.inner,
406                    write_opts.inner,
407                    txn_opts.inner,
408                    std::ptr::null_mut(),
409                )
410            },
411            _marker: PhantomData::default(),
412        }
413    }
414
415    /// Get all prepared transactions for recovery.
416    ///
417    /// This function is expected to call once after open database.
418    /// User should commit or rollback all transactions before start other transactions.
419    pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
420        self.prepared
421            .lock()
422            .unwrap()
423            .drain(0..)
424            .map(|inner| Transaction {
425                inner,
426                _marker: PhantomData::default(),
427            })
428            .collect()
429    }
430
431    /// Returns the bytes associated with a key value.
432    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
433        self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
434    }
435
436    /// Returns the bytes associated with a key value and the given column family.
437    pub fn get_cf<K: AsRef<[u8]>>(
438        &self,
439        cf: &impl AsColumnFamilyRef,
440        key: K,
441    ) -> Result<Option<Vec<u8>>, Error> {
442        self.get_pinned_cf(cf, key)
443            .map(|x| x.map(|v| v.as_ref().to_vec()))
444    }
445
446    /// Returns the bytes associated with a key value with read options.
447    pub fn get_opt<K: AsRef<[u8]>>(
448        &self,
449        key: K,
450        readopts: &ReadOptions,
451    ) -> Result<Option<Vec<u8>>, Error> {
452        self.get_pinned_opt(key, readopts)
453            .map(|x| x.map(|v| v.as_ref().to_vec()))
454    }
455
456    /// Returns the bytes associated with a key value and the given column family with read options.
457    pub fn get_cf_opt<K: AsRef<[u8]>>(
458        &self,
459        cf: &impl AsColumnFamilyRef,
460        key: K,
461        readopts: &ReadOptions,
462    ) -> Result<Option<Vec<u8>>, Error> {
463        self.get_pinned_cf_opt(cf, key, readopts)
464            .map(|x| x.map(|v| v.as_ref().to_vec()))
465    }
466
467    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
468        self.get_pinned_opt(key, &ReadOptions::default())
469    }
470
471    /// Returns the bytes associated with a key value and the given column family.
472    pub fn get_pinned_cf<K: AsRef<[u8]>>(
473        &self,
474        cf: &impl AsColumnFamilyRef,
475        key: K,
476    ) -> Result<Option<DBPinnableSlice>, Error> {
477        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
478    }
479
480    /// Returns the bytes associated with a key value with read options.
481    pub fn get_pinned_opt<K: AsRef<[u8]>>(
482        &self,
483        key: K,
484        readopts: &ReadOptions,
485    ) -> Result<Option<DBPinnableSlice>, Error> {
486        let key = key.as_ref();
487        unsafe {
488            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
489                self.inner,
490                readopts.inner,
491                key.as_ptr() as *const c_char,
492                key.len() as size_t,
493            ));
494            if val.is_null() {
495                Ok(None)
496            } else {
497                Ok(Some(DBPinnableSlice::from_c(val)))
498            }
499        }
500    }
501
502    /// Returns the bytes associated with a key value and the given column family with read options.
503    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
504        &self,
505        cf: &impl AsColumnFamilyRef,
506        key: K,
507        readopts: &ReadOptions,
508    ) -> Result<Option<DBPinnableSlice>, Error> {
509        unsafe {
510            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
511                self.inner,
512                readopts.inner,
513                cf.inner(),
514                key.as_ref().as_ptr() as *const c_char,
515                key.as_ref().len() as size_t,
516            ));
517            if val.is_null() {
518                Ok(None)
519            } else {
520                Ok(Some(DBPinnableSlice::from_c(val)))
521            }
522        }
523    }
524
525    /// Return the values associated with the given keys.
526    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
527    where
528        K: AsRef<[u8]>,
529        I: IntoIterator<Item = K>,
530    {
531        self.multi_get_opt(keys, &ReadOptions::default())
532    }
533
534    /// Return the values associated with the given keys using read options.
535    pub fn multi_get_opt<K, I>(
536        &self,
537        keys: I,
538        readopts: &ReadOptions,
539    ) -> Vec<Result<Option<Vec<u8>>, Error>>
540    where
541        K: AsRef<[u8]>,
542        I: IntoIterator<Item = K>,
543    {
544        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
545            .into_iter()
546            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
547            .unzip();
548        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
549
550        let mut values = vec![ptr::null_mut(); keys.len()];
551        let mut values_sizes = vec![0_usize; keys.len()];
552        let mut errors = vec![ptr::null_mut(); keys.len()];
553        unsafe {
554            ffi::rocksdb_transactiondb_multi_get(
555                self.inner,
556                readopts.inner,
557                ptr_keys.len(),
558                ptr_keys.as_ptr(),
559                keys_sizes.as_ptr(),
560                values.as_mut_ptr(),
561                values_sizes.as_mut_ptr(),
562                errors.as_mut_ptr(),
563            );
564        }
565
566        convert_values(values, values_sizes, errors)
567    }
568
569    /// Return the values associated with the given keys and column families.
570    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
571        &'a self,
572        keys: I,
573    ) -> Vec<Result<Option<Vec<u8>>, Error>>
574    where
575        K: AsRef<[u8]>,
576        I: IntoIterator<Item = (&'b W, K)>,
577        W: 'b + AsColumnFamilyRef,
578    {
579        self.multi_get_cf_opt(keys, &ReadOptions::default())
580    }
581
582    /// Return the values associated with the given keys and column families using read options.
583    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
584        &'a self,
585        keys: I,
586        readopts: &ReadOptions,
587    ) -> Vec<Result<Option<Vec<u8>>, Error>>
588    where
589        K: AsRef<[u8]>,
590        I: IntoIterator<Item = (&'b W, K)>,
591        W: 'b + AsColumnFamilyRef,
592    {
593        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
594            .into_iter()
595            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
596            .unzip();
597        let ptr_keys: Vec<_> = cfs_and_keys
598            .iter()
599            .map(|(_, k)| k.as_ptr() as *const c_char)
600            .collect();
601        let ptr_cfs: Vec<_> = cfs_and_keys
602            .iter()
603            .map(|(c, _)| c.inner() as *const _)
604            .collect();
605
606        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
607        let mut values_sizes = vec![0_usize; ptr_keys.len()];
608        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
609        unsafe {
610            ffi::rocksdb_transactiondb_multi_get_cf(
611                self.inner,
612                readopts.inner,
613                ptr_cfs.as_ptr(),
614                ptr_keys.len(),
615                ptr_keys.as_ptr(),
616                keys_sizes.as_ptr(),
617                values.as_mut_ptr(),
618                values_sizes.as_mut_ptr(),
619                errors.as_mut_ptr(),
620            );
621        }
622
623        convert_values(values, values_sizes, errors)
624    }
625
626    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
627    where
628        K: AsRef<[u8]>,
629        V: AsRef<[u8]>,
630    {
631        self.put_opt(key, value, &WriteOptions::default())
632    }
633
634    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
635    where
636        K: AsRef<[u8]>,
637        V: AsRef<[u8]>,
638    {
639        self.put_cf_opt(cf, key, value, &WriteOptions::default())
640    }
641
642    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
643    where
644        K: AsRef<[u8]>,
645        V: AsRef<[u8]>,
646    {
647        unsafe {
648            ffi_try!(ffi::rocksdb_transactiondb_put(
649                self.inner,
650                writeopts.inner,
651                key.as_ref().as_ptr() as *const c_char,
652                key.as_ref().len() as size_t,
653                value.as_ref().as_ptr() as *const c_char,
654                value.as_ref().len() as size_t
655            ));
656        }
657        Ok(())
658    }
659
660    pub fn put_cf_opt<K, V>(
661        &self,
662        cf: &impl AsColumnFamilyRef,
663        key: K,
664        value: V,
665        writeopts: &WriteOptions,
666    ) -> Result<(), Error>
667    where
668        K: AsRef<[u8]>,
669        V: AsRef<[u8]>,
670    {
671        unsafe {
672            ffi_try!(ffi::rocksdb_transactiondb_put_cf(
673                self.inner,
674                writeopts.inner,
675                cf.inner(),
676                key.as_ref().as_ptr() as *const c_char,
677                key.as_ref().len() as size_t,
678                value.as_ref().as_ptr() as *const c_char,
679                value.as_ref().len() as size_t
680            ));
681        }
682        Ok(())
683    }
684
685    pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
686        self.write_opt(batch, &WriteOptions::default())
687    }
688
689    pub fn write_opt(
690        &self,
691        batch: WriteBatchWithTransaction<true>,
692        writeopts: &WriteOptions,
693    ) -> Result<(), Error> {
694        unsafe {
695            ffi_try!(ffi::rocksdb_transactiondb_write(
696                self.inner,
697                writeopts.inner,
698                batch.inner
699            ));
700        }
701        Ok(())
702    }
703
704    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
705    where
706        K: AsRef<[u8]>,
707        V: AsRef<[u8]>,
708    {
709        self.merge_opt(key, value, &WriteOptions::default())
710    }
711
712    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
713    where
714        K: AsRef<[u8]>,
715        V: AsRef<[u8]>,
716    {
717        self.merge_cf_opt(cf, key, value, &WriteOptions::default())
718    }
719
720    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
721    where
722        K: AsRef<[u8]>,
723        V: AsRef<[u8]>,
724    {
725        unsafe {
726            ffi_try!(ffi::rocksdb_transactiondb_merge(
727                self.inner,
728                writeopts.inner,
729                key.as_ref().as_ptr() as *const c_char,
730                key.as_ref().len() as size_t,
731                value.as_ref().as_ptr() as *const c_char,
732                value.as_ref().len() as size_t,
733            ));
734            Ok(())
735        }
736    }
737
738    pub fn merge_cf_opt<K, V>(
739        &self,
740        cf: &impl AsColumnFamilyRef,
741        key: K,
742        value: V,
743        writeopts: &WriteOptions,
744    ) -> Result<(), Error>
745    where
746        K: AsRef<[u8]>,
747        V: AsRef<[u8]>,
748    {
749        unsafe {
750            ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
751                self.inner,
752                writeopts.inner,
753                cf.inner(),
754                key.as_ref().as_ptr() as *const c_char,
755                key.as_ref().len() as size_t,
756                value.as_ref().as_ptr() as *const c_char,
757                value.as_ref().len() as size_t,
758            ));
759            Ok(())
760        }
761    }
762
763    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
764        self.delete_opt(key, &WriteOptions::default())
765    }
766
767    pub fn delete_cf<K: AsRef<[u8]>>(
768        &self,
769        cf: &impl AsColumnFamilyRef,
770        key: K,
771    ) -> Result<(), Error> {
772        self.delete_cf_opt(cf, key, &WriteOptions::default())
773    }
774
775    pub fn delete_opt<K: AsRef<[u8]>>(
776        &self,
777        key: K,
778        writeopts: &WriteOptions,
779    ) -> Result<(), Error> {
780        unsafe {
781            ffi_try!(ffi::rocksdb_transactiondb_delete(
782                self.inner,
783                writeopts.inner,
784                key.as_ref().as_ptr() as *const c_char,
785                key.as_ref().len() as size_t,
786            ));
787        }
788        Ok(())
789    }
790
791    pub fn delete_cf_opt<K: AsRef<[u8]>>(
792        &self,
793        cf: &impl AsColumnFamilyRef,
794        key: K,
795        writeopts: &WriteOptions,
796    ) -> Result<(), Error> {
797        unsafe {
798            ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
799                self.inner,
800                writeopts.inner,
801                cf.inner(),
802                key.as_ref().as_ptr() as *const c_char,
803                key.as_ref().len() as size_t,
804            ));
805        }
806        Ok(())
807    }
808
809    pub fn iterator<'a: 'b, 'b>(
810        &'a self,
811        mode: IteratorMode,
812    ) -> DBIteratorWithThreadMode<'b, Self> {
813        let readopts = ReadOptions::default();
814        self.iterator_opt(mode, readopts)
815    }
816
817    pub fn iterator_opt<'a: 'b, 'b>(
818        &'a self,
819        mode: IteratorMode,
820        readopts: ReadOptions,
821    ) -> DBIteratorWithThreadMode<'b, Self> {
822        DBIteratorWithThreadMode::new(self, readopts, mode)
823    }
824
825    /// Opens an iterator using the provided ReadOptions.
826    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
827    pub fn iterator_cf_opt<'a: 'b, 'b>(
828        &'a self,
829        cf_handle: &impl AsColumnFamilyRef,
830        readopts: ReadOptions,
831        mode: IteratorMode,
832    ) -> DBIteratorWithThreadMode<'b, Self> {
833        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
834    }
835
836    /// Opens an iterator with `set_total_order_seek` enabled.
837    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
838    /// with a Hash-based implementation.
839    pub fn full_iterator<'a: 'b, 'b>(
840        &'a self,
841        mode: IteratorMode,
842    ) -> DBIteratorWithThreadMode<'b, Self> {
843        let mut opts = ReadOptions::default();
844        opts.set_total_order_seek(true);
845        DBIteratorWithThreadMode::new(self, opts, mode)
846    }
847
848    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
849        &'a self,
850        prefix: P,
851    ) -> DBIteratorWithThreadMode<'b, Self> {
852        let mut opts = ReadOptions::default();
853        opts.set_prefix_same_as_start(true);
854        DBIteratorWithThreadMode::new(
855            self,
856            opts,
857            IteratorMode::From(prefix.as_ref(), Direction::Forward),
858        )
859    }
860
861    pub fn iterator_cf<'a: 'b, 'b>(
862        &'a self,
863        cf_handle: &impl AsColumnFamilyRef,
864        mode: IteratorMode,
865    ) -> DBIteratorWithThreadMode<'b, Self> {
866        let opts = ReadOptions::default();
867        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
868    }
869
870    pub fn full_iterator_cf<'a: 'b, 'b>(
871        &'a self,
872        cf_handle: &impl AsColumnFamilyRef,
873        mode: IteratorMode,
874    ) -> DBIteratorWithThreadMode<'b, Self> {
875        let mut opts = ReadOptions::default();
876        opts.set_total_order_seek(true);
877        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
878    }
879
880    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
881        &'a self,
882        cf_handle: &impl AsColumnFamilyRef,
883        prefix: P,
884    ) -> DBIteratorWithThreadMode<'a, Self> {
885        let mut opts = ReadOptions::default();
886        opts.set_prefix_same_as_start(true);
887        DBIteratorWithThreadMode::<'a, Self>::new_cf(
888            self,
889            cf_handle.inner(),
890            opts,
891            IteratorMode::From(prefix.as_ref(), Direction::Forward),
892        )
893    }
894
895    /// Opens a raw iterator over the database, using the default read options
896    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
897        let opts = ReadOptions::default();
898        DBRawIteratorWithThreadMode::new(self, opts)
899    }
900
901    /// Opens a raw iterator over the given column family, using the default read options
902    pub fn raw_iterator_cf<'a: 'b, 'b>(
903        &'a self,
904        cf_handle: &impl AsColumnFamilyRef,
905    ) -> DBRawIteratorWithThreadMode<'b, Self> {
906        let opts = ReadOptions::default();
907        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
908    }
909
910    /// Opens a raw iterator over the database, using the given read options
911    pub fn raw_iterator_opt<'a: 'b, 'b>(
912        &'a self,
913        readopts: ReadOptions,
914    ) -> DBRawIteratorWithThreadMode<'b, Self> {
915        DBRawIteratorWithThreadMode::new(self, readopts)
916    }
917
918    /// Opens a raw iterator over the given column family, using the given read options
919    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
920        &'a self,
921        cf_handle: &impl AsColumnFamilyRef,
922        readopts: ReadOptions,
923    ) -> DBRawIteratorWithThreadMode<'b, Self> {
924        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
925    }
926
927    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
928        SnapshotWithThreadMode::<Self>::new(self)
929    }
930
931    fn drop_column_family<C>(
932        &self,
933        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
934        _cf: C,
935    ) -> Result<(), Error> {
936        unsafe {
937            // first mark the column family as dropped
938            ffi_try!(ffi::rocksdb_drop_column_family(
939                self.inner as *mut ffi::rocksdb_t,
940                cf_inner
941            ));
942        }
943        // Since `_cf` is dropped here, the column family handle is destroyed
944        // and any resources (mem, files) are reclaimed.
945        Ok(())
946    }
947}
948
949impl TransactionDB<SingleThreaded> {
950    /// Creates column family with given name and options.
951    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
952        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
953        self.cfs
954            .cfs
955            .insert(name.as_ref().to_string(), ColumnFamily { inner });
956        Ok(())
957    }
958
959    /// Returns the underlying column family handle.
960    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
961        self.cfs.cfs.get(name)
962    }
963
964    /// Drops the column family with the given name
965    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
966        if let Some(cf) = self.cfs.cfs.remove(name) {
967            self.drop_column_family(cf.inner, cf)
968        } else {
969            Err(Error::new(format!("Invalid column family: {name}")))
970        }
971    }
972}
973
974impl TransactionDB<MultiThreaded> {
975    /// Creates column family with given name and options.
976    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
977        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
978        self.cfs.cfs.write().unwrap().insert(
979            name.as_ref().to_string(),
980            Arc::new(UnboundColumnFamily { inner }),
981        );
982        Ok(())
983    }
984
985    /// Returns the underlying column family handle.
986    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
987        self.cfs
988            .cfs
989            .read()
990            .unwrap()
991            .get(name)
992            .cloned()
993            .map(UnboundColumnFamily::bound_column_family)
994    }
995
996    /// Drops the column family with the given name by internally locking the inner column
997    /// family map. This avoids needing `&mut self` reference
998    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
999        if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
1000            self.drop_column_family(cf.inner, cf)
1001        } else {
1002            Err(Error::new(format!("Invalid column family: {name}")))
1003        }
1004    }
1005}
1006
1007impl<T: ThreadMode> Drop for TransactionDB<T> {
1008    fn drop(&mut self) {
1009        unsafe {
1010            self.prepared_transactions().clear();
1011            self.cfs.drop_all_cfs_internal();
1012            ffi::rocksdb_transactiondb_close(self.inner);
1013        }
1014    }
1015}