rocksdb/transactions/
optimistic_transaction_db.rs

// Copyright 2021 Yiyuan Liu
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
15
16use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int, size_t};
19
20use crate::column_family::ColumnFamilyTtl;
21use crate::{
22    db::{DBCommon, DBInner},
23    ffi,
24    ffi_util::to_cpath,
25    write_batch::WriteBatchWithTransaction,
26    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options,
27    ThreadMode, Transaction, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
28};
29
30/// A type alias to RocksDB Optimistic Transaction DB.
31///
32/// Please read the official
33/// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb)
34/// to learn more about RocksDB OptimisticTransactionDB.
35///
36/// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`]
37/// if feature `multi-threaded-cf` is not enabled.
38///
39/// See [`DBCommon`] for full list of methods.
40///
41/// # Examples
42///
43/// ```
44/// use rocksdb::{DB, Options, OptimisticTransactionDB, SingleThreaded};
45/// let tempdir = tempfile::Builder::new()
46///     .prefix("_path_for_optimistic_transaction_db")
47///     .tempdir()
48///     .expect("Failed to create temporary path for the _path_for_optimistic_transaction_db");
49/// let path = tempdir.path();
50/// {
51///     let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(path).unwrap();
52///     db.put(b"my key", b"my value").unwrap();
53///
54///     // create transaction
55///     let txn = db.transaction();
56///     txn.put(b"key2", b"value2");
57///     txn.put(b"key3", b"value3");
58///     txn.commit().unwrap();
59/// }
60/// let _ = DB::destroy(&Options::default(), path);
61/// ```
62///
63/// [`SingleThreaded`]: crate::SingleThreaded
64#[cfg(not(feature = "multi-threaded-cf"))]
65pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
66    DBCommon<T, OptimisticTransactionDBInner>;
67#[cfg(feature = "multi-threaded-cf")]
68pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
69    DBCommon<T, OptimisticTransactionDBInner>;
70
71pub struct OptimisticTransactionDBInner {
72    base: *mut ffi::rocksdb_t,
73    db: *mut ffi::rocksdb_optimistictransactiondb_t,
74}
75
76impl DBInner for OptimisticTransactionDBInner {
77    fn inner(&self) -> *mut ffi::rocksdb_t {
78        self.base
79    }
80}
81
82impl Drop for OptimisticTransactionDBInner {
83    fn drop(&mut self) {
84        unsafe {
85            ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
86            ffi::rocksdb_optimistictransactiondb_close(self.db);
87        }
88    }
89}
90
91/// Methods of `OptimisticTransactionDB`.
92impl<T: ThreadMode> OptimisticTransactionDB<T> {
93    /// Opens a database with default options.
94    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
95        let mut opts = Options::default();
96        opts.create_if_missing(true);
97        Self::open(&opts, path)
98    }
99
100    /// Opens the database with the specified options.
101    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
102        Self::open_cf(opts, path, None::<&str>)
103    }
104
105    /// Opens a database with the given database options and column family names.
106    ///
107    /// Column families opened using this function will be created with default `Options`.
108    /// *NOTE*: `default` column family will be opened with the `Options::default()`.
109    /// If you want to open `default` column family with custom options, use `open_cf_descriptors` and
110    /// provide a `ColumnFamilyDescriptor` with the desired options.
111    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
112    where
113        P: AsRef<Path>,
114        I: IntoIterator<Item = N>,
115        N: AsRef<str>,
116    {
117        let cfs = cfs
118            .into_iter()
119            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
120
121        Self::open_cf_descriptors_internal(opts, path, cfs)
122    }
123
124    /// Opens a database with the given database options and column family descriptors.
125    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
126    where
127        P: AsRef<Path>,
128        I: IntoIterator<Item = ColumnFamilyDescriptor>,
129    {
130        Self::open_cf_descriptors_internal(opts, path, cfs)
131    }
132
133    /// Internal implementation for opening RocksDB.
134    fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
135    where
136        P: AsRef<Path>,
137        I: IntoIterator<Item = ColumnFamilyDescriptor>,
138    {
139        let cfs: Vec<_> = cfs.into_iter().collect();
140        let outlive = iter::once(opts.outlive.clone())
141            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
142            .collect();
143
144        let cpath = to_cpath(&path)?;
145
146        if let Err(e) = fs::create_dir_all(&path) {
147            return Err(Error::new(format!(
148                "Failed to create RocksDB directory: `{e:?}`."
149            )));
150        }
151
152        let db: *mut ffi::rocksdb_optimistictransactiondb_t;
153        let mut cf_map = BTreeMap::new();
154
155        if cfs.is_empty() {
156            db = Self::open_raw(opts, &cpath)?;
157        } else {
158            let mut cfs_v = cfs;
159            // Always open the default column family.
160            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
161                cfs_v.push(ColumnFamilyDescriptor {
162                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
163                    options: Options::default(),
164                    ttl: ColumnFamilyTtl::SameAsDb,
165                });
166            }
167            // We need to store our CStrings in an intermediate vector
168            // so that their pointers remain valid.
169            let c_cfs: Vec<CString> = cfs_v
170                .iter()
171                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
172                .collect();
173
174            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
175
176            // These handles will be populated by DB.
177            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
178
179            let cfopts: Vec<_> = cfs_v
180                .iter()
181                .map(|cf| cf.options.inner.cast_const())
182                .collect();
183
184            db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
185
186            for handle in &cfhandles {
187                if handle.is_null() {
188                    return Err(Error::new(
189                        "Received null column family handle from DB.".to_owned(),
190                    ));
191                }
192            }
193
194            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
195                cf_map.insert(cf_desc.name.clone(), inner);
196            }
197        }
198
199        if db.is_null() {
200            return Err(Error::new("Could not initialize database.".to_owned()));
201        }
202
203        let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
204        if base.is_null() {
205            unsafe {
206                ffi::rocksdb_optimistictransactiondb_close(db);
207            }
208            return Err(Error::new("Could not initialize database.".to_owned()));
209        }
210        let inner = OptimisticTransactionDBInner { base, db };
211
212        Ok(Self::new(
213            inner,
214            T::new_cf_map_internal(cf_map),
215            path.as_ref().to_path_buf(),
216            outlive,
217        ))
218    }
219
220    fn open_raw(
221        opts: &Options,
222        cpath: &CString,
223    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
224        unsafe {
225            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
226                opts.inner,
227                cpath.as_ptr()
228            ));
229            Ok(db)
230        }
231    }
232
233    fn open_cf_raw(
234        opts: &Options,
235        cpath: &CString,
236        cfs_v: &[ColumnFamilyDescriptor],
237        cfnames: &[*const c_char],
238        cfopts: &[*const ffi::rocksdb_options_t],
239        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
240    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
241        unsafe {
242            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
243                opts.inner,
244                cpath.as_ptr(),
245                cfs_v.len() as c_int,
246                cfnames.as_ptr(),
247                cfopts.as_ptr(),
248                cfhandles.as_mut_ptr(),
249            ));
250            Ok(db)
251        }
252    }
253
254    /// Creates a transaction with default options.
255    pub fn transaction(&self) -> Transaction<Self> {
256        self.transaction_opt(
257            &WriteOptions::default(),
258            &OptimisticTransactionOptions::default(),
259        )
260    }
261
262    /// Creates a transaction with default options.
263    pub fn transaction_opt(
264        &self,
265        writeopts: &WriteOptions,
266        otxn_opts: &OptimisticTransactionOptions,
267    ) -> Transaction<Self> {
268        Transaction {
269            inner: unsafe {
270                ffi::rocksdb_optimistictransaction_begin(
271                    self.inner.db,
272                    writeopts.inner,
273                    otxn_opts.inner,
274                    std::ptr::null_mut(),
275                )
276            },
277            _marker: PhantomData,
278        }
279    }
280
281    pub fn write_opt(
282        &self,
283        batch: WriteBatchWithTransaction<true>,
284        writeopts: &WriteOptions,
285    ) -> Result<(), Error> {
286        unsafe {
287            ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
288                self.inner.db,
289                writeopts.inner,
290                batch.inner
291            ));
292        }
293        Ok(())
294    }
295
296    pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
297        self.write_opt(batch, &WriteOptions::default())
298    }
299
300    pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
301        let mut wo = WriteOptions::new();
302        wo.disable_wal(true);
303        self.write_opt(batch, &wo)
304    }
305
306    /// Removes the database entries in the range `["from", "to")` using given write options.
307    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
308        &self,
309        cf: &impl AsColumnFamilyRef,
310        from: K,
311        to: K,
312        writeopts: &WriteOptions,
313    ) -> Result<(), Error> {
314        let from = from.as_ref();
315        let to = to.as_ref();
316
317        unsafe {
318            ffi_try!(ffi::rocksdb_delete_range_cf(
319                self.inner.inner(),
320                writeopts.inner,
321                cf.inner(),
322                from.as_ptr() as *const c_char,
323                from.len() as size_t,
324                to.as_ptr() as *const c_char,
325                to.len() as size_t,
326            ));
327            Ok(())
328        }
329    }
330
331    /// Removes the database entries in the range `["from", "to")` using default write options.
332    pub fn delete_range_cf<K: AsRef<[u8]>>(
333        &self,
334        cf: &impl AsColumnFamilyRef,
335        from: K,
336        to: K,
337    ) -> Result<(), Error> {
338        self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
339    }
340}