1use std::{marker::PhantomData, ptr};
17
18use crate::{
19 db::{convert_values, DBAccess},
20 ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
21 Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
22};
23use libc::{c_char, c_void, size_t};
24
25pub struct Transaction<'db, DB> {
32 pub(crate) inner: *mut ffi::rocksdb_transaction_t,
33 pub(crate) _marker: PhantomData<&'db DB>,
34}
35
36unsafe impl<'db, DB> Send for Transaction<'db, DB> {}
37
38impl<'db, DB> DBAccess for Transaction<'db, DB> {
39 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
40 ffi::rocksdb_transaction_get_snapshot(self.inner)
41 }
42
43 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
44 ffi::rocksdb_free(snapshot as *mut c_void);
45 }
46
47 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
48 ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner)
49 }
50
51 unsafe fn create_iterator_cf(
52 &self,
53 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
54 readopts: &ReadOptions,
55 ) -> *mut ffi::rocksdb_iterator_t {
56 ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
57 }
58
59 fn get_opt<K: AsRef<[u8]>>(
60 &self,
61 key: K,
62 readopts: &ReadOptions,
63 ) -> Result<Option<Vec<u8>>, Error> {
64 self.get_opt(key, readopts)
65 }
66
67 fn get_cf_opt<K: AsRef<[u8]>>(
68 &self,
69 cf: &impl AsColumnFamilyRef,
70 key: K,
71 readopts: &ReadOptions,
72 ) -> Result<Option<Vec<u8>>, Error> {
73 self.get_cf_opt(cf, key, readopts)
74 }
75
76 fn get_pinned_opt<K: AsRef<[u8]>>(
77 &self,
78 key: K,
79 readopts: &ReadOptions,
80 ) -> Result<Option<DBPinnableSlice>, Error> {
81 self.get_pinned_opt(key, readopts)
82 }
83
84 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
85 &self,
86 cf: &impl AsColumnFamilyRef,
87 key: K,
88 readopts: &ReadOptions,
89 ) -> Result<Option<DBPinnableSlice>, Error> {
90 self.get_pinned_cf_opt(cf, key, readopts)
91 }
92
93 fn multi_get_opt<K, I>(
94 &self,
95 keys: I,
96 readopts: &ReadOptions,
97 ) -> Vec<Result<Option<Vec<u8>>, Error>>
98 where
99 K: AsRef<[u8]>,
100 I: IntoIterator<Item = K>,
101 {
102 self.multi_get_opt(keys, readopts)
103 }
104
105 fn multi_get_cf_opt<'b, K, I, W>(
106 &self,
107 keys_cf: I,
108 readopts: &ReadOptions,
109 ) -> Vec<Result<Option<Vec<u8>>, Error>>
110 where
111 K: AsRef<[u8]>,
112 I: IntoIterator<Item = (&'b W, K)>,
113 W: AsColumnFamilyRef + 'b,
114 {
115 self.multi_get_cf_opt(keys_cf, readopts)
116 }
117}
118
119impl<'db, DB> Transaction<'db, DB> {
120 pub fn commit(self) -> Result<(), Error> {
142 unsafe {
143 ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
144 }
145 Ok(())
146 }
147
148 pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
149 let ptr = name.as_ptr();
150 let len = name.len();
151 unsafe {
152 ffi_try!(ffi::rocksdb_transaction_set_name(
153 self.inner, ptr as _, len as _
154 ));
155 }
156
157 Ok(())
158 }
159
160 pub fn get_name(&self) -> Option<Vec<u8>> {
161 unsafe {
162 let mut name_len = 0;
163 let name = ffi::rocksdb_transaction_get_name(self.inner, &mut name_len);
164 if name.is_null() {
165 None
166 } else {
167 let mut vec = vec![0; name_len];
168 std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
169 ffi::rocksdb_free(name as *mut c_void);
170 Some(vec)
171 }
172 }
173 }
174
175 pub fn prepare(&self) -> Result<(), Error> {
176 unsafe {
177 ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
178 }
179 Ok(())
180 }
181
182 pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
187 SnapshotWithThreadMode::new(self)
188 }
189
190 pub fn rollback(&self) -> Result<(), Error> {
192 unsafe {
193 ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
194 Ok(())
195 }
196 }
197
198 pub fn set_savepoint(&self) {
203 unsafe {
204 ffi::rocksdb_transaction_set_savepoint(self.inner);
205 }
206 }
207
208 pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
215 unsafe {
216 ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
217 Ok(())
218 }
219 }
220
221 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
227 self.get_opt(key, &ReadOptions::default())
228 }
229
230 pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
231 self.get_pinned_opt(key, &ReadOptions::default())
232 }
233
234 pub fn get_cf<K: AsRef<[u8]>>(
240 &self,
241 cf: &impl AsColumnFamilyRef,
242 key: K,
243 ) -> Result<Option<Vec<u8>>, Error> {
244 self.get_cf_opt(cf, key, &ReadOptions::default())
245 }
246
247 pub fn get_pinned_cf<K: AsRef<[u8]>>(
248 &self,
249 cf: &impl AsColumnFamilyRef,
250 key: K,
251 ) -> Result<Option<DBPinnableSlice>, Error> {
252 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
253 }
254
255 pub fn get_for_update<K: AsRef<[u8]>>(
264 &self,
265 key: K,
266 exclusive: bool,
267 ) -> Result<Option<Vec<u8>>, Error> {
268 self.get_for_update_opt(key, exclusive, &ReadOptions::default())
269 }
270
271 pub fn get_pinned_for_update<K: AsRef<[u8]>>(
272 &self,
273 key: K,
274 exclusive: bool,
275 ) -> Result<Option<DBPinnableSlice>, Error> {
276 self.get_pinned_for_update_opt(key, exclusive, &ReadOptions::default())
277 }
278
279 pub fn get_for_update_cf<K: AsRef<[u8]>>(
288 &self,
289 cf: &impl AsColumnFamilyRef,
290 key: K,
291 exclusive: bool,
292 ) -> Result<Option<Vec<u8>>, Error> {
293 self.get_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
294 }
295
296 pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
297 &self,
298 cf: &impl AsColumnFamilyRef,
299 key: K,
300 exclusive: bool,
301 ) -> Result<Option<DBPinnableSlice>, Error> {
302 self.get_pinned_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
303 }
304
305 pub fn get_opt<K: AsRef<[u8]>>(
311 &self,
312 key: K,
313 readopts: &ReadOptions,
314 ) -> Result<Option<Vec<u8>>, Error> {
315 self.get_pinned_opt(key, readopts)
316 .map(|x| x.map(|v| v.as_ref().to_vec()))
317 }
318
319 pub fn get_pinned_opt<K: AsRef<[u8]>>(
320 &self,
321 key: K,
322 readopts: &ReadOptions,
323 ) -> Result<Option<DBPinnableSlice>, Error> {
324 unsafe {
325 let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
326 self.inner,
327 readopts.inner,
328 key.as_ref().as_ptr() as *const c_char,
329 key.as_ref().len(),
330 ));
331 if val.is_null() {
332 Ok(None)
333 } else {
334 Ok(Some(DBPinnableSlice::from_c(val)))
335 }
336 }
337 }
338
339 pub fn get_cf_opt<K: AsRef<[u8]>>(
347 &self,
348 cf: &impl AsColumnFamilyRef,
349 key: K,
350 readopts: &ReadOptions,
351 ) -> Result<Option<Vec<u8>>, Error> {
352 self.get_pinned_cf_opt(cf, key, readopts)
353 .map(|x| x.map(|v| v.as_ref().to_vec()))
354 }
355
356 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
357 &self,
358 cf: &impl AsColumnFamilyRef,
359 key: K,
360 readopts: &ReadOptions,
361 ) -> Result<Option<DBPinnableSlice>, Error> {
362 unsafe {
363 let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
364 self.inner,
365 readopts.inner,
366 cf.inner(),
367 key.as_ref().as_ptr() as *const c_char,
368 key.as_ref().len(),
369 ));
370 if val.is_null() {
371 Ok(None)
372 } else {
373 Ok(Some(DBPinnableSlice::from_c(val)))
374 }
375 }
376 }
377
378 pub fn get_for_update_opt<K: AsRef<[u8]>>(
387 &self,
388 key: K,
389 exclusive: bool,
390 opts: &ReadOptions,
391 ) -> Result<Option<Vec<u8>>, Error> {
392 self.get_pinned_for_update_opt(key, exclusive, opts)
393 .map(|x| x.map(|v| v.as_ref().to_vec()))
394 }
395
396 pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
397 &self,
398 key: K,
399 exclusive: bool,
400 opts: &ReadOptions,
401 ) -> Result<Option<DBPinnableSlice>, Error> {
402 unsafe {
403 let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
404 self.inner,
405 opts.inner,
406 key.as_ref().as_ptr() as *const c_char,
407 key.as_ref().len() as size_t,
408 u8::from(exclusive),
409 ));
410 if val.is_null() {
411 Ok(None)
412 } else {
413 Ok(Some(DBPinnableSlice::from_c(val)))
414 }
415 }
416 }
417
418 pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
447 &self,
448 cf: &impl AsColumnFamilyRef,
449 key: K,
450 exclusive: bool,
451 opts: &ReadOptions,
452 ) -> Result<Option<Vec<u8>>, Error> {
453 self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
454 .map(|x| x.map(|v| v.as_ref().to_vec()))
455 }
456
457 pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
458 &self,
459 cf: &impl AsColumnFamilyRef,
460 key: K,
461 exclusive: bool,
462 opts: &ReadOptions,
463 ) -> Result<Option<DBPinnableSlice>, Error> {
464 unsafe {
465 let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
466 self.inner,
467 opts.inner,
468 cf.inner(),
469 key.as_ref().as_ptr() as *const c_char,
470 key.as_ref().len() as size_t,
471 u8::from(exclusive),
472 ));
473 if val.is_null() {
474 Ok(None)
475 } else {
476 Ok(Some(DBPinnableSlice::from_c(val)))
477 }
478 }
479 }
480
481 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
483 where
484 K: AsRef<[u8]>,
485 I: IntoIterator<Item = K>,
486 {
487 self.multi_get_opt(keys, &ReadOptions::default())
488 }
489
490 pub fn multi_get_opt<K, I>(
492 &self,
493 keys: I,
494 readopts: &ReadOptions,
495 ) -> Vec<Result<Option<Vec<u8>>, Error>>
496 where
497 K: AsRef<[u8]>,
498 I: IntoIterator<Item = K>,
499 {
500 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
501 .into_iter()
502 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
503 .unzip();
504 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
505
506 let mut values = vec![ptr::null_mut(); keys.len()];
507 let mut values_sizes = vec![0_usize; keys.len()];
508 let mut errors = vec![ptr::null_mut(); keys.len()];
509 unsafe {
510 ffi::rocksdb_transaction_multi_get(
511 self.inner,
512 readopts.inner,
513 ptr_keys.len(),
514 ptr_keys.as_ptr(),
515 keys_sizes.as_ptr(),
516 values.as_mut_ptr(),
517 values_sizes.as_mut_ptr(),
518 errors.as_mut_ptr(),
519 );
520 }
521
522 convert_values(values, values_sizes, errors)
523 }
524
525 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
527 &'a self,
528 keys: I,
529 ) -> Vec<Result<Option<Vec<u8>>, Error>>
530 where
531 K: AsRef<[u8]>,
532 I: IntoIterator<Item = (&'b W, K)>,
533 W: 'b + AsColumnFamilyRef,
534 {
535 self.multi_get_cf_opt(keys, &ReadOptions::default())
536 }
537
538 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
540 &'a self,
541 keys: I,
542 readopts: &ReadOptions,
543 ) -> Vec<Result<Option<Vec<u8>>, Error>>
544 where
545 K: AsRef<[u8]>,
546 I: IntoIterator<Item = (&'b W, K)>,
547 W: 'b + AsColumnFamilyRef,
548 {
549 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
550 .into_iter()
551 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
552 .unzip();
553 let ptr_keys: Vec<_> = cfs_and_keys
554 .iter()
555 .map(|(_, k)| k.as_ptr() as *const c_char)
556 .collect();
557 let ptr_cfs: Vec<_> = cfs_and_keys
558 .iter()
559 .map(|(c, _)| c.inner() as *const _)
560 .collect();
561
562 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
563 let mut values_sizes = vec![0_usize; ptr_keys.len()];
564 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
565 unsafe {
566 ffi::rocksdb_transaction_multi_get_cf(
567 self.inner,
568 readopts.inner,
569 ptr_cfs.as_ptr(),
570 ptr_keys.len(),
571 ptr_keys.as_ptr(),
572 keys_sizes.as_ptr(),
573 values.as_mut_ptr(),
574 values_sizes.as_mut_ptr(),
575 errors.as_mut_ptr(),
576 );
577 }
578
579 convert_values(values, values_sizes, errors)
580 }
581
582 pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
588 unsafe {
589 ffi_try!(ffi::rocksdb_transaction_put(
590 self.inner,
591 key.as_ref().as_ptr() as *const c_char,
592 key.as_ref().len() as size_t,
593 value.as_ref().as_ptr() as *const c_char,
594 value.as_ref().len() as size_t,
595 ));
596 Ok(())
597 }
598 }
599
600 pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
616 &self,
617 cf: &impl AsColumnFamilyRef,
618 key: K,
619 value: V,
620 ) -> Result<(), Error> {
621 unsafe {
622 ffi_try!(ffi::rocksdb_transaction_put_cf(
623 self.inner,
624 cf.inner(),
625 key.as_ref().as_ptr() as *const c_char,
626 key.as_ref().len() as size_t,
627 value.as_ref().as_ptr() as *const c_char,
628 value.as_ref().len() as size_t,
629 ));
630 Ok(())
631 }
632 }
633
634 pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
640 unsafe {
641 ffi_try!(ffi::rocksdb_transaction_merge(
642 self.inner,
643 key.as_ref().as_ptr() as *const c_char,
644 key.as_ref().len() as size_t,
645 value.as_ref().as_ptr() as *const c_char,
646 value.as_ref().len() as size_t
647 ));
648 Ok(())
649 }
650 }
651
652 pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
668 &self,
669 cf: &impl AsColumnFamilyRef,
670 key: K,
671 value: V,
672 ) -> Result<(), Error> {
673 unsafe {
674 ffi_try!(ffi::rocksdb_transaction_merge_cf(
675 self.inner,
676 cf.inner(),
677 key.as_ref().as_ptr() as *const c_char,
678 key.as_ref().len() as size_t,
679 value.as_ref().as_ptr() as *const c_char,
680 value.as_ref().len() as size_t
681 ));
682 Ok(())
683 }
684 }
685
686 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
692 unsafe {
693 ffi_try!(ffi::rocksdb_transaction_delete(
694 self.inner,
695 key.as_ref().as_ptr() as *const c_char,
696 key.as_ref().len() as size_t
697 ));
698 }
699 Ok(())
700 }
701
702 pub fn delete_cf<K: AsRef<[u8]>>(
717 &self,
718 cf: &impl AsColumnFamilyRef,
719 key: K,
720 ) -> Result<(), Error> {
721 unsafe {
722 ffi_try!(ffi::rocksdb_transaction_delete_cf(
723 self.inner,
724 cf.inner(),
725 key.as_ref().as_ptr() as *const c_char,
726 key.as_ref().len() as size_t
727 ));
728 }
729 Ok(())
730 }
731
732 pub fn iterator<'a: 'b, 'b>(
733 &'a self,
734 mode: IteratorMode,
735 ) -> DBIteratorWithThreadMode<'b, Self> {
736 let readopts = ReadOptions::default();
737 self.iterator_opt(mode, readopts)
738 }
739
740 pub fn iterator_opt<'a: 'b, 'b>(
741 &'a self,
742 mode: IteratorMode,
743 readopts: ReadOptions,
744 ) -> DBIteratorWithThreadMode<'b, Self> {
745 DBIteratorWithThreadMode::new(self, readopts, mode)
746 }
747
748 pub fn iterator_cf_opt<'a: 'b, 'b>(
751 &'a self,
752 cf_handle: &impl AsColumnFamilyRef,
753 readopts: ReadOptions,
754 mode: IteratorMode,
755 ) -> DBIteratorWithThreadMode<'b, Self> {
756 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
757 }
758
759 pub fn full_iterator<'a: 'b, 'b>(
763 &'a self,
764 mode: IteratorMode,
765 ) -> DBIteratorWithThreadMode<'b, Self> {
766 let mut opts = ReadOptions::default();
767 opts.set_total_order_seek(true);
768 DBIteratorWithThreadMode::new(self, opts, mode)
769 }
770
771 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
772 &'a self,
773 prefix: P,
774 ) -> DBIteratorWithThreadMode<'b, Self> {
775 let mut opts = ReadOptions::default();
776 opts.set_prefix_same_as_start(true);
777 DBIteratorWithThreadMode::new(
778 self,
779 opts,
780 IteratorMode::From(prefix.as_ref(), Direction::Forward),
781 )
782 }
783
784 pub fn iterator_cf<'a: 'b, 'b>(
785 &'a self,
786 cf_handle: &impl AsColumnFamilyRef,
787 mode: IteratorMode,
788 ) -> DBIteratorWithThreadMode<'b, Self> {
789 let opts = ReadOptions::default();
790 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
791 }
792
793 pub fn full_iterator_cf<'a: 'b, 'b>(
794 &'a self,
795 cf_handle: &impl AsColumnFamilyRef,
796 mode: IteratorMode,
797 ) -> DBIteratorWithThreadMode<'b, Self> {
798 let mut opts = ReadOptions::default();
799 opts.set_total_order_seek(true);
800 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
801 }
802
803 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
804 &'a self,
805 cf_handle: &impl AsColumnFamilyRef,
806 prefix: P,
807 ) -> DBIteratorWithThreadMode<'a, Self> {
808 let mut opts = ReadOptions::default();
809 opts.set_prefix_same_as_start(true);
810 DBIteratorWithThreadMode::<'a, Self>::new_cf(
811 self,
812 cf_handle.inner(),
813 opts,
814 IteratorMode::From(prefix.as_ref(), Direction::Forward),
815 )
816 }
817
818 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
820 let opts = ReadOptions::default();
821 DBRawIteratorWithThreadMode::new(self, opts)
822 }
823
824 pub fn raw_iterator_cf<'a: 'b, 'b>(
826 &'a self,
827 cf_handle: &impl AsColumnFamilyRef,
828 ) -> DBRawIteratorWithThreadMode<'b, Self> {
829 let opts = ReadOptions::default();
830 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
831 }
832
833 pub fn raw_iterator_opt<'a: 'b, 'b>(
835 &'a self,
836 readopts: ReadOptions,
837 ) -> DBRawIteratorWithThreadMode<'b, Self> {
838 DBRawIteratorWithThreadMode::new(self, readopts)
839 }
840
841 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
843 &'a self,
844 cf_handle: &impl AsColumnFamilyRef,
845 readopts: ReadOptions,
846 ) -> DBRawIteratorWithThreadMode<'b, Self> {
847 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
848 }
849
850 pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
851 unsafe {
852 let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
853 let mut len: usize = 0;
854 let ptr = ffi::rocksdb_writebatch_wi_data(wi, &mut len as _);
855 let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
856 ffi::rocksdb_free(wi as *mut c_void);
857 WriteBatchWithTransaction { inner: writebatch }
858 }
859 }
860
861 pub fn rebuild_from_writebatch(
862 &self,
863 writebatch: &WriteBatchWithTransaction<true>,
864 ) -> Result<(), Error> {
865 unsafe {
866 ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
867 self.inner,
868 writebatch.inner
869 ));
870 }
871 Ok(())
872 }
873}
874
875impl<'db, DB> Drop for Transaction<'db, DB> {
876 fn drop(&mut self) {
877 unsafe {
878 ffi::rocksdb_transaction_destroy(self.inner);
879 }
880 }
881}