rocksdb/transactions/
transaction.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{marker::PhantomData, ptr};
17
18use crate::{
19    db::{convert_values, DBAccess},
20    ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
21    Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
22};
23use libc::{c_char, c_void, size_t};
24
/// RocksDB Transaction.
///
/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
///
/// The handle stored in `inner` is destroyed when the value is dropped.
///
/// [`TransactionDB`]: crate::TransactionDB
/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
pub struct Transaction<'db, DB> {
    // Raw pointer to the underlying C transaction object; passed to every FFI call.
    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
    // Zero-sized marker tying this transaction to a borrow of the database type `DB`
    // for the lifetime `'db`.
    pub(crate) _marker: PhantomData<&'db DB>,
}
35
// NOTE(review): `Transaction` holds a raw pointer, so `Send` is not derived automatically.
// This impl presumably relies on the underlying RocksDB transaction handle being safe to
// move to another thread — confirm against the RocksDB threading guarantees.
unsafe impl<DB> Send for Transaction<'_, DB> {}
37
38impl<DB> DBAccess for Transaction<'_, DB> {
39    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
40        ffi::rocksdb_transaction_get_snapshot(self.inner)
41    }
42
43    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
44        ffi::rocksdb_free(snapshot as *mut c_void);
45    }
46
47    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
48        ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner)
49    }
50
51    unsafe fn create_iterator_cf(
52        &self,
53        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
54        readopts: &ReadOptions,
55    ) -> *mut ffi::rocksdb_iterator_t {
56        ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
57    }
58
59    fn get_opt<K: AsRef<[u8]>>(
60        &self,
61        key: K,
62        readopts: &ReadOptions,
63    ) -> Result<Option<Vec<u8>>, Error> {
64        self.get_opt(key, readopts)
65    }
66
67    fn get_cf_opt<K: AsRef<[u8]>>(
68        &self,
69        cf: &impl AsColumnFamilyRef,
70        key: K,
71        readopts: &ReadOptions,
72    ) -> Result<Option<Vec<u8>>, Error> {
73        self.get_cf_opt(cf, key, readopts)
74    }
75
76    fn get_pinned_opt<K: AsRef<[u8]>>(
77        &self,
78        key: K,
79        readopts: &ReadOptions,
80    ) -> Result<Option<DBPinnableSlice>, Error> {
81        self.get_pinned_opt(key, readopts)
82    }
83
84    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
85        &self,
86        cf: &impl AsColumnFamilyRef,
87        key: K,
88        readopts: &ReadOptions,
89    ) -> Result<Option<DBPinnableSlice>, Error> {
90        self.get_pinned_cf_opt(cf, key, readopts)
91    }
92
93    fn multi_get_opt<K, I>(
94        &self,
95        keys: I,
96        readopts: &ReadOptions,
97    ) -> Vec<Result<Option<Vec<u8>>, Error>>
98    where
99        K: AsRef<[u8]>,
100        I: IntoIterator<Item = K>,
101    {
102        self.multi_get_opt(keys, readopts)
103    }
104
105    fn multi_get_cf_opt<'b, K, I, W>(
106        &self,
107        keys_cf: I,
108        readopts: &ReadOptions,
109    ) -> Vec<Result<Option<Vec<u8>>, Error>>
110    where
111        K: AsRef<[u8]>,
112        I: IntoIterator<Item = (&'b W, K)>,
113        W: AsColumnFamilyRef + 'b,
114    {
115        self.multi_get_cf_opt(keys_cf, readopts)
116    }
117}
118
119impl<DB> Transaction<'_, DB> {
120    /// Write all batched keys to the DB atomically.
121    ///
122    /// May return any error that could be returned by `DB::write`.
123    ///
124    /// If this transaction was created by a [`TransactionDB`], an error of
125    /// the [`Expired`] kind may be returned if this transaction has
126    /// lived longer than expiration time in [`TransactionOptions`].
127    ///
128    /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
129    /// the [`Busy`] kind may be returned if the transaction
130    /// could not guarantee that there are no write conflicts.
131    /// An error of the [`TryAgain`] kind may be returned if the memtable
132    /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
133    ///
134    /// [`Expired`]: crate::ErrorKind::Expired
135    /// [`TransactionOptions`]: crate::TransactionOptions
136    /// [`TransactionDB`]: crate::TransactionDB
137    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
138    /// [`Busy`]: crate::ErrorKind::Busy
139    /// [`TryAgain`]: crate::ErrorKind::TryAgain
140    /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
141    pub fn commit(self) -> Result<(), Error> {
142        unsafe {
143            ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
144        }
145        Ok(())
146    }
147
148    pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
149        let ptr = name.as_ptr();
150        let len = name.len();
151        unsafe {
152            ffi_try!(ffi::rocksdb_transaction_set_name(
153                self.inner, ptr as _, len as _
154            ));
155        }
156
157        Ok(())
158    }
159
160    pub fn get_name(&self) -> Option<Vec<u8>> {
161        unsafe {
162            let mut name_len = 0;
163            let name = ffi::rocksdb_transaction_get_name(self.inner, &mut name_len);
164            if name.is_null() {
165                None
166            } else {
167                let mut vec = vec![0; name_len];
168                std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
169                ffi::rocksdb_free(name as *mut c_void);
170                Some(vec)
171            }
172        }
173    }
174
175    pub fn prepare(&self) -> Result<(), Error> {
176        unsafe {
177            ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
178        }
179        Ok(())
180    }
181
182    /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
183    /// Otherwise, returns a snapshot with `nullptr` inside which doesn't affect read operations.
184    ///
185    /// [`TransactionOptions`]: crate::TransactionOptions
186    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
187        SnapshotWithThreadMode::new(self)
188    }
189
190    /// Discard all batched writes in this transaction.
191    pub fn rollback(&self) -> Result<(), Error> {
192        unsafe {
193            ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
194            Ok(())
195        }
196    }
197
198    /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
199    /// May be called multiple times to set multiple save points.
200    ///
201    /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
202    pub fn set_savepoint(&self) {
203        unsafe {
204            ffi::rocksdb_transaction_set_savepoint(self.inner);
205        }
206    }
207
208    /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
209    /// and removes the most recent [`set_savepoint`].
210    ///
211    /// Returns error if there is no previous call to [`set_savepoint`].
212    ///
213    /// [`set_savepoint`]: Self::set_savepoint
214    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
215        unsafe {
216            ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
217            Ok(())
218        }
219    }
220
221    /// Get the bytes associated with a key value.
222    ///
223    /// See [`get_cf_opt`] for details.
224    ///
225    /// [`get_cf_opt`]: Self::get_cf_opt
226    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
227        self.get_opt(key, &ReadOptions::default())
228    }
229
230    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
231        self.get_pinned_opt(key, &ReadOptions::default())
232    }
233
234    /// Get the bytes associated with a key value and the given column family.
235    ///
236    /// See [`get_cf_opt`] for details.
237    ///
238    /// [`get_cf_opt`]: Self::get_cf_opt
239    pub fn get_cf<K: AsRef<[u8]>>(
240        &self,
241        cf: &impl AsColumnFamilyRef,
242        key: K,
243    ) -> Result<Option<Vec<u8>>, Error> {
244        self.get_cf_opt(cf, key, &ReadOptions::default())
245    }
246
247    pub fn get_pinned_cf<K: AsRef<[u8]>>(
248        &self,
249        cf: &impl AsColumnFamilyRef,
250        key: K,
251    ) -> Result<Option<DBPinnableSlice>, Error> {
252        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
253    }
254
255    /// Get the key and ensure that this transaction will only
256    /// be able to be committed if this key is not written outside this
257    /// transaction after it has first been read (or after the snapshot if a
258    /// snapshot is set in this transaction).
259    ///
260    /// See [`get_for_update_cf_opt`] for details.
261    ///
262    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
263    pub fn get_for_update<K: AsRef<[u8]>>(
264        &self,
265        key: K,
266        exclusive: bool,
267    ) -> Result<Option<Vec<u8>>, Error> {
268        self.get_for_update_opt(key, exclusive, &ReadOptions::default())
269    }
270
271    pub fn get_pinned_for_update<K: AsRef<[u8]>>(
272        &self,
273        key: K,
274        exclusive: bool,
275    ) -> Result<Option<DBPinnableSlice>, Error> {
276        self.get_pinned_for_update_opt(key, exclusive, &ReadOptions::default())
277    }
278
279    /// Get the key in the given column family and ensure that this transaction will only
280    /// be able to be committed if this key is not written outside this
281    /// transaction after it has first been read (or after the snapshot if a
282    /// snapshot is set in this transaction).
283    ///
284    /// See [`get_for_update_cf_opt`] for details.
285    ///
286    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
287    pub fn get_for_update_cf<K: AsRef<[u8]>>(
288        &self,
289        cf: &impl AsColumnFamilyRef,
290        key: K,
291        exclusive: bool,
292    ) -> Result<Option<Vec<u8>>, Error> {
293        self.get_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
294    }
295
296    pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
297        &self,
298        cf: &impl AsColumnFamilyRef,
299        key: K,
300        exclusive: bool,
301    ) -> Result<Option<DBPinnableSlice>, Error> {
302        self.get_pinned_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
303    }
304
305    /// Returns the bytes associated with a key value with read options.
306    ///
307    /// See [`get_cf_opt`] for details.
308    ///
309    /// [`get_cf_opt`]: Self::get_cf_opt
310    pub fn get_opt<K: AsRef<[u8]>>(
311        &self,
312        key: K,
313        readopts: &ReadOptions,
314    ) -> Result<Option<Vec<u8>>, Error> {
315        self.get_pinned_opt(key, readopts)
316            .map(|x| x.map(|v| v.as_ref().to_vec()))
317    }
318
319    pub fn get_pinned_opt<K: AsRef<[u8]>>(
320        &self,
321        key: K,
322        readopts: &ReadOptions,
323    ) -> Result<Option<DBPinnableSlice>, Error> {
324        let key = key.as_ref();
325        unsafe {
326            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
327                self.inner,
328                readopts.inner,
329                key.as_ptr() as *const c_char,
330                key.len(),
331            ));
332            if val.is_null() {
333                Ok(None)
334            } else {
335                Ok(Some(DBPinnableSlice::from_c(val)))
336            }
337        }
338    }
339
340    /// Get the bytes associated with a key value and the given column family with read options.
341    ///
342    /// This function will also read pending changes in this transaction.
343    /// Currently, this function will return an error of the [`MergeInProgress`] kind
344    /// if the most recent write to the queried key in this batch is a Merge.
345    ///
346    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
347    pub fn get_cf_opt<K: AsRef<[u8]>>(
348        &self,
349        cf: &impl AsColumnFamilyRef,
350        key: K,
351        readopts: &ReadOptions,
352    ) -> Result<Option<Vec<u8>>, Error> {
353        self.get_pinned_cf_opt(cf, key, readopts)
354            .map(|x| x.map(|v| v.as_ref().to_vec()))
355    }
356
357    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
358        &self,
359        cf: &impl AsColumnFamilyRef,
360        key: K,
361        readopts: &ReadOptions,
362    ) -> Result<Option<DBPinnableSlice>, Error> {
363        let key = key.as_ref();
364        unsafe {
365            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
366                self.inner,
367                readopts.inner,
368                cf.inner(),
369                key.as_ptr() as *const c_char,
370                key.len(),
371            ));
372            if val.is_null() {
373                Ok(None)
374            } else {
375                Ok(Some(DBPinnableSlice::from_c(val)))
376            }
377        }
378    }
379
380    /// Get the key with read options and ensure that this transaction will only
381    /// be able to be committed if this key is not written outside this
382    /// transaction after it has first been read (or after the snapshot if a
383    /// snapshot is set in this transaction).
384    ///
385    /// See [`get_for_update_cf_opt`] for details.
386    ///
387    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
388    pub fn get_for_update_opt<K: AsRef<[u8]>>(
389        &self,
390        key: K,
391        exclusive: bool,
392        opts: &ReadOptions,
393    ) -> Result<Option<Vec<u8>>, Error> {
394        self.get_pinned_for_update_opt(key, exclusive, opts)
395            .map(|x| x.map(|v| v.as_ref().to_vec()))
396    }
397
398    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
399        &self,
400        key: K,
401        exclusive: bool,
402        opts: &ReadOptions,
403    ) -> Result<Option<DBPinnableSlice>, Error> {
404        let key = key.as_ref();
405        unsafe {
406            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
407                self.inner,
408                opts.inner,
409                key.as_ptr() as *const c_char,
410                key.len() as size_t,
411                u8::from(exclusive),
412            ));
413            if val.is_null() {
414                Ok(None)
415            } else {
416                Ok(Some(DBPinnableSlice::from_c(val)))
417            }
418        }
419    }
420
421    /// Get the key in the given column family with read options
422    /// and ensure that this transaction will only
423    /// be able to be committed if this key is not written outside this
424    /// transaction after it has first been read (or after the snapshot if a
425    /// snapshot is set in this transaction).
426    ///
427    /// Currently, this function will return an error of the [`MergeInProgress`]
428    /// if the most recent write to the queried key in this batch is a Merge.
429    ///
430    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
431    /// * [`Busy`] if there is a write conflict.
432    /// * [`TimedOut`] if a lock could not be acquired.
433    /// * [`TryAgain`] if the memtable history size is not large enough.
434    /// * [`MergeInProgress`] if merge operations cannot be resolved.
435    /// * or other errors if this key could not be read.
436    ///
437    /// If this transaction was created by an `[OptimisticTransactionDB]`, `get_for_update_opt`
438    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
439    /// be returned by `[DB::get]`.
440    ///
441    /// [`Busy`]: crate::ErrorKind::Busy
442    /// [`TimedOut`]: crate::ErrorKind::TimedOut
443    /// [`TryAgain`]: crate::ErrorKind::TryAgain
444    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
445    /// [`TransactionDB`]: crate::TransactionDB
446    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
447    /// [`commit`]: Self::commit
448    /// [`DB::get`]: crate::DB::get
449    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
450        &self,
451        cf: &impl AsColumnFamilyRef,
452        key: K,
453        exclusive: bool,
454        opts: &ReadOptions,
455    ) -> Result<Option<Vec<u8>>, Error> {
456        self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
457            .map(|x| x.map(|v| v.as_ref().to_vec()))
458    }
459
460    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
461        &self,
462        cf: &impl AsColumnFamilyRef,
463        key: K,
464        exclusive: bool,
465        opts: &ReadOptions,
466    ) -> Result<Option<DBPinnableSlice>, Error> {
467        let key = key.as_ref();
468        unsafe {
469            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
470                self.inner,
471                opts.inner,
472                cf.inner(),
473                key.as_ptr() as *const c_char,
474                key.len() as size_t,
475                u8::from(exclusive),
476            ));
477            if val.is_null() {
478                Ok(None)
479            } else {
480                Ok(Some(DBPinnableSlice::from_c(val)))
481            }
482        }
483    }
484
485    /// Return the values associated with the given keys.
486    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
487    where
488        K: AsRef<[u8]>,
489        I: IntoIterator<Item = K>,
490    {
491        self.multi_get_opt(keys, &ReadOptions::default())
492    }
493
494    /// Return the values associated with the given keys using read options.
495    pub fn multi_get_opt<K, I>(
496        &self,
497        keys: I,
498        readopts: &ReadOptions,
499    ) -> Vec<Result<Option<Vec<u8>>, Error>>
500    where
501        K: AsRef<[u8]>,
502        I: IntoIterator<Item = K>,
503    {
504        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
505            .into_iter()
506            .map(|key| {
507                let key = key.as_ref();
508                (Box::from(key), key.len())
509            })
510            .unzip();
511        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
512
513        let mut values = vec![ptr::null_mut(); keys.len()];
514        let mut values_sizes = vec![0_usize; keys.len()];
515        let mut errors = vec![ptr::null_mut(); keys.len()];
516        unsafe {
517            ffi::rocksdb_transaction_multi_get(
518                self.inner,
519                readopts.inner,
520                ptr_keys.len(),
521                ptr_keys.as_ptr(),
522                keys_sizes.as_ptr(),
523                values.as_mut_ptr(),
524                values_sizes.as_mut_ptr(),
525                errors.as_mut_ptr(),
526            );
527        }
528
529        convert_values(values, values_sizes, errors)
530    }
531
532    /// Return the values associated with the given keys and column families.
533    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
534        &'a self,
535        keys: I,
536    ) -> Vec<Result<Option<Vec<u8>>, Error>>
537    where
538        K: AsRef<[u8]>,
539        I: IntoIterator<Item = (&'b W, K)>,
540        W: 'b + AsColumnFamilyRef,
541    {
542        self.multi_get_cf_opt(keys, &ReadOptions::default())
543    }
544
545    /// Return the values associated with the given keys and column families using read options.
546    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
547        &'a self,
548        keys: I,
549        readopts: &ReadOptions,
550    ) -> Vec<Result<Option<Vec<u8>>, Error>>
551    where
552        K: AsRef<[u8]>,
553        I: IntoIterator<Item = (&'b W, K)>,
554        W: 'b + AsColumnFamilyRef,
555    {
556        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
557            .into_iter()
558            .map(|(cf, key)| {
559                let key = key.as_ref();
560                ((cf, Box::from(key)), key.len())
561            })
562            .unzip();
563        let ptr_keys: Vec<_> = cfs_and_keys
564            .iter()
565            .map(|(_, k)| k.as_ptr() as *const c_char)
566            .collect();
567        let ptr_cfs: Vec<_> = cfs_and_keys
568            .iter()
569            .map(|(c, _)| c.inner().cast_const())
570            .collect();
571
572        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
573        let mut values_sizes = vec![0_usize; ptr_keys.len()];
574        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
575        unsafe {
576            ffi::rocksdb_transaction_multi_get_cf(
577                self.inner,
578                readopts.inner,
579                ptr_cfs.as_ptr(),
580                ptr_keys.len(),
581                ptr_keys.as_ptr(),
582                keys_sizes.as_ptr(),
583                values.as_mut_ptr(),
584                values_sizes.as_mut_ptr(),
585                errors.as_mut_ptr(),
586            );
587        }
588
589        convert_values(values, values_sizes, errors)
590    }
591
592    /// Put the key value in default column family and do conflict checking on the key.
593    ///
594    /// See [`put_cf`] for details.
595    ///
596    /// [`put_cf`]: Self::put_cf
597    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
598        let key = key.as_ref();
599        let value = value.as_ref();
600        unsafe {
601            ffi_try!(ffi::rocksdb_transaction_put(
602                self.inner,
603                key.as_ptr() as *const c_char,
604                key.len() as size_t,
605                value.as_ptr() as *const c_char,
606                value.len() as size_t,
607            ));
608            Ok(())
609        }
610    }
611
612    /// Put the key value in the given column family and do conflict checking on the key.
613    ///
614    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
615    /// * [`Busy`] if there is a write conflict.
616    /// * [`TimedOut`] if a lock could not be acquired.
617    /// * [`TryAgain`] if the memtable history size is not large enough.
618    /// * [`MergeInProgress`] if merge operations cannot be resolved.
619    /// * or other errors on unexpected failures.
620    ///
621    /// [`Busy`]: crate::ErrorKind::Busy
622    /// [`TimedOut`]: crate::ErrorKind::TimedOut
623    /// [`TryAgain`]: crate::ErrorKind::TryAgain
624    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
625    /// [`TransactionDB`]: crate::TransactionDB
626    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
627    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
628        &self,
629        cf: &impl AsColumnFamilyRef,
630        key: K,
631        value: V,
632    ) -> Result<(), Error> {
633        let key = key.as_ref();
634        let value = value.as_ref();
635        unsafe {
636            ffi_try!(ffi::rocksdb_transaction_put_cf(
637                self.inner,
638                cf.inner(),
639                key.as_ptr() as *const c_char,
640                key.len() as size_t,
641                value.as_ptr() as *const c_char,
642                value.len() as size_t,
643            ));
644            Ok(())
645        }
646    }
647
648    /// Merge value with existing value of key, and also do conflict checking on the key.
649    ///
650    /// See [`merge_cf`] for details.
651    ///
652    /// [`merge_cf`]: Self::merge_cf
653    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
654        let key = key.as_ref();
655        let value = value.as_ref();
656        unsafe {
657            ffi_try!(ffi::rocksdb_transaction_merge(
658                self.inner,
659                key.as_ptr() as *const c_char,
660                key.len() as size_t,
661                value.as_ptr() as *const c_char,
662                value.len() as size_t
663            ));
664            Ok(())
665        }
666    }
667
668    /// Merge `value` with existing value of `key` in the given column family,
669    /// and also do conflict checking on the key.
670    ///
671    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
672    /// * [`Busy`] if there is a write conflict.
673    /// * [`TimedOut`] if a lock could not be acquired.
674    /// * [`TryAgain`] if the memtable history size is not large enough.
675    /// * [`MergeInProgress`] if merge operations cannot be resolved.
676    /// * or other errors on unexpected failures.
677    ///
678    /// [`Busy`]: crate::ErrorKind::Busy
679    /// [`TimedOut`]: crate::ErrorKind::TimedOut
680    /// [`TryAgain`]: crate::ErrorKind::TryAgain
681    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
682    /// [`TransactionDB`]: crate::TransactionDB
683    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
684        &self,
685        cf: &impl AsColumnFamilyRef,
686        key: K,
687        value: V,
688    ) -> Result<(), Error> {
689        let key = key.as_ref();
690        let value = value.as_ref();
691        unsafe {
692            ffi_try!(ffi::rocksdb_transaction_merge_cf(
693                self.inner,
694                cf.inner(),
695                key.as_ptr() as *const c_char,
696                key.len() as size_t,
697                value.as_ptr() as *const c_char,
698                value.len() as size_t
699            ));
700            Ok(())
701        }
702    }
703
704    /// Delete the key value if it exists and do conflict checking on the key.
705    ///
706    /// See [`delete_cf`] for details.
707    ///
708    /// [`delete_cf`]: Self::delete_cf
709    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
710        let key = key.as_ref();
711        unsafe {
712            ffi_try!(ffi::rocksdb_transaction_delete(
713                self.inner,
714                key.as_ptr() as *const c_char,
715                key.len() as size_t
716            ));
717        }
718        Ok(())
719    }
720
721    /// Delete the key value in the given column family and do conflict checking.
722    ///
723    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
724    /// * [`Busy`] if there is a write conflict.
725    /// * [`TimedOut`] if a lock could not be acquired.
726    /// * [`TryAgain`] if the memtable history size is not large enough.
727    /// * [`MergeInProgress`] if merge operations cannot be resolved.
728    /// * or other errors on unexpected failures.
729    ///
730    /// [`Busy`]: crate::ErrorKind::Busy
731    /// [`TimedOut`]: crate::ErrorKind::TimedOut
732    /// [`TryAgain`]: crate::ErrorKind::TryAgain
733    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
734    /// [`TransactionDB`]: crate::TransactionDB
735    pub fn delete_cf<K: AsRef<[u8]>>(
736        &self,
737        cf: &impl AsColumnFamilyRef,
738        key: K,
739    ) -> Result<(), Error> {
740        let key = key.as_ref();
741        unsafe {
742            ffi_try!(ffi::rocksdb_transaction_delete_cf(
743                self.inner,
744                cf.inner(),
745                key.as_ptr() as *const c_char,
746                key.len() as size_t
747            ));
748        }
749        Ok(())
750    }
751
752    pub fn iterator<'a: 'b, 'b>(
753        &'a self,
754        mode: IteratorMode,
755    ) -> DBIteratorWithThreadMode<'b, Self> {
756        let readopts = ReadOptions::default();
757        self.iterator_opt(mode, readopts)
758    }
759
760    pub fn iterator_opt<'a: 'b, 'b>(
761        &'a self,
762        mode: IteratorMode,
763        readopts: ReadOptions,
764    ) -> DBIteratorWithThreadMode<'b, Self> {
765        DBIteratorWithThreadMode::new(self, readopts, mode)
766    }
767
768    /// Opens an iterator using the provided ReadOptions.
769    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions.
770    pub fn iterator_cf_opt<'a: 'b, 'b>(
771        &'a self,
772        cf_handle: &impl AsColumnFamilyRef,
773        readopts: ReadOptions,
774        mode: IteratorMode,
775    ) -> DBIteratorWithThreadMode<'b, Self> {
776        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
777    }
778
779    /// Opens an iterator with `set_total_order_seek` enabled.
780    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
781    /// with a Hash-based implementation.
782    pub fn full_iterator<'a: 'b, 'b>(
783        &'a self,
784        mode: IteratorMode,
785    ) -> DBIteratorWithThreadMode<'b, Self> {
786        let mut opts = ReadOptions::default();
787        opts.set_total_order_seek(true);
788        DBIteratorWithThreadMode::new(self, opts, mode)
789    }
790
791    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
792        &'a self,
793        prefix: P,
794    ) -> DBIteratorWithThreadMode<'b, Self> {
795        let mut opts = ReadOptions::default();
796        opts.set_prefix_same_as_start(true);
797        DBIteratorWithThreadMode::new(
798            self,
799            opts,
800            IteratorMode::From(prefix.as_ref(), Direction::Forward),
801        )
802    }
803
804    pub fn iterator_cf<'a: 'b, 'b>(
805        &'a self,
806        cf_handle: &impl AsColumnFamilyRef,
807        mode: IteratorMode,
808    ) -> DBIteratorWithThreadMode<'b, Self> {
809        let opts = ReadOptions::default();
810        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
811    }
812
813    pub fn full_iterator_cf<'a: 'b, 'b>(
814        &'a self,
815        cf_handle: &impl AsColumnFamilyRef,
816        mode: IteratorMode,
817    ) -> DBIteratorWithThreadMode<'b, Self> {
818        let mut opts = ReadOptions::default();
819        opts.set_total_order_seek(true);
820        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
821    }
822
823    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
824        &'a self,
825        cf_handle: &impl AsColumnFamilyRef,
826        prefix: P,
827    ) -> DBIteratorWithThreadMode<'a, Self> {
828        let mut opts = ReadOptions::default();
829        opts.set_prefix_same_as_start(true);
830        DBIteratorWithThreadMode::<'a, Self>::new_cf(
831            self,
832            cf_handle.inner(),
833            opts,
834            IteratorMode::From(prefix.as_ref(), Direction::Forward),
835        )
836    }
837
838    /// Opens a raw iterator over the database, using the default read options
839    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
840        let opts = ReadOptions::default();
841        DBRawIteratorWithThreadMode::new(self, opts)
842    }
843
844    /// Opens a raw iterator over the given column family, using the default read options
845    pub fn raw_iterator_cf<'a: 'b, 'b>(
846        &'a self,
847        cf_handle: &impl AsColumnFamilyRef,
848    ) -> DBRawIteratorWithThreadMode<'b, Self> {
849        let opts = ReadOptions::default();
850        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
851    }
852
853    /// Opens a raw iterator over the database, using the given read options
854    pub fn raw_iterator_opt<'a: 'b, 'b>(
855        &'a self,
856        readopts: ReadOptions,
857    ) -> DBRawIteratorWithThreadMode<'b, Self> {
858        DBRawIteratorWithThreadMode::new(self, readopts)
859    }
860
861    /// Opens a raw iterator over the given column family, using the given read options
862    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
863        &'a self,
864        cf_handle: &impl AsColumnFamilyRef,
865        readopts: ReadOptions,
866    ) -> DBRawIteratorWithThreadMode<'b, Self> {
867        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
868    }
869
870    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
871        unsafe {
872            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
873            let mut len: usize = 0;
874            let ptr = ffi::rocksdb_writebatch_wi_data(wi, ptr::from_mut(&mut len));
875            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
876            ffi::rocksdb_free(wi as *mut c_void);
877            WriteBatchWithTransaction { inner: writebatch }
878        }
879    }
880
881    pub fn rebuild_from_writebatch(
882        &self,
883        writebatch: &WriteBatchWithTransaction<true>,
884    ) -> Result<(), Error> {
885        unsafe {
886            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
887                self.inner,
888                writebatch.inner
889            ));
890        }
891        Ok(())
892    }
893}
894
895impl<DB> Drop for Transaction<'_, DB> {
896    fn drop(&mut self) {
897        unsafe {
898            ffi::rocksdb_transaction_destroy(self.inner);
899        }
900    }
901}