rocksdb/transactions/
transaction.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{marker::PhantomData, ptr};
17
18use crate::{
19    db::{convert_values, DBAccess},
20    ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
21    Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
22};
23use libc::{c_char, c_void, size_t};
24
25/// RocksDB Transaction.
26///
27/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
28///
29/// [`TransactionDB`]: crate::TransactionDB
30/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
31pub struct Transaction<'db, DB> {
32    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
33    pub(crate) _marker: PhantomData<&'db DB>,
34}
35
36unsafe impl<'db, DB> Send for Transaction<'db, DB> {}
37
38impl<'db, DB> DBAccess for Transaction<'db, DB> {
39    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
40        ffi::rocksdb_transaction_get_snapshot(self.inner)
41    }
42
43    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
44        ffi::rocksdb_free(snapshot as *mut c_void);
45    }
46
47    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
48        ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner)
49    }
50
51    unsafe fn create_iterator_cf(
52        &self,
53        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
54        readopts: &ReadOptions,
55    ) -> *mut ffi::rocksdb_iterator_t {
56        ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
57    }
58
59    fn get_opt<K: AsRef<[u8]>>(
60        &self,
61        key: K,
62        readopts: &ReadOptions,
63    ) -> Result<Option<Vec<u8>>, Error> {
64        self.get_opt(key, readopts)
65    }
66
67    fn get_cf_opt<K: AsRef<[u8]>>(
68        &self,
69        cf: &impl AsColumnFamilyRef,
70        key: K,
71        readopts: &ReadOptions,
72    ) -> Result<Option<Vec<u8>>, Error> {
73        self.get_cf_opt(cf, key, readopts)
74    }
75
76    fn get_pinned_opt<K: AsRef<[u8]>>(
77        &self,
78        key: K,
79        readopts: &ReadOptions,
80    ) -> Result<Option<DBPinnableSlice>, Error> {
81        self.get_pinned_opt(key, readopts)
82    }
83
84    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
85        &self,
86        cf: &impl AsColumnFamilyRef,
87        key: K,
88        readopts: &ReadOptions,
89    ) -> Result<Option<DBPinnableSlice>, Error> {
90        self.get_pinned_cf_opt(cf, key, readopts)
91    }
92
93    fn multi_get_opt<K, I>(
94        &self,
95        keys: I,
96        readopts: &ReadOptions,
97    ) -> Vec<Result<Option<Vec<u8>>, Error>>
98    where
99        K: AsRef<[u8]>,
100        I: IntoIterator<Item = K>,
101    {
102        self.multi_get_opt(keys, readopts)
103    }
104
105    fn multi_get_cf_opt<'b, K, I, W>(
106        &self,
107        keys_cf: I,
108        readopts: &ReadOptions,
109    ) -> Vec<Result<Option<Vec<u8>>, Error>>
110    where
111        K: AsRef<[u8]>,
112        I: IntoIterator<Item = (&'b W, K)>,
113        W: AsColumnFamilyRef + 'b,
114    {
115        self.multi_get_cf_opt(keys_cf, readopts)
116    }
117}
118
119impl<'db, DB> Transaction<'db, DB> {
120    /// Write all batched keys to the DB atomically.
121    ///
122    /// May return any error that could be returned by `DB::write`.
123    ///
124    /// If this transaction was created by a [`TransactionDB`], an error of
125    /// the [`Expired`] kind may be returned if this transaction has
126    /// lived longer than expiration time in [`TransactionOptions`].
127    ///
128    /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
129    /// the [`Busy`] kind may be returned if the transaction
130    /// could not guarantee that there are no write conflicts.
131    /// An error of the [`TryAgain`] kind may be returned if the memtable
132    /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
133    ///
134    /// [`Expired`]: crate::ErrorKind::Expired
135    /// [`TransactionOptions`]: crate::TransactionOptions
136    /// [`TransactionDB`]: crate::TransactionDB
137    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
138    /// [`Busy`]: crate::ErrorKind::Busy
139    /// [`TryAgain`]: crate::ErrorKind::TryAgain
140    /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
141    pub fn commit(self) -> Result<(), Error> {
142        unsafe {
143            ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
144        }
145        Ok(())
146    }
147
148    pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
149        let ptr = name.as_ptr();
150        let len = name.len();
151        unsafe {
152            ffi_try!(ffi::rocksdb_transaction_set_name(
153                self.inner, ptr as _, len as _
154            ));
155        }
156
157        Ok(())
158    }
159
160    pub fn get_name(&self) -> Option<Vec<u8>> {
161        unsafe {
162            let mut name_len = 0;
163            let name = ffi::rocksdb_transaction_get_name(self.inner, &mut name_len);
164            if name.is_null() {
165                None
166            } else {
167                let mut vec = vec![0; name_len];
168                std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
169                ffi::rocksdb_free(name as *mut c_void);
170                Some(vec)
171            }
172        }
173    }
174
175    pub fn prepare(&self) -> Result<(), Error> {
176        unsafe {
177            ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
178        }
179        Ok(())
180    }
181
182    /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
183    /// Otherwise, returns a snapshot with `nullptr` inside which doesn't effect read operations.
184    ///
185    /// [`TransactionOptions`]: crate::TransactionOptions
186    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
187        SnapshotWithThreadMode::new(self)
188    }
189
190    /// Discard all batched writes in this transaction.
191    pub fn rollback(&self) -> Result<(), Error> {
192        unsafe {
193            ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
194            Ok(())
195        }
196    }
197
198    /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
199    /// May be called multiple times to set multiple save points.
200    ///
201    /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
202    pub fn set_savepoint(&self) {
203        unsafe {
204            ffi::rocksdb_transaction_set_savepoint(self.inner);
205        }
206    }
207
208    /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
209    /// and removes the most recent [`set_savepoint`].
210    ///
211    /// Returns error if there is no previous call to [`set_savepoint`].
212    ///
213    /// [`set_savepoint`]: Self::set_savepoint
214    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
215        unsafe {
216            ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
217            Ok(())
218        }
219    }
220
221    /// Get the bytes associated with a key value.
222    ///
223    /// See [`get_cf_opt`] for details.
224    ///
225    /// [`get_cf_opt`]: Self::get_cf_opt
226    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
227        self.get_opt(key, &ReadOptions::default())
228    }
229
230    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
231        self.get_pinned_opt(key, &ReadOptions::default())
232    }
233
234    /// Get the bytes associated with a key value and the given column family.
235    ///
236    /// See [`get_cf_opt`] for details.
237    ///
238    /// [`get_cf_opt`]: Self::get_cf_opt
239    pub fn get_cf<K: AsRef<[u8]>>(
240        &self,
241        cf: &impl AsColumnFamilyRef,
242        key: K,
243    ) -> Result<Option<Vec<u8>>, Error> {
244        self.get_cf_opt(cf, key, &ReadOptions::default())
245    }
246
247    pub fn get_pinned_cf<K: AsRef<[u8]>>(
248        &self,
249        cf: &impl AsColumnFamilyRef,
250        key: K,
251    ) -> Result<Option<DBPinnableSlice>, Error> {
252        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
253    }
254
255    /// Get the key and ensure that this transaction will only
256    /// be able to be committed if this key is not written outside this
257    /// transaction after it has first been read (or after the snapshot if a
258    /// snapshot is set in this transaction).
259    ///
260    /// See [`get_for_update_cf_opt`] for details.
261    ///
262    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
263    pub fn get_for_update<K: AsRef<[u8]>>(
264        &self,
265        key: K,
266        exclusive: bool,
267    ) -> Result<Option<Vec<u8>>, Error> {
268        self.get_for_update_opt(key, exclusive, &ReadOptions::default())
269    }
270
271    pub fn get_pinned_for_update<K: AsRef<[u8]>>(
272        &self,
273        key: K,
274        exclusive: bool,
275    ) -> Result<Option<DBPinnableSlice>, Error> {
276        self.get_pinned_for_update_opt(key, exclusive, &ReadOptions::default())
277    }
278
279    /// Get the key in the given column family and ensure that this transaction will only
280    /// be able to be committed if this key is not written outside this
281    /// transaction after it has first been read (or after the snapshot if a
282    /// snapshot is set in this transaction).
283    ///
284    /// See [`get_for_update_cf_opt`] for details.
285    ///
286    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
287    pub fn get_for_update_cf<K: AsRef<[u8]>>(
288        &self,
289        cf: &impl AsColumnFamilyRef,
290        key: K,
291        exclusive: bool,
292    ) -> Result<Option<Vec<u8>>, Error> {
293        self.get_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
294    }
295
296    pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
297        &self,
298        cf: &impl AsColumnFamilyRef,
299        key: K,
300        exclusive: bool,
301    ) -> Result<Option<DBPinnableSlice>, Error> {
302        self.get_pinned_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
303    }
304
305    /// Returns the bytes associated with a key value with read options.
306    ///
307    /// See [`get_cf_opt`] for details.
308    ///
309    /// [`get_cf_opt`]: Self::get_cf_opt
310    pub fn get_opt<K: AsRef<[u8]>>(
311        &self,
312        key: K,
313        readopts: &ReadOptions,
314    ) -> Result<Option<Vec<u8>>, Error> {
315        self.get_pinned_opt(key, readopts)
316            .map(|x| x.map(|v| v.as_ref().to_vec()))
317    }
318
319    pub fn get_pinned_opt<K: AsRef<[u8]>>(
320        &self,
321        key: K,
322        readopts: &ReadOptions,
323    ) -> Result<Option<DBPinnableSlice>, Error> {
324        unsafe {
325            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
326                self.inner,
327                readopts.inner,
328                key.as_ref().as_ptr() as *const c_char,
329                key.as_ref().len(),
330            ));
331            if val.is_null() {
332                Ok(None)
333            } else {
334                Ok(Some(DBPinnableSlice::from_c(val)))
335            }
336        }
337    }
338
339    /// Get the bytes associated with a key value and the given column family with read options.
340    ///
341    /// This function will also read pending changes in this transaction.
342    /// Currently, this function will return an error of the [`MergeInProgress`] kind
343    /// if the most recent write to the queried key in this batch is a Merge.
344    ///
345    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
346    pub fn get_cf_opt<K: AsRef<[u8]>>(
347        &self,
348        cf: &impl AsColumnFamilyRef,
349        key: K,
350        readopts: &ReadOptions,
351    ) -> Result<Option<Vec<u8>>, Error> {
352        self.get_pinned_cf_opt(cf, key, readopts)
353            .map(|x| x.map(|v| v.as_ref().to_vec()))
354    }
355
356    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
357        &self,
358        cf: &impl AsColumnFamilyRef,
359        key: K,
360        readopts: &ReadOptions,
361    ) -> Result<Option<DBPinnableSlice>, Error> {
362        unsafe {
363            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
364                self.inner,
365                readopts.inner,
366                cf.inner(),
367                key.as_ref().as_ptr() as *const c_char,
368                key.as_ref().len(),
369            ));
370            if val.is_null() {
371                Ok(None)
372            } else {
373                Ok(Some(DBPinnableSlice::from_c(val)))
374            }
375        }
376    }
377
378    /// Get the key with read options and ensure that this transaction will only
379    /// be able to be committed if this key is not written outside this
380    /// transaction after it has first been read (or after the snapshot if a
381    /// snapshot is set in this transaction).
382    ///
383    /// See [`get_for_update_cf_opt`] for details.
384    ///
385    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
386    pub fn get_for_update_opt<K: AsRef<[u8]>>(
387        &self,
388        key: K,
389        exclusive: bool,
390        opts: &ReadOptions,
391    ) -> Result<Option<Vec<u8>>, Error> {
392        self.get_pinned_for_update_opt(key, exclusive, opts)
393            .map(|x| x.map(|v| v.as_ref().to_vec()))
394    }
395
396    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
397        &self,
398        key: K,
399        exclusive: bool,
400        opts: &ReadOptions,
401    ) -> Result<Option<DBPinnableSlice>, Error> {
402        unsafe {
403            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
404                self.inner,
405                opts.inner,
406                key.as_ref().as_ptr() as *const c_char,
407                key.as_ref().len() as size_t,
408                u8::from(exclusive),
409            ));
410            if val.is_null() {
411                Ok(None)
412            } else {
413                Ok(Some(DBPinnableSlice::from_c(val)))
414            }
415        }
416    }
417
418    /// Get the key in the given column family with read options
419    /// and ensure that this transaction will only
420    /// be able to be committed if this key is not written outside this
421    /// transaction after it has first been read (or after the snapshot if a
422    /// snapshot is set in this transaction).
423    ///
424    /// Currently, this function will return an error of the [`MergeInProgress`]
425    /// if the most recent write to the queried key in this batch is a Merge.
426    ///
427    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
428    /// * [`Busy`] if there is a write conflict.
429    /// * [`TimedOut`] if a lock could not be acquired.
430    /// * [`TryAgain`] if the memtable history size is not large enough.
431    /// * [`MergeInProgress`] if merge operations cannot be resolved.
432    /// * or other errors if this key could not be read.
433    ///
434    /// If this transaction was created by an `[OptimisticTransactionDB]`, `get_for_update_opt`
435    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
436    /// be returned by `[DB::get]`.
437    ///
438    /// [`Busy`]: crate::ErrorKind::Busy
439    /// [`TimedOut`]: crate::ErrorKind::TimedOut
440    /// [`TryAgain`]: crate::ErrorKind::TryAgain
441    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
442    /// [`TransactionDB`]: crate::TransactionDB
443    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
444    /// [`commit`]: Self::commit
445    /// [`DB::get`]: crate::DB::get
446    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
447        &self,
448        cf: &impl AsColumnFamilyRef,
449        key: K,
450        exclusive: bool,
451        opts: &ReadOptions,
452    ) -> Result<Option<Vec<u8>>, Error> {
453        self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
454            .map(|x| x.map(|v| v.as_ref().to_vec()))
455    }
456
457    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
458        &self,
459        cf: &impl AsColumnFamilyRef,
460        key: K,
461        exclusive: bool,
462        opts: &ReadOptions,
463    ) -> Result<Option<DBPinnableSlice>, Error> {
464        unsafe {
465            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
466                self.inner,
467                opts.inner,
468                cf.inner(),
469                key.as_ref().as_ptr() as *const c_char,
470                key.as_ref().len() as size_t,
471                u8::from(exclusive),
472            ));
473            if val.is_null() {
474                Ok(None)
475            } else {
476                Ok(Some(DBPinnableSlice::from_c(val)))
477            }
478        }
479    }
480
481    /// Return the values associated with the given keys.
482    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
483    where
484        K: AsRef<[u8]>,
485        I: IntoIterator<Item = K>,
486    {
487        self.multi_get_opt(keys, &ReadOptions::default())
488    }
489
490    /// Return the values associated with the given keys using read options.
491    pub fn multi_get_opt<K, I>(
492        &self,
493        keys: I,
494        readopts: &ReadOptions,
495    ) -> Vec<Result<Option<Vec<u8>>, Error>>
496    where
497        K: AsRef<[u8]>,
498        I: IntoIterator<Item = K>,
499    {
500        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
501            .into_iter()
502            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
503            .unzip();
504        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
505
506        let mut values = vec![ptr::null_mut(); keys.len()];
507        let mut values_sizes = vec![0_usize; keys.len()];
508        let mut errors = vec![ptr::null_mut(); keys.len()];
509        unsafe {
510            ffi::rocksdb_transaction_multi_get(
511                self.inner,
512                readopts.inner,
513                ptr_keys.len(),
514                ptr_keys.as_ptr(),
515                keys_sizes.as_ptr(),
516                values.as_mut_ptr(),
517                values_sizes.as_mut_ptr(),
518                errors.as_mut_ptr(),
519            );
520        }
521
522        convert_values(values, values_sizes, errors)
523    }
524
525    /// Return the values associated with the given keys and column families.
526    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
527        &'a self,
528        keys: I,
529    ) -> Vec<Result<Option<Vec<u8>>, Error>>
530    where
531        K: AsRef<[u8]>,
532        I: IntoIterator<Item = (&'b W, K)>,
533        W: 'b + AsColumnFamilyRef,
534    {
535        self.multi_get_cf_opt(keys, &ReadOptions::default())
536    }
537
538    /// Return the values associated with the given keys and column families using read options.
539    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
540        &'a self,
541        keys: I,
542        readopts: &ReadOptions,
543    ) -> Vec<Result<Option<Vec<u8>>, Error>>
544    where
545        K: AsRef<[u8]>,
546        I: IntoIterator<Item = (&'b W, K)>,
547        W: 'b + AsColumnFamilyRef,
548    {
549        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
550            .into_iter()
551            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
552            .unzip();
553        let ptr_keys: Vec<_> = cfs_and_keys
554            .iter()
555            .map(|(_, k)| k.as_ptr() as *const c_char)
556            .collect();
557        let ptr_cfs: Vec<_> = cfs_and_keys
558            .iter()
559            .map(|(c, _)| c.inner() as *const _)
560            .collect();
561
562        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
563        let mut values_sizes = vec![0_usize; ptr_keys.len()];
564        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
565        unsafe {
566            ffi::rocksdb_transaction_multi_get_cf(
567                self.inner,
568                readopts.inner,
569                ptr_cfs.as_ptr(),
570                ptr_keys.len(),
571                ptr_keys.as_ptr(),
572                keys_sizes.as_ptr(),
573                values.as_mut_ptr(),
574                values_sizes.as_mut_ptr(),
575                errors.as_mut_ptr(),
576            );
577        }
578
579        convert_values(values, values_sizes, errors)
580    }
581
582    /// Put the key value in default column family and do conflict checking on the key.
583    ///
584    /// See [`put_cf`] for details.
585    ///
586    /// [`put_cf`]: Self::put_cf
587    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
588        unsafe {
589            ffi_try!(ffi::rocksdb_transaction_put(
590                self.inner,
591                key.as_ref().as_ptr() as *const c_char,
592                key.as_ref().len() as size_t,
593                value.as_ref().as_ptr() as *const c_char,
594                value.as_ref().len() as size_t,
595            ));
596            Ok(())
597        }
598    }
599
600    /// Put the key value in the given column famuly and do conflict checking on the key.
601    ///
602    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
603    /// * [`Busy`] if there is a write conflict.
604    /// * [`TimedOut`] if a lock could not be acquired.
605    /// * [`TryAgain`] if the memtable history size is not large enough.
606    /// * [`MergeInProgress`] if merge operations cannot be resolved.
607    /// * or other errors on unexpected failures.
608    ///
609    /// [`Busy`]: crate::ErrorKind::Busy
610    /// [`TimedOut`]: crate::ErrorKind::TimedOut
611    /// [`TryAgain`]: crate::ErrorKind::TryAgain
612    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
613    /// [`TransactionDB`]: crate::TransactionDB
614    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
615    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
616        &self,
617        cf: &impl AsColumnFamilyRef,
618        key: K,
619        value: V,
620    ) -> Result<(), Error> {
621        unsafe {
622            ffi_try!(ffi::rocksdb_transaction_put_cf(
623                self.inner,
624                cf.inner(),
625                key.as_ref().as_ptr() as *const c_char,
626                key.as_ref().len() as size_t,
627                value.as_ref().as_ptr() as *const c_char,
628                value.as_ref().len() as size_t,
629            ));
630            Ok(())
631        }
632    }
633
634    /// Merge value with existing value of key, and also do conflict checking on the key.
635    ///
636    /// See [`merge_cf`] for details.
637    ///
638    /// [`merge_cf`]: Self::merge_cf
639    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
640        unsafe {
641            ffi_try!(ffi::rocksdb_transaction_merge(
642                self.inner,
643                key.as_ref().as_ptr() as *const c_char,
644                key.as_ref().len() as size_t,
645                value.as_ref().as_ptr() as *const c_char,
646                value.as_ref().len() as size_t
647            ));
648            Ok(())
649        }
650    }
651
652    /// Merge `value` with existing value of `key` in the given column family,
653    /// and also do conflict checking on the key.
654    ///
655    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
656    /// * [`Busy`] if there is a write conflict.
657    /// * [`TimedOut`] if a lock could not be acquired.
658    /// * [`TryAgain`] if the memtable history size is not large enough.
659    /// * [`MergeInProgress`] if merge operations cannot be resolved.
660    /// * or other errors on unexpected failures.
661    ///
662    /// [`Busy`]: crate::ErrorKind::Busy
663    /// [`TimedOut`]: crate::ErrorKind::TimedOut
664    /// [`TryAgain`]: crate::ErrorKind::TryAgain
665    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
666    /// [`TransactionDB`]: crate::TransactionDB
667    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
668        &self,
669        cf: &impl AsColumnFamilyRef,
670        key: K,
671        value: V,
672    ) -> Result<(), Error> {
673        unsafe {
674            ffi_try!(ffi::rocksdb_transaction_merge_cf(
675                self.inner,
676                cf.inner(),
677                key.as_ref().as_ptr() as *const c_char,
678                key.as_ref().len() as size_t,
679                value.as_ref().as_ptr() as *const c_char,
680                value.as_ref().len() as size_t
681            ));
682            Ok(())
683        }
684    }
685
686    /// Delete the key value if it exists and do conflict checking on the key.
687    ///
688    /// See [`delete_cf`] for details.
689    ///
690    /// [`delete_cf`]: Self::delete_cf
691    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
692        unsafe {
693            ffi_try!(ffi::rocksdb_transaction_delete(
694                self.inner,
695                key.as_ref().as_ptr() as *const c_char,
696                key.as_ref().len() as size_t
697            ));
698        }
699        Ok(())
700    }
701
702    /// Delete the key value in the given column family and do conflict checking.
703    ///
704    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
705    /// * [`Busy`] if there is a write conflict.
706    /// * [`TimedOut`] if a lock could not be acquired.
707    /// * [`TryAgain`] if the memtable history size is not large enough.
708    /// * [`MergeInProgress`] if merge operations cannot be resolved.
709    /// * or other errors on unexpected failures.
710    ///
711    /// [`Busy`]: crate::ErrorKind::Busy
712    /// [`TimedOut`]: crate::ErrorKind::TimedOut
713    /// [`TryAgain`]: crate::ErrorKind::TryAgain
714    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
715    /// [`TransactionDB`]: crate::TransactionDB
716    pub fn delete_cf<K: AsRef<[u8]>>(
717        &self,
718        cf: &impl AsColumnFamilyRef,
719        key: K,
720    ) -> Result<(), Error> {
721        unsafe {
722            ffi_try!(ffi::rocksdb_transaction_delete_cf(
723                self.inner,
724                cf.inner(),
725                key.as_ref().as_ptr() as *const c_char,
726                key.as_ref().len() as size_t
727            ));
728        }
729        Ok(())
730    }
731
732    pub fn iterator<'a: 'b, 'b>(
733        &'a self,
734        mode: IteratorMode,
735    ) -> DBIteratorWithThreadMode<'b, Self> {
736        let readopts = ReadOptions::default();
737        self.iterator_opt(mode, readopts)
738    }
739
740    pub fn iterator_opt<'a: 'b, 'b>(
741        &'a self,
742        mode: IteratorMode,
743        readopts: ReadOptions,
744    ) -> DBIteratorWithThreadMode<'b, Self> {
745        DBIteratorWithThreadMode::new(self, readopts, mode)
746    }
747
748    /// Opens an iterator using the provided ReadOptions.
749    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions.
750    pub fn iterator_cf_opt<'a: 'b, 'b>(
751        &'a self,
752        cf_handle: &impl AsColumnFamilyRef,
753        readopts: ReadOptions,
754        mode: IteratorMode,
755    ) -> DBIteratorWithThreadMode<'b, Self> {
756        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
757    }
758
759    /// Opens an iterator with `set_total_order_seek` enabled.
760    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
761    /// with a Hash-based implementation.
762    pub fn full_iterator<'a: 'b, 'b>(
763        &'a self,
764        mode: IteratorMode,
765    ) -> DBIteratorWithThreadMode<'b, Self> {
766        let mut opts = ReadOptions::default();
767        opts.set_total_order_seek(true);
768        DBIteratorWithThreadMode::new(self, opts, mode)
769    }
770
771    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
772        &'a self,
773        prefix: P,
774    ) -> DBIteratorWithThreadMode<'b, Self> {
775        let mut opts = ReadOptions::default();
776        opts.set_prefix_same_as_start(true);
777        DBIteratorWithThreadMode::new(
778            self,
779            opts,
780            IteratorMode::From(prefix.as_ref(), Direction::Forward),
781        )
782    }
783
784    pub fn iterator_cf<'a: 'b, 'b>(
785        &'a self,
786        cf_handle: &impl AsColumnFamilyRef,
787        mode: IteratorMode,
788    ) -> DBIteratorWithThreadMode<'b, Self> {
789        let opts = ReadOptions::default();
790        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
791    }
792
793    pub fn full_iterator_cf<'a: 'b, 'b>(
794        &'a self,
795        cf_handle: &impl AsColumnFamilyRef,
796        mode: IteratorMode,
797    ) -> DBIteratorWithThreadMode<'b, Self> {
798        let mut opts = ReadOptions::default();
799        opts.set_total_order_seek(true);
800        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
801    }
802
803    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
804        &'a self,
805        cf_handle: &impl AsColumnFamilyRef,
806        prefix: P,
807    ) -> DBIteratorWithThreadMode<'a, Self> {
808        let mut opts = ReadOptions::default();
809        opts.set_prefix_same_as_start(true);
810        DBIteratorWithThreadMode::<'a, Self>::new_cf(
811            self,
812            cf_handle.inner(),
813            opts,
814            IteratorMode::From(prefix.as_ref(), Direction::Forward),
815        )
816    }
817
818    /// Opens a raw iterator over the database, using the default read options
819    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
820        let opts = ReadOptions::default();
821        DBRawIteratorWithThreadMode::new(self, opts)
822    }
823
824    /// Opens a raw iterator over the given column family, using the default read options
825    pub fn raw_iterator_cf<'a: 'b, 'b>(
826        &'a self,
827        cf_handle: &impl AsColumnFamilyRef,
828    ) -> DBRawIteratorWithThreadMode<'b, Self> {
829        let opts = ReadOptions::default();
830        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
831    }
832
833    /// Opens a raw iterator over the database, using the given read options
834    pub fn raw_iterator_opt<'a: 'b, 'b>(
835        &'a self,
836        readopts: ReadOptions,
837    ) -> DBRawIteratorWithThreadMode<'b, Self> {
838        DBRawIteratorWithThreadMode::new(self, readopts)
839    }
840
841    /// Opens a raw iterator over the given column family, using the given read options
842    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
843        &'a self,
844        cf_handle: &impl AsColumnFamilyRef,
845        readopts: ReadOptions,
846    ) -> DBRawIteratorWithThreadMode<'b, Self> {
847        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
848    }
849
850    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
851        unsafe {
852            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
853            let mut len: usize = 0;
854            let ptr = ffi::rocksdb_writebatch_wi_data(wi, &mut len as _);
855            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
856            ffi::rocksdb_free(wi as *mut c_void);
857            WriteBatchWithTransaction { inner: writebatch }
858        }
859    }
860
861    pub fn rebuild_from_writebatch(
862        &self,
863        writebatch: &WriteBatchWithTransaction<true>,
864    ) -> Result<(), Error> {
865        unsafe {
866            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
867                self.inner,
868                writebatch.inner
869            ));
870        }
871        Ok(())
872    }
873}
874
875impl<'db, DB> Drop for Transaction<'db, DB> {
876    fn drop(&mut self) {
877        unsafe {
878            ffi::rocksdb_transaction_destroy(self.inner);
879        }
880    }
881}