1use std::{
17 collections::BTreeMap,
18 ffi::CString,
19 fs, iter,
20 marker::PhantomData,
21 path::{Path, PathBuf},
22 ptr,
23 sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31 column_family::UnboundColumnFamily,
32 db::{convert_values, DBAccess},
33 db_options::OptionsMustOutliveDB,
34 ffi,
35 ffi_util::to_cpath,
36 AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
37 DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
38 IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
39 ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
40 WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_void, size_t};
44
// Thread mode used when `TransactionDB` is named without an explicit type
// parameter. The `multi-threaded-cf` feature selects between the two aliases
// at compile time.
#[cfg(not(feature = "multi-threaded-cf"))]
type DefaultThreadMode = crate::SingleThreaded;
#[cfg(feature = "multi-threaded-cf")]
type DefaultThreadMode = crate::MultiThreaded;
49
/// Handle to a RocksDB transaction database.
///
/// The type parameter `T` is the [`ThreadMode`] that stores the column family
/// handles; it defaults to `DefaultThreadMode`.
pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
    /// Raw pointer to the underlying `rocksdb_transactiondb_t`.
    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
    /// Column family handles, stored in the form dictated by `T`.
    cfs: T,
    /// Filesystem path the database was opened with.
    path: PathBuf,
    /// Prepared transactions reported by the database when it was opened and
    /// not yet handed out through `prepared_transactions`.
    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
    /// Clones of the `outlive` state of the DB options and of each supplied
    /// column family's options; stored here and never read.
    _outlive: Vec<OptionsMustOutliveDB>,
}
87
// These impls assert that a `TransactionDB` (including its raw `inner`
// pointer) may be moved to and shared between threads.
// NOTE(review): presumably justified by the thread-safety of the underlying
// RocksDB handle — confirm against the RocksDB documentation.
unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
90
// `DBAccess` implementation for `TransactionDB`.
//
// Snapshot and iterator creation go straight to the
// `rocksdb_transactiondb_*` FFI functions. The read methods call
// `self.<same name>(..)`; method lookup prefers inherent methods over trait
// methods, so those calls reach the inherent `TransactionDB` methods defined
// later in this file rather than recursing into this impl.
impl<T: ThreadMode> DBAccess for TransactionDB<T> {
    /// Creates a snapshot via `rocksdb_transactiondb_create_snapshot`.
    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
        ffi::rocksdb_transactiondb_create_snapshot(self.inner)
    }

    /// Releases `snapshot` via `rocksdb_transactiondb_release_snapshot`.
    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
        ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
    }

    /// Creates a raw iterator with the given read options.
    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
        ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
    }

    /// Creates a raw iterator over the column family `cf_handle` with the
    /// given read options.
    unsafe fn create_iterator_cf(
        &self,
        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
        readopts: &ReadOptions,
    ) -> *mut ffi::rocksdb_iterator_t {
        ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
    }

    /// Forwards to the inherent `TransactionDB::get_opt`.
    fn get_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error> {
        self.get_opt(key, readopts)
    }

    /// Forwards to the inherent `TransactionDB::get_cf_opt`.
    fn get_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error> {
        self.get_cf_opt(cf, key, readopts)
    }

    /// Forwards to the inherent `TransactionDB::get_pinned_opt`.
    fn get_pinned_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error> {
        self.get_pinned_opt(key, readopts)
    }

    /// Forwards to the inherent `TransactionDB::get_pinned_cf_opt`.
    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error> {
        self.get_pinned_cf_opt(cf, key, readopts)
    }

    /// Forwards to the inherent `TransactionDB::multi_get_opt`.
    fn multi_get_opt<K, I>(
        &self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = K>,
    {
        self.multi_get_opt(keys, readopts)
    }

    /// Forwards to the inherent `TransactionDB::multi_get_cf_opt`.
    fn multi_get_cf_opt<'b, K, I, W>(
        &self,
        keys_cf: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: AsColumnFamilyRef + 'b,
    {
        self.multi_get_cf_opt(keys_cf, readopts)
    }
}
171
172impl<T: ThreadMode> TransactionDB<T> {
173 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
175 let mut opts = Options::default();
176 opts.create_if_missing(true);
177 let txn_db_opts = TransactionDBOptions::default();
178 Self::open(&opts, &txn_db_opts, path)
179 }
180
/// Opens a transaction database at `path` with the given options and no
/// explicitly listed column families (forwards to `open_cf` with an empty
/// list).
pub fn open<P: AsRef<Path>>(
    opts: &Options,
    txn_db_opts: &TransactionDBOptions,
    path: P,
) -> Result<Self, Error> {
    Self::open_cf(opts, txn_db_opts, path, None::<&str>)
}
189
190 pub fn open_cf<P, I, N>(
194 opts: &Options,
195 txn_db_opts: &TransactionDBOptions,
196 path: P,
197 cfs: I,
198 ) -> Result<Self, Error>
199 where
200 P: AsRef<Path>,
201 I: IntoIterator<Item = N>,
202 N: AsRef<str>,
203 {
204 let cfs = cfs
205 .into_iter()
206 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
207
208 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
209 }
210
/// Opens a transaction database at `path` with the column families described
/// by `cfs`, each with its own options (forwards to
/// `open_cf_descriptors_internal`).
pub fn open_cf_descriptors<P, I>(
    opts: &Options,
    txn_db_opts: &TransactionDBOptions,
    path: P,
    cfs: I,
) -> Result<Self, Error>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
    Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
}
224
225 fn open_cf_descriptors_internal<P, I>(
227 opts: &Options,
228 txn_db_opts: &TransactionDBOptions,
229 path: P,
230 cfs: I,
231 ) -> Result<Self, Error>
232 where
233 P: AsRef<Path>,
234 I: IntoIterator<Item = ColumnFamilyDescriptor>,
235 {
236 let cfs: Vec<_> = cfs.into_iter().collect();
237 let outlive = iter::once(opts.outlive.clone())
238 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
239 .collect();
240
241 let cpath = to_cpath(&path)?;
242
243 if let Err(e) = fs::create_dir_all(&path) {
244 return Err(Error::new(format!(
245 "Failed to create RocksDB directory: `{e:?}`."
246 )));
247 }
248
249 let db: *mut ffi::rocksdb_transactiondb_t;
250 let mut cf_map = BTreeMap::new();
251
252 if cfs.is_empty() {
253 db = Self::open_raw(opts, txn_db_opts, &cpath)?;
254 } else {
255 let mut cfs_v = cfs;
256 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
258 cfs_v.push(ColumnFamilyDescriptor {
259 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
260 options: Options::default(),
261 ttl: ColumnFamilyTtl::SameAsDb, });
263 }
264 let c_cfs: Vec<CString> = cfs_v
267 .iter()
268 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
269 .collect();
270
271 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
272
273 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
275
276 let cfopts: Vec<_> = cfs_v
277 .iter()
278 .map(|cf| cf.options.inner.cast_const())
279 .collect();
280
281 db = Self::open_cf_raw(
282 opts,
283 txn_db_opts,
284 &cpath,
285 &cfs_v,
286 &cfnames,
287 &cfopts,
288 &mut cfhandles,
289 )?;
290
291 for handle in &cfhandles {
292 if handle.is_null() {
293 return Err(Error::new(
294 "Received null column family handle from DB.".to_owned(),
295 ));
296 }
297 }
298
299 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
300 cf_map.insert(cf_desc.name.clone(), inner);
301 }
302 }
303
304 if db.is_null() {
305 return Err(Error::new("Could not initialize database.".to_owned()));
306 }
307
308 let prepared = unsafe {
309 let mut cnt = 0;
310 let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
311 let mut vec = vec![std::ptr::null_mut(); cnt];
312 if !ptr.is_null() {
313 std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
314 ffi::rocksdb_free(ptr as *mut c_void);
315 }
316 vec
317 };
318
319 Ok(TransactionDB {
320 inner: db,
321 cfs: T::new_cf_map_internal(cf_map),
322 path: path.as_ref().to_path_buf(),
323 prepared: Mutex::new(prepared),
324 _outlive: outlive,
325 })
326 }
327
328 fn open_raw(
329 opts: &Options,
330 txn_db_opts: &TransactionDBOptions,
331 cpath: &CString,
332 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
333 unsafe {
334 let db = ffi_try!(ffi::rocksdb_transactiondb_open(
335 opts.inner,
336 txn_db_opts.inner,
337 cpath.as_ptr()
338 ));
339 Ok(db)
340 }
341 }
342
343 fn open_cf_raw(
344 opts: &Options,
345 txn_db_opts: &TransactionDBOptions,
346 cpath: &CString,
347 cfs_v: &[ColumnFamilyDescriptor],
348 cfnames: &[*const c_char],
349 cfopts: &[*const ffi::rocksdb_options_t],
350 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
351 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
352 unsafe {
353 let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
354 opts.inner,
355 txn_db_opts.inner,
356 cpath.as_ptr(),
357 cfs_v.len() as c_int,
358 cfnames.as_ptr(),
359 cfopts.as_ptr(),
360 cfhandles.as_mut_ptr(),
361 ));
362 Ok(db)
363 }
364 }
365
366 fn create_inner_cf_handle(
367 &self,
368 name: &str,
369 opts: &Options,
370 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
371 let cf_name = CString::new(name.as_bytes()).map_err(|_| {
372 Error::new("Failed to convert path to CString when creating cf".to_owned())
373 })?;
374
375 Ok(unsafe {
376 ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
377 self.inner,
378 opts.inner,
379 cf_name.as_ptr(),
380 ))
381 })
382 }
383
/// Lists the column families of the database at `path`; forwards to
/// `DB::list_cf`.
pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
    DB::list_cf(opts, path)
}
387
/// Destroys the database at `path`; forwards to `DB::destroy`.
pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
    DB::destroy(opts, path)
}
391
/// Repairs the database at `path`; forwards to `DB::repair`.
pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
    DB::repair(opts, path)
}
395
/// Returns the path this database was opened with.
pub fn path(&self) -> &Path {
    self.path.as_path()
}
399
/// Begins a new transaction with default `WriteOptions` and default
/// `TransactionOptions` (forwards to `transaction_opt`).
pub fn transaction(&self) -> Transaction<Self> {
    self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
}
404
405 pub fn transaction_opt<'a>(
407 &'a self,
408 write_opts: &WriteOptions,
409 txn_opts: &TransactionOptions,
410 ) -> Transaction<'a, Self> {
411 Transaction {
412 inner: unsafe {
413 ffi::rocksdb_transaction_begin(
414 self.inner,
415 write_opts.inner,
416 txn_opts.inner,
417 std::ptr::null_mut(),
418 )
419 },
420 _marker: PhantomData,
421 }
422 }
423
424 pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
429 self.prepared
430 .lock()
431 .unwrap()
432 .drain(0..)
433 .map(|inner| Transaction {
434 inner,
435 _marker: PhantomData,
436 })
437 .collect()
438 }
439
440 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
442 self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
443 }
444
445 pub fn get_cf<K: AsRef<[u8]>>(
447 &self,
448 cf: &impl AsColumnFamilyRef,
449 key: K,
450 ) -> Result<Option<Vec<u8>>, Error> {
451 self.get_pinned_cf(cf, key)
452 .map(|x| x.map(|v| v.as_ref().to_vec()))
453 }
454
455 pub fn get_opt<K: AsRef<[u8]>>(
457 &self,
458 key: K,
459 readopts: &ReadOptions,
460 ) -> Result<Option<Vec<u8>>, Error> {
461 self.get_pinned_opt(key, readopts)
462 .map(|x| x.map(|v| v.as_ref().to_vec()))
463 }
464
465 pub fn get_cf_opt<K: AsRef<[u8]>>(
467 &self,
468 cf: &impl AsColumnFamilyRef,
469 key: K,
470 readopts: &ReadOptions,
471 ) -> Result<Option<Vec<u8>>, Error> {
472 self.get_pinned_cf_opt(cf, key, readopts)
473 .map(|x| x.map(|v| v.as_ref().to_vec()))
474 }
475
/// Looks up `key` with default `ReadOptions` and returns the value as a
/// `DBPinnableSlice` (forwards to `get_pinned_opt`).
pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
    self.get_pinned_opt(key, &ReadOptions::default())
}
479
/// Looks up `key` in column family `cf` with default `ReadOptions` and
/// returns the value as a `DBPinnableSlice` (forwards to
/// `get_pinned_cf_opt`).
pub fn get_pinned_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<Option<DBPinnableSlice>, Error> {
    self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
}
488
489 pub fn get_pinned_opt<K: AsRef<[u8]>>(
491 &self,
492 key: K,
493 readopts: &ReadOptions,
494 ) -> Result<Option<DBPinnableSlice>, Error> {
495 let key = key.as_ref();
496 unsafe {
497 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
498 self.inner,
499 readopts.inner,
500 key.as_ptr() as *const c_char,
501 key.len() as size_t,
502 ));
503 if val.is_null() {
504 Ok(None)
505 } else {
506 Ok(Some(DBPinnableSlice::from_c(val)))
507 }
508 }
509 }
510
511 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
513 &self,
514 cf: &impl AsColumnFamilyRef,
515 key: K,
516 readopts: &ReadOptions,
517 ) -> Result<Option<DBPinnableSlice>, Error> {
518 let key = key.as_ref();
519 unsafe {
520 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
521 self.inner,
522 readopts.inner,
523 cf.inner(),
524 key.as_ptr() as *const c_char,
525 key.len() as size_t,
526 ));
527 if val.is_null() {
528 Ok(None)
529 } else {
530 Ok(Some(DBPinnableSlice::from_c(val)))
531 }
532 }
533 }
534
/// Looks up several keys at once with default `ReadOptions`; returns one
/// result per key (forwards to `multi_get_opt`).
pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = K>,
{
    self.multi_get_opt(keys, &ReadOptions::default())
}
543
544 pub fn multi_get_opt<K, I>(
546 &self,
547 keys: I,
548 readopts: &ReadOptions,
549 ) -> Vec<Result<Option<Vec<u8>>, Error>>
550 where
551 K: AsRef<[u8]>,
552 I: IntoIterator<Item = K>,
553 {
554 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
555 .into_iter()
556 .map(|key| {
557 let key = key.as_ref();
558 (Box::from(key), key.len())
559 })
560 .unzip();
561 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
562
563 let mut values = vec![ptr::null_mut(); keys.len()];
564 let mut values_sizes = vec![0_usize; keys.len()];
565 let mut errors = vec![ptr::null_mut(); keys.len()];
566 unsafe {
567 ffi::rocksdb_transactiondb_multi_get(
568 self.inner,
569 readopts.inner,
570 ptr_keys.len(),
571 ptr_keys.as_ptr(),
572 keys_sizes.as_ptr(),
573 values.as_mut_ptr(),
574 values_sizes.as_mut_ptr(),
575 errors.as_mut_ptr(),
576 );
577 }
578
579 convert_values(values, values_sizes, errors)
580 }
581
/// Looks up several `(column family, key)` pairs at once with default
/// `ReadOptions`; returns one result per pair (forwards to
/// `multi_get_cf_opt`).
pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
    &'a self,
    keys: I,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = (&'b W, K)>,
    W: 'b + AsColumnFamilyRef,
{
    self.multi_get_cf_opt(keys, &ReadOptions::default())
}
594
595 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
597 &'a self,
598 keys: I,
599 readopts: &ReadOptions,
600 ) -> Vec<Result<Option<Vec<u8>>, Error>>
601 where
602 K: AsRef<[u8]>,
603 I: IntoIterator<Item = (&'b W, K)>,
604 W: 'b + AsColumnFamilyRef,
605 {
606 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
607 .into_iter()
608 .map(|(cf, key)| {
609 let key = key.as_ref();
610 ((cf, Box::from(key)), key.len())
611 })
612 .unzip();
613 let ptr_keys: Vec<_> = cfs_and_keys
614 .iter()
615 .map(|(_, k)| k.as_ptr() as *const c_char)
616 .collect();
617 let ptr_cfs: Vec<_> = cfs_and_keys
618 .iter()
619 .map(|(c, _)| c.inner().cast_const())
620 .collect();
621
622 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
623 let mut values_sizes = vec![0_usize; ptr_keys.len()];
624 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
625 unsafe {
626 ffi::rocksdb_transactiondb_multi_get_cf(
627 self.inner,
628 readopts.inner,
629 ptr_cfs.as_ptr(),
630 ptr_keys.len(),
631 ptr_keys.as_ptr(),
632 keys_sizes.as_ptr(),
633 values.as_mut_ptr(),
634 values_sizes.as_mut_ptr(),
635 errors.as_mut_ptr(),
636 );
637 }
638
639 convert_values(values, values_sizes, errors)
640 }
641
/// Stores `value` under `key` with default `WriteOptions` (forwards to
/// `put_opt`).
pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.put_opt(key, value, &WriteOptions::default())
}
649
/// Stores `value` under `key` in column family `cf` with default
/// `WriteOptions` (forwards to `put_cf_opt`).
pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.put_cf_opt(cf, key, value, &WriteOptions::default())
}
657
658 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
659 where
660 K: AsRef<[u8]>,
661 V: AsRef<[u8]>,
662 {
663 let key = key.as_ref();
664 let value = value.as_ref();
665 unsafe {
666 ffi_try!(ffi::rocksdb_transactiondb_put(
667 self.inner,
668 writeopts.inner,
669 key.as_ptr() as *const c_char,
670 key.len() as size_t,
671 value.as_ptr() as *const c_char,
672 value.len() as size_t
673 ));
674 }
675 Ok(())
676 }
677
678 pub fn put_cf_opt<K, V>(
679 &self,
680 cf: &impl AsColumnFamilyRef,
681 key: K,
682 value: V,
683 writeopts: &WriteOptions,
684 ) -> Result<(), Error>
685 where
686 K: AsRef<[u8]>,
687 V: AsRef<[u8]>,
688 {
689 let key = key.as_ref();
690 let value = value.as_ref();
691 unsafe {
692 ffi_try!(ffi::rocksdb_transactiondb_put_cf(
693 self.inner,
694 writeopts.inner,
695 cf.inner(),
696 key.as_ptr() as *const c_char,
697 key.len() as size_t,
698 value.as_ptr() as *const c_char,
699 value.len() as size_t
700 ));
701 }
702 Ok(())
703 }
704
/// Applies `batch` to the database with default `WriteOptions` (forwards to
/// `write_opt`).
pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
    self.write_opt(batch, &WriteOptions::default())
}
708
709 pub fn write_opt(
710 &self,
711 batch: WriteBatchWithTransaction<true>,
712 writeopts: &WriteOptions,
713 ) -> Result<(), Error> {
714 unsafe {
715 ffi_try!(ffi::rocksdb_transactiondb_write(
716 self.inner,
717 writeopts.inner,
718 batch.inner
719 ));
720 }
721 Ok(())
722 }
723
/// Issues a merge of `value` into `key` with default `WriteOptions`
/// (forwards to `merge_opt`).
pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.merge_opt(key, value, &WriteOptions::default())
}
731
/// Issues a merge of `value` into `key` in column family `cf` with default
/// `WriteOptions` (forwards to `merge_cf_opt`).
pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    self.merge_cf_opt(cf, key, value, &WriteOptions::default())
}
739
740 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
741 where
742 K: AsRef<[u8]>,
743 V: AsRef<[u8]>,
744 {
745 let key = key.as_ref();
746 let value = value.as_ref();
747 unsafe {
748 ffi_try!(ffi::rocksdb_transactiondb_merge(
749 self.inner,
750 writeopts.inner,
751 key.as_ptr() as *const c_char,
752 key.len() as size_t,
753 value.as_ptr() as *const c_char,
754 value.len() as size_t,
755 ));
756 Ok(())
757 }
758 }
759
760 pub fn merge_cf_opt<K, V>(
761 &self,
762 cf: &impl AsColumnFamilyRef,
763 key: K,
764 value: V,
765 writeopts: &WriteOptions,
766 ) -> Result<(), Error>
767 where
768 K: AsRef<[u8]>,
769 V: AsRef<[u8]>,
770 {
771 let key = key.as_ref();
772 let value = value.as_ref();
773 unsafe {
774 ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
775 self.inner,
776 writeopts.inner,
777 cf.inner(),
778 key.as_ptr() as *const c_char,
779 key.len() as size_t,
780 value.as_ptr() as *const c_char,
781 value.len() as size_t,
782 ));
783 Ok(())
784 }
785 }
786
/// Deletes `key` with default `WriteOptions` (forwards to `delete_opt`).
pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
    self.delete_opt(key, &WriteOptions::default())
}
790
/// Deletes `key` from column family `cf` with default `WriteOptions`
/// (forwards to `delete_cf_opt`).
pub fn delete_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<(), Error> {
    self.delete_cf_opt(cf, key, &WriteOptions::default())
}
798
799 pub fn delete_opt<K: AsRef<[u8]>>(
800 &self,
801 key: K,
802 writeopts: &WriteOptions,
803 ) -> Result<(), Error> {
804 let key = key.as_ref();
805 unsafe {
806 ffi_try!(ffi::rocksdb_transactiondb_delete(
807 self.inner,
808 writeopts.inner,
809 key.as_ptr() as *const c_char,
810 key.len() as size_t,
811 ));
812 }
813 Ok(())
814 }
815
816 pub fn delete_cf_opt<K: AsRef<[u8]>>(
817 &self,
818 cf: &impl AsColumnFamilyRef,
819 key: K,
820 writeopts: &WriteOptions,
821 ) -> Result<(), Error> {
822 let key = key.as_ref();
823 unsafe {
824 ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
825 self.inner,
826 writeopts.inner,
827 cf.inner(),
828 key.as_ptr() as *const c_char,
829 key.len() as size_t,
830 ));
831 }
832 Ok(())
833 }
834
835 pub fn iterator<'a: 'b, 'b>(
836 &'a self,
837 mode: IteratorMode,
838 ) -> DBIteratorWithThreadMode<'b, Self> {
839 let readopts = ReadOptions::default();
840 self.iterator_opt(mode, readopts)
841 }
842
/// Returns an iterator over the database in the given `mode`, using the
/// supplied `readopts` (taken by value).
pub fn iterator_opt<'a: 'b, 'b>(
    &'a self,
    mode: IteratorMode,
    readopts: ReadOptions,
) -> DBIteratorWithThreadMode<'b, Self> {
    DBIteratorWithThreadMode::new(self, readopts, mode)
}
850
/// Returns an iterator over column family `cf_handle` in the given `mode`,
/// using the supplied `readopts` (taken by value).
///
/// Note the argument order: `readopts` comes before `mode` here, unlike
/// `iterator_opt`.
pub fn iterator_cf_opt<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    readopts: ReadOptions,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
}
861
862 pub fn full_iterator<'a: 'b, 'b>(
866 &'a self,
867 mode: IteratorMode,
868 ) -> DBIteratorWithThreadMode<'b, Self> {
869 let mut opts = ReadOptions::default();
870 opts.set_total_order_seek(true);
871 DBIteratorWithThreadMode::new(self, opts, mode)
872 }
873
874 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
875 &'a self,
876 prefix: P,
877 ) -> DBIteratorWithThreadMode<'b, Self> {
878 let mut opts = ReadOptions::default();
879 opts.set_prefix_same_as_start(true);
880 DBIteratorWithThreadMode::new(
881 self,
882 opts,
883 IteratorMode::From(prefix.as_ref(), Direction::Forward),
884 )
885 }
886
887 pub fn iterator_cf<'a: 'b, 'b>(
888 &'a self,
889 cf_handle: &impl AsColumnFamilyRef,
890 mode: IteratorMode,
891 ) -> DBIteratorWithThreadMode<'b, Self> {
892 let opts = ReadOptions::default();
893 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
894 }
895
896 pub fn full_iterator_cf<'a: 'b, 'b>(
897 &'a self,
898 cf_handle: &impl AsColumnFamilyRef,
899 mode: IteratorMode,
900 ) -> DBIteratorWithThreadMode<'b, Self> {
901 let mut opts = ReadOptions::default();
902 opts.set_total_order_seek(true);
903 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
904 }
905
906 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
907 &'a self,
908 cf_handle: &impl AsColumnFamilyRef,
909 prefix: P,
910 ) -> DBIteratorWithThreadMode<'a, Self> {
911 let mut opts = ReadOptions::default();
912 opts.set_prefix_same_as_start(true);
913 DBIteratorWithThreadMode::<'a, Self>::new_cf(
914 self,
915 cf_handle.inner(),
916 opts,
917 IteratorMode::From(prefix.as_ref(), Direction::Forward),
918 )
919 }
920
921 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
923 let opts = ReadOptions::default();
924 DBRawIteratorWithThreadMode::new(self, opts)
925 }
926
927 pub fn raw_iterator_cf<'a: 'b, 'b>(
929 &'a self,
930 cf_handle: &impl AsColumnFamilyRef,
931 ) -> DBRawIteratorWithThreadMode<'b, Self> {
932 let opts = ReadOptions::default();
933 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
934 }
935
/// Returns a raw iterator over the database, using the supplied `readopts`
/// (taken by value).
pub fn raw_iterator_opt<'a: 'b, 'b>(
    &'a self,
    readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
    DBRawIteratorWithThreadMode::new(self, readopts)
}
943
/// Returns a raw iterator over column family `cf_handle`, using the supplied
/// `readopts` (taken by value).
pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
    DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
}
952
/// Creates a `SnapshotWithThreadMode` bound to this database.
pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
    SnapshotWithThreadMode::<Self>::new(self)
}
956
957 fn drop_column_family<C>(
958 &self,
959 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
960 _cf: C,
961 ) -> Result<(), Error> {
962 unsafe {
963 ffi_try!(ffi::rocksdb_drop_column_family(
965 self.inner as *mut ffi::rocksdb_t,
966 cf_inner
967 ));
968 }
969 Ok(())
972 }
973}
974
975impl TransactionDB<SingleThreaded> {
976 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
978 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
979 self.cfs
980 .cfs
981 .insert(name.as_ref().to_string(), ColumnFamily { inner });
982 Ok(())
983 }
984
985 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
987 self.cfs.cfs.get(name)
988 }
989
990 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
992 if let Some(cf) = self.cfs.cfs.remove(name) {
993 self.drop_column_family(cf.inner, cf)
994 } else {
995 Err(Error::new(format!("Invalid column family: {name}")))
996 }
997 }
998}
999
1000impl TransactionDB<MultiThreaded> {
1001 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1003 let mut cfs = self.cfs.cfs.write().unwrap();
1006 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1007 cfs.insert(
1008 name.as_ref().to_string(),
1009 Arc::new(UnboundColumnFamily { inner }),
1010 );
1011 Ok(())
1012 }
1013
1014 pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
1016 self.cfs
1017 .cfs
1018 .read()
1019 .unwrap()
1020 .get(name)
1021 .cloned()
1022 .map(UnboundColumnFamily::bound_column_family)
1023 }
1024
1025 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1028 if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
1029 self.drop_column_family(cf.inner, cf)
1030 } else {
1031 Err(Error::new(format!("Invalid column family: {name}")))
1032 }
1033 }
1034
1035 fn property_value_impl<R>(
1044 name: impl CStrLike,
1045 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1046 parse: impl FnOnce(&str) -> Result<R, Error>,
1047 ) -> Result<Option<R>, Error> {
1048 let value = match name.bake() {
1049 Ok(prop_name) => get_property(prop_name.as_ptr()),
1050 Err(e) => {
1051 return Err(Error::new(format!(
1052 "Failed to convert property name to CString: {e}"
1053 )));
1054 }
1055 };
1056 if value.is_null() {
1057 return Ok(None);
1058 }
1059 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1060 Ok(s) => parse(s).map(|value| Some(value)),
1061 Err(e) => Err(Error::new(format!(
1062 "Failed to convert property value to string: {e}"
1063 ))),
1064 };
1065 unsafe {
1066 ffi::rocksdb_free(value as *mut c_void);
1067 }
1068 result
1069 }
1070
1071 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1076 Self::property_value_impl(
1077 name,
1078 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1079 |str_value| Ok(str_value.to_owned()),
1080 )
1081 }
1082
1083 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1084 value.parse::<u64>().map_err(|err| {
1085 Error::new(format!(
1086 "Failed to convert property value {value} to int: {err}"
1087 ))
1088 })
1089 }
1090
1091 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1096 Self::property_value_impl(
1097 name,
1098 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1099 Self::parse_property_int_value,
1100 )
1101 }
1102}
1103
// Teardown for `TransactionDB`. The three steps run in a fixed order:
// prepared transactions, then column family handles, then the database.
impl<T: ThreadMode> Drop for TransactionDB<T> {
    fn drop(&mut self) {
        unsafe {
            // Drain any prepared transactions that were never handed out and
            // drop the resulting `Transaction` wrappers.
            self.prepared_transactions().clear();
            // Release all column family handles before closing the database.
            self.cfs.drop_all_cfs_internal();
            ffi::rocksdb_transactiondb_close(self.inner);
        }
    }
}