1use std::{
17 collections::BTreeMap,
18 ffi::CString,
19 fs, iter,
20 marker::PhantomData,
21 path::{Path, PathBuf},
22 ptr,
23 sync::{Arc, Mutex},
24};
25
26use crate::{
27 column_family::UnboundColumnFamily,
28 db::{convert_values, DBAccess},
29 db_options::OptionsMustOutliveDB,
30 ffi,
31 ffi_util::to_cpath,
32 AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
33 DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
34 IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
35 ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
36 WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
37};
38use ffi::rocksdb_transaction_t;
39use libc::{c_char, c_int, c_void, size_t};
40
41#[cfg(not(feature = "multi-threaded-cf"))]
42type DefaultThreadMode = crate::SingleThreaded;
43#[cfg(feature = "multi-threaded-cf")]
44type DefaultThreadMode = crate::MultiThreaded;
45
46pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
72 pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
73 cfs: T,
74 path: PathBuf,
75 prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
77 _outlive: Vec<OptionsMustOutliveDB>,
78}
79
80unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
81unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
82
83impl<T: ThreadMode> DBAccess for TransactionDB<T> {
84 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
85 ffi::rocksdb_transactiondb_create_snapshot(self.inner)
86 }
87
88 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
89 ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
90 }
91
92 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
93 ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
94 }
95
96 unsafe fn create_iterator_cf(
97 &self,
98 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
99 readopts: &ReadOptions,
100 ) -> *mut ffi::rocksdb_iterator_t {
101 ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
102 }
103
104 fn get_opt<K: AsRef<[u8]>>(
105 &self,
106 key: K,
107 readopts: &ReadOptions,
108 ) -> Result<Option<Vec<u8>>, Error> {
109 self.get_opt(key, readopts)
110 }
111
112 fn get_cf_opt<K: AsRef<[u8]>>(
113 &self,
114 cf: &impl AsColumnFamilyRef,
115 key: K,
116 readopts: &ReadOptions,
117 ) -> Result<Option<Vec<u8>>, Error> {
118 self.get_cf_opt(cf, key, readopts)
119 }
120
121 fn get_pinned_opt<K: AsRef<[u8]>>(
122 &self,
123 key: K,
124 readopts: &ReadOptions,
125 ) -> Result<Option<DBPinnableSlice>, Error> {
126 self.get_pinned_opt(key, readopts)
127 }
128
129 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
130 &self,
131 cf: &impl AsColumnFamilyRef,
132 key: K,
133 readopts: &ReadOptions,
134 ) -> Result<Option<DBPinnableSlice>, Error> {
135 self.get_pinned_cf_opt(cf, key, readopts)
136 }
137
138 fn multi_get_opt<K, I>(
139 &self,
140 keys: I,
141 readopts: &ReadOptions,
142 ) -> Vec<Result<Option<Vec<u8>>, Error>>
143 where
144 K: AsRef<[u8]>,
145 I: IntoIterator<Item = K>,
146 {
147 self.multi_get_opt(keys, readopts)
148 }
149
150 fn multi_get_cf_opt<'b, K, I, W>(
151 &self,
152 keys_cf: I,
153 readopts: &ReadOptions,
154 ) -> Vec<Result<Option<Vec<u8>>, Error>>
155 where
156 K: AsRef<[u8]>,
157 I: IntoIterator<Item = (&'b W, K)>,
158 W: AsColumnFamilyRef + 'b,
159 {
160 self.multi_get_cf_opt(keys_cf, readopts)
161 }
162}
163
164impl<T: ThreadMode> TransactionDB<T> {
165 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
167 let mut opts = Options::default();
168 opts.create_if_missing(true);
169 let txn_db_opts = TransactionDBOptions::default();
170 Self::open(&opts, &txn_db_opts, path)
171 }
172
173 pub fn open<P: AsRef<Path>>(
175 opts: &Options,
176 txn_db_opts: &TransactionDBOptions,
177 path: P,
178 ) -> Result<Self, Error> {
179 Self::open_cf(opts, txn_db_opts, path, None::<&str>)
180 }
181
182 pub fn open_cf<P, I, N>(
186 opts: &Options,
187 txn_db_opts: &TransactionDBOptions,
188 path: P,
189 cfs: I,
190 ) -> Result<Self, Error>
191 where
192 P: AsRef<Path>,
193 I: IntoIterator<Item = N>,
194 N: AsRef<str>,
195 {
196 let cfs = cfs
197 .into_iter()
198 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
199
200 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
201 }
202
203 pub fn open_cf_descriptors<P, I>(
205 opts: &Options,
206 txn_db_opts: &TransactionDBOptions,
207 path: P,
208 cfs: I,
209 ) -> Result<Self, Error>
210 where
211 P: AsRef<Path>,
212 I: IntoIterator<Item = ColumnFamilyDescriptor>,
213 {
214 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
215 }
216
217 fn open_cf_descriptors_internal<P, I>(
219 opts: &Options,
220 txn_db_opts: &TransactionDBOptions,
221 path: P,
222 cfs: I,
223 ) -> Result<Self, Error>
224 where
225 P: AsRef<Path>,
226 I: IntoIterator<Item = ColumnFamilyDescriptor>,
227 {
228 let cfs: Vec<_> = cfs.into_iter().collect();
229 let outlive = iter::once(opts.outlive.clone())
230 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
231 .collect();
232
233 let cpath = to_cpath(&path)?;
234
235 if let Err(e) = fs::create_dir_all(&path) {
236 return Err(Error::new(format!(
237 "Failed to create RocksDB directory: `{e:?}`."
238 )));
239 }
240
241 let db: *mut ffi::rocksdb_transactiondb_t;
242 let mut cf_map = BTreeMap::new();
243
244 if cfs.is_empty() {
245 db = Self::open_raw(opts, txn_db_opts, &cpath)?;
246 } else {
247 let mut cfs_v = cfs;
248 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
250 cfs_v.push(ColumnFamilyDescriptor {
251 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
252 options: Options::default(),
253 });
254 }
255 let c_cfs: Vec<CString> = cfs_v
258 .iter()
259 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
260 .collect();
261
262 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
263
264 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
266
267 let cfopts: Vec<_> = cfs_v
268 .iter()
269 .map(|cf| cf.options.inner as *const _)
270 .collect();
271
272 db = Self::open_cf_raw(
273 opts,
274 txn_db_opts,
275 &cpath,
276 &cfs_v,
277 &cfnames,
278 &cfopts,
279 &mut cfhandles,
280 )?;
281
282 for handle in &cfhandles {
283 if handle.is_null() {
284 return Err(Error::new(
285 "Received null column family handle from DB.".to_owned(),
286 ));
287 }
288 }
289
290 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
291 cf_map.insert(cf_desc.name.clone(), inner);
292 }
293 }
294
295 if db.is_null() {
296 return Err(Error::new("Could not initialize database.".to_owned()));
297 }
298
299 let prepared = unsafe {
300 let mut cnt = 0;
301 let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
302 let mut vec = vec![std::ptr::null_mut(); cnt];
303 std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
304 if !ptr.is_null() {
305 ffi::rocksdb_free(ptr as *mut c_void);
306 }
307 vec
308 };
309
310 Ok(TransactionDB {
311 inner: db,
312 cfs: T::new_cf_map_internal(cf_map),
313 path: path.as_ref().to_path_buf(),
314 prepared: Mutex::new(prepared),
315 _outlive: outlive,
316 })
317 }
318
319 fn open_raw(
320 opts: &Options,
321 txn_db_opts: &TransactionDBOptions,
322 cpath: &CString,
323 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
324 unsafe {
325 let db = ffi_try!(ffi::rocksdb_transactiondb_open(
326 opts.inner,
327 txn_db_opts.inner,
328 cpath.as_ptr()
329 ));
330 Ok(db)
331 }
332 }
333
334 fn open_cf_raw(
335 opts: &Options,
336 txn_db_opts: &TransactionDBOptions,
337 cpath: &CString,
338 cfs_v: &[ColumnFamilyDescriptor],
339 cfnames: &[*const c_char],
340 cfopts: &[*const ffi::rocksdb_options_t],
341 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
342 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
343 unsafe {
344 let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
345 opts.inner,
346 txn_db_opts.inner,
347 cpath.as_ptr(),
348 cfs_v.len() as c_int,
349 cfnames.as_ptr(),
350 cfopts.as_ptr(),
351 cfhandles.as_mut_ptr(),
352 ));
353 Ok(db)
354 }
355 }
356
357 fn create_inner_cf_handle(
358 &self,
359 name: &str,
360 opts: &Options,
361 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
362 let cf_name = CString::new(name.as_bytes()).map_err(|_| {
363 Error::new("Failed to convert path to CString when creating cf".to_owned())
364 })?;
365
366 Ok(unsafe {
367 ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
368 self.inner,
369 opts.inner,
370 cf_name.as_ptr(),
371 ))
372 })
373 }
374
375 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
376 DB::list_cf(opts, path)
377 }
378
379 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
380 DB::destroy(opts, path)
381 }
382
383 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
384 DB::repair(opts, path)
385 }
386
387 pub fn path(&self) -> &Path {
388 self.path.as_path()
389 }
390
391 pub fn transaction(&self) -> Transaction<Self> {
393 self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
394 }
395
396 pub fn transaction_opt<'a>(
398 &'a self,
399 write_opts: &WriteOptions,
400 txn_opts: &TransactionOptions,
401 ) -> Transaction<'a, Self> {
402 Transaction {
403 inner: unsafe {
404 ffi::rocksdb_transaction_begin(
405 self.inner,
406 write_opts.inner,
407 txn_opts.inner,
408 std::ptr::null_mut(),
409 )
410 },
411 _marker: PhantomData::default(),
412 }
413 }
414
415 pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
420 self.prepared
421 .lock()
422 .unwrap()
423 .drain(0..)
424 .map(|inner| Transaction {
425 inner,
426 _marker: PhantomData::default(),
427 })
428 .collect()
429 }
430
431 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
433 self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
434 }
435
436 pub fn get_cf<K: AsRef<[u8]>>(
438 &self,
439 cf: &impl AsColumnFamilyRef,
440 key: K,
441 ) -> Result<Option<Vec<u8>>, Error> {
442 self.get_pinned_cf(cf, key)
443 .map(|x| x.map(|v| v.as_ref().to_vec()))
444 }
445
446 pub fn get_opt<K: AsRef<[u8]>>(
448 &self,
449 key: K,
450 readopts: &ReadOptions,
451 ) -> Result<Option<Vec<u8>>, Error> {
452 self.get_pinned_opt(key, readopts)
453 .map(|x| x.map(|v| v.as_ref().to_vec()))
454 }
455
456 pub fn get_cf_opt<K: AsRef<[u8]>>(
458 &self,
459 cf: &impl AsColumnFamilyRef,
460 key: K,
461 readopts: &ReadOptions,
462 ) -> Result<Option<Vec<u8>>, Error> {
463 self.get_pinned_cf_opt(cf, key, readopts)
464 .map(|x| x.map(|v| v.as_ref().to_vec()))
465 }
466
467 pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
468 self.get_pinned_opt(key, &ReadOptions::default())
469 }
470
471 pub fn get_pinned_cf<K: AsRef<[u8]>>(
473 &self,
474 cf: &impl AsColumnFamilyRef,
475 key: K,
476 ) -> Result<Option<DBPinnableSlice>, Error> {
477 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
478 }
479
480 pub fn get_pinned_opt<K: AsRef<[u8]>>(
482 &self,
483 key: K,
484 readopts: &ReadOptions,
485 ) -> Result<Option<DBPinnableSlice>, Error> {
486 let key = key.as_ref();
487 unsafe {
488 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
489 self.inner,
490 readopts.inner,
491 key.as_ptr() as *const c_char,
492 key.len() as size_t,
493 ));
494 if val.is_null() {
495 Ok(None)
496 } else {
497 Ok(Some(DBPinnableSlice::from_c(val)))
498 }
499 }
500 }
501
502 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
504 &self,
505 cf: &impl AsColumnFamilyRef,
506 key: K,
507 readopts: &ReadOptions,
508 ) -> Result<Option<DBPinnableSlice>, Error> {
509 unsafe {
510 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
511 self.inner,
512 readopts.inner,
513 cf.inner(),
514 key.as_ref().as_ptr() as *const c_char,
515 key.as_ref().len() as size_t,
516 ));
517 if val.is_null() {
518 Ok(None)
519 } else {
520 Ok(Some(DBPinnableSlice::from_c(val)))
521 }
522 }
523 }
524
525 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
527 where
528 K: AsRef<[u8]>,
529 I: IntoIterator<Item = K>,
530 {
531 self.multi_get_opt(keys, &ReadOptions::default())
532 }
533
534 pub fn multi_get_opt<K, I>(
536 &self,
537 keys: I,
538 readopts: &ReadOptions,
539 ) -> Vec<Result<Option<Vec<u8>>, Error>>
540 where
541 K: AsRef<[u8]>,
542 I: IntoIterator<Item = K>,
543 {
544 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
545 .into_iter()
546 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
547 .unzip();
548 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
549
550 let mut values = vec![ptr::null_mut(); keys.len()];
551 let mut values_sizes = vec![0_usize; keys.len()];
552 let mut errors = vec![ptr::null_mut(); keys.len()];
553 unsafe {
554 ffi::rocksdb_transactiondb_multi_get(
555 self.inner,
556 readopts.inner,
557 ptr_keys.len(),
558 ptr_keys.as_ptr(),
559 keys_sizes.as_ptr(),
560 values.as_mut_ptr(),
561 values_sizes.as_mut_ptr(),
562 errors.as_mut_ptr(),
563 );
564 }
565
566 convert_values(values, values_sizes, errors)
567 }
568
569 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
571 &'a self,
572 keys: I,
573 ) -> Vec<Result<Option<Vec<u8>>, Error>>
574 where
575 K: AsRef<[u8]>,
576 I: IntoIterator<Item = (&'b W, K)>,
577 W: 'b + AsColumnFamilyRef,
578 {
579 self.multi_get_cf_opt(keys, &ReadOptions::default())
580 }
581
582 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
584 &'a self,
585 keys: I,
586 readopts: &ReadOptions,
587 ) -> Vec<Result<Option<Vec<u8>>, Error>>
588 where
589 K: AsRef<[u8]>,
590 I: IntoIterator<Item = (&'b W, K)>,
591 W: 'b + AsColumnFamilyRef,
592 {
593 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
594 .into_iter()
595 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
596 .unzip();
597 let ptr_keys: Vec<_> = cfs_and_keys
598 .iter()
599 .map(|(_, k)| k.as_ptr() as *const c_char)
600 .collect();
601 let ptr_cfs: Vec<_> = cfs_and_keys
602 .iter()
603 .map(|(c, _)| c.inner() as *const _)
604 .collect();
605
606 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
607 let mut values_sizes = vec![0_usize; ptr_keys.len()];
608 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
609 unsafe {
610 ffi::rocksdb_transactiondb_multi_get_cf(
611 self.inner,
612 readopts.inner,
613 ptr_cfs.as_ptr(),
614 ptr_keys.len(),
615 ptr_keys.as_ptr(),
616 keys_sizes.as_ptr(),
617 values.as_mut_ptr(),
618 values_sizes.as_mut_ptr(),
619 errors.as_mut_ptr(),
620 );
621 }
622
623 convert_values(values, values_sizes, errors)
624 }
625
626 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
627 where
628 K: AsRef<[u8]>,
629 V: AsRef<[u8]>,
630 {
631 self.put_opt(key, value, &WriteOptions::default())
632 }
633
634 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
635 where
636 K: AsRef<[u8]>,
637 V: AsRef<[u8]>,
638 {
639 self.put_cf_opt(cf, key, value, &WriteOptions::default())
640 }
641
642 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
643 where
644 K: AsRef<[u8]>,
645 V: AsRef<[u8]>,
646 {
647 unsafe {
648 ffi_try!(ffi::rocksdb_transactiondb_put(
649 self.inner,
650 writeopts.inner,
651 key.as_ref().as_ptr() as *const c_char,
652 key.as_ref().len() as size_t,
653 value.as_ref().as_ptr() as *const c_char,
654 value.as_ref().len() as size_t
655 ));
656 }
657 Ok(())
658 }
659
660 pub fn put_cf_opt<K, V>(
661 &self,
662 cf: &impl AsColumnFamilyRef,
663 key: K,
664 value: V,
665 writeopts: &WriteOptions,
666 ) -> Result<(), Error>
667 where
668 K: AsRef<[u8]>,
669 V: AsRef<[u8]>,
670 {
671 unsafe {
672 ffi_try!(ffi::rocksdb_transactiondb_put_cf(
673 self.inner,
674 writeopts.inner,
675 cf.inner(),
676 key.as_ref().as_ptr() as *const c_char,
677 key.as_ref().len() as size_t,
678 value.as_ref().as_ptr() as *const c_char,
679 value.as_ref().len() as size_t
680 ));
681 }
682 Ok(())
683 }
684
685 pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
686 self.write_opt(batch, &WriteOptions::default())
687 }
688
689 pub fn write_opt(
690 &self,
691 batch: WriteBatchWithTransaction<true>,
692 writeopts: &WriteOptions,
693 ) -> Result<(), Error> {
694 unsafe {
695 ffi_try!(ffi::rocksdb_transactiondb_write(
696 self.inner,
697 writeopts.inner,
698 batch.inner
699 ));
700 }
701 Ok(())
702 }
703
704 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
705 where
706 K: AsRef<[u8]>,
707 V: AsRef<[u8]>,
708 {
709 self.merge_opt(key, value, &WriteOptions::default())
710 }
711
712 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
713 where
714 K: AsRef<[u8]>,
715 V: AsRef<[u8]>,
716 {
717 self.merge_cf_opt(cf, key, value, &WriteOptions::default())
718 }
719
720 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
721 where
722 K: AsRef<[u8]>,
723 V: AsRef<[u8]>,
724 {
725 unsafe {
726 ffi_try!(ffi::rocksdb_transactiondb_merge(
727 self.inner,
728 writeopts.inner,
729 key.as_ref().as_ptr() as *const c_char,
730 key.as_ref().len() as size_t,
731 value.as_ref().as_ptr() as *const c_char,
732 value.as_ref().len() as size_t,
733 ));
734 Ok(())
735 }
736 }
737
738 pub fn merge_cf_opt<K, V>(
739 &self,
740 cf: &impl AsColumnFamilyRef,
741 key: K,
742 value: V,
743 writeopts: &WriteOptions,
744 ) -> Result<(), Error>
745 where
746 K: AsRef<[u8]>,
747 V: AsRef<[u8]>,
748 {
749 unsafe {
750 ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
751 self.inner,
752 writeopts.inner,
753 cf.inner(),
754 key.as_ref().as_ptr() as *const c_char,
755 key.as_ref().len() as size_t,
756 value.as_ref().as_ptr() as *const c_char,
757 value.as_ref().len() as size_t,
758 ));
759 Ok(())
760 }
761 }
762
763 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
764 self.delete_opt(key, &WriteOptions::default())
765 }
766
767 pub fn delete_cf<K: AsRef<[u8]>>(
768 &self,
769 cf: &impl AsColumnFamilyRef,
770 key: K,
771 ) -> Result<(), Error> {
772 self.delete_cf_opt(cf, key, &WriteOptions::default())
773 }
774
775 pub fn delete_opt<K: AsRef<[u8]>>(
776 &self,
777 key: K,
778 writeopts: &WriteOptions,
779 ) -> Result<(), Error> {
780 unsafe {
781 ffi_try!(ffi::rocksdb_transactiondb_delete(
782 self.inner,
783 writeopts.inner,
784 key.as_ref().as_ptr() as *const c_char,
785 key.as_ref().len() as size_t,
786 ));
787 }
788 Ok(())
789 }
790
791 pub fn delete_cf_opt<K: AsRef<[u8]>>(
792 &self,
793 cf: &impl AsColumnFamilyRef,
794 key: K,
795 writeopts: &WriteOptions,
796 ) -> Result<(), Error> {
797 unsafe {
798 ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
799 self.inner,
800 writeopts.inner,
801 cf.inner(),
802 key.as_ref().as_ptr() as *const c_char,
803 key.as_ref().len() as size_t,
804 ));
805 }
806 Ok(())
807 }
808
809 pub fn iterator<'a: 'b, 'b>(
810 &'a self,
811 mode: IteratorMode,
812 ) -> DBIteratorWithThreadMode<'b, Self> {
813 let readopts = ReadOptions::default();
814 self.iterator_opt(mode, readopts)
815 }
816
817 pub fn iterator_opt<'a: 'b, 'b>(
818 &'a self,
819 mode: IteratorMode,
820 readopts: ReadOptions,
821 ) -> DBIteratorWithThreadMode<'b, Self> {
822 DBIteratorWithThreadMode::new(self, readopts, mode)
823 }
824
825 pub fn iterator_cf_opt<'a: 'b, 'b>(
828 &'a self,
829 cf_handle: &impl AsColumnFamilyRef,
830 readopts: ReadOptions,
831 mode: IteratorMode,
832 ) -> DBIteratorWithThreadMode<'b, Self> {
833 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
834 }
835
836 pub fn full_iterator<'a: 'b, 'b>(
840 &'a self,
841 mode: IteratorMode,
842 ) -> DBIteratorWithThreadMode<'b, Self> {
843 let mut opts = ReadOptions::default();
844 opts.set_total_order_seek(true);
845 DBIteratorWithThreadMode::new(self, opts, mode)
846 }
847
848 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
849 &'a self,
850 prefix: P,
851 ) -> DBIteratorWithThreadMode<'b, Self> {
852 let mut opts = ReadOptions::default();
853 opts.set_prefix_same_as_start(true);
854 DBIteratorWithThreadMode::new(
855 self,
856 opts,
857 IteratorMode::From(prefix.as_ref(), Direction::Forward),
858 )
859 }
860
861 pub fn iterator_cf<'a: 'b, 'b>(
862 &'a self,
863 cf_handle: &impl AsColumnFamilyRef,
864 mode: IteratorMode,
865 ) -> DBIteratorWithThreadMode<'b, Self> {
866 let opts = ReadOptions::default();
867 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
868 }
869
870 pub fn full_iterator_cf<'a: 'b, 'b>(
871 &'a self,
872 cf_handle: &impl AsColumnFamilyRef,
873 mode: IteratorMode,
874 ) -> DBIteratorWithThreadMode<'b, Self> {
875 let mut opts = ReadOptions::default();
876 opts.set_total_order_seek(true);
877 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
878 }
879
880 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
881 &'a self,
882 cf_handle: &impl AsColumnFamilyRef,
883 prefix: P,
884 ) -> DBIteratorWithThreadMode<'a, Self> {
885 let mut opts = ReadOptions::default();
886 opts.set_prefix_same_as_start(true);
887 DBIteratorWithThreadMode::<'a, Self>::new_cf(
888 self,
889 cf_handle.inner(),
890 opts,
891 IteratorMode::From(prefix.as_ref(), Direction::Forward),
892 )
893 }
894
895 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
897 let opts = ReadOptions::default();
898 DBRawIteratorWithThreadMode::new(self, opts)
899 }
900
901 pub fn raw_iterator_cf<'a: 'b, 'b>(
903 &'a self,
904 cf_handle: &impl AsColumnFamilyRef,
905 ) -> DBRawIteratorWithThreadMode<'b, Self> {
906 let opts = ReadOptions::default();
907 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
908 }
909
910 pub fn raw_iterator_opt<'a: 'b, 'b>(
912 &'a self,
913 readopts: ReadOptions,
914 ) -> DBRawIteratorWithThreadMode<'b, Self> {
915 DBRawIteratorWithThreadMode::new(self, readopts)
916 }
917
918 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
920 &'a self,
921 cf_handle: &impl AsColumnFamilyRef,
922 readopts: ReadOptions,
923 ) -> DBRawIteratorWithThreadMode<'b, Self> {
924 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
925 }
926
927 pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
928 SnapshotWithThreadMode::<Self>::new(self)
929 }
930
931 fn drop_column_family<C>(
932 &self,
933 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
934 _cf: C,
935 ) -> Result<(), Error> {
936 unsafe {
937 ffi_try!(ffi::rocksdb_drop_column_family(
939 self.inner as *mut ffi::rocksdb_t,
940 cf_inner
941 ));
942 }
943 Ok(())
946 }
947}
948
949impl TransactionDB<SingleThreaded> {
950 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
952 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
953 self.cfs
954 .cfs
955 .insert(name.as_ref().to_string(), ColumnFamily { inner });
956 Ok(())
957 }
958
959 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
961 self.cfs.cfs.get(name)
962 }
963
964 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
966 if let Some(cf) = self.cfs.cfs.remove(name) {
967 self.drop_column_family(cf.inner, cf)
968 } else {
969 Err(Error::new(format!("Invalid column family: {name}")))
970 }
971 }
972}
973
974impl TransactionDB<MultiThreaded> {
975 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
977 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
978 self.cfs.cfs.write().unwrap().insert(
979 name.as_ref().to_string(),
980 Arc::new(UnboundColumnFamily { inner }),
981 );
982 Ok(())
983 }
984
985 pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
987 self.cfs
988 .cfs
989 .read()
990 .unwrap()
991 .get(name)
992 .cloned()
993 .map(UnboundColumnFamily::bound_column_family)
994 }
995
996 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
999 if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
1000 self.drop_column_family(cf.inner, cf)
1001 } else {
1002 Err(Error::new(format!("Invalid column family: {name}")))
1003 }
1004 }
1005}
1006
1007impl<T: ThreadMode> Drop for TransactionDB<T> {
1008 fn drop(&mut self) {
1009 unsafe {
1010 self.prepared_transactions().clear();
1011 self.cfs.drop_all_cfs_internal();
1012 ffi::rocksdb_transactiondb_close(self.inner);
1013 }
1014 }
1015}