1use crate::{
17 column_family::AsColumnFamilyRef,
18 column_family::BoundColumnFamily,
19 column_family::UnboundColumnFamily,
20 db_options::OptionsMustOutliveDB,
21 ffi,
22 ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23 ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24 DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25 IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26 WaitForCompactOptions, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
27};
28
29use crate::ffi_util::CSlice;
30use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
31use std::collections::BTreeMap;
32use std::ffi::{CStr, CString};
33use std::fmt;
34use std::fs;
35use std::iter;
36use std::path::Path;
37use std::path::PathBuf;
38use std::ptr;
39use std::slice;
40use std::str;
41use std::sync::Arc;
42use std::sync::RwLock;
43use std::time::Duration;
44
45pub trait ThreadMode {
56 fn new_cf_map_internal(
58 cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
59 ) -> Self;
60 fn drop_all_cfs_internal(&mut self);
62}
63
64pub struct SingleThreaded {
71 pub(crate) cfs: BTreeMap<String, ColumnFamily>,
72}
73
74pub struct MultiThreaded {
80 pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
81}
82
83impl ThreadMode for SingleThreaded {
84 fn new_cf_map_internal(
85 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
86 ) -> Self {
87 Self {
88 cfs: cfs
89 .into_iter()
90 .map(|(n, c)| (n, ColumnFamily { inner: c }))
91 .collect(),
92 }
93 }
94
95 fn drop_all_cfs_internal(&mut self) {
96 self.cfs.clear();
98 }
99}
100
101impl ThreadMode for MultiThreaded {
102 fn new_cf_map_internal(
103 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
104 ) -> Self {
105 Self {
106 cfs: RwLock::new(
107 cfs.into_iter()
108 .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
109 .collect(),
110 ),
111 }
112 }
113
114 fn drop_all_cfs_internal(&mut self) {
115 self.cfs.write().unwrap().clear();
117 }
118}
119
120pub trait DBInner {
122 fn inner(&self) -> *mut ffi::rocksdb_t;
123}
124
125pub struct DBCommon<T: ThreadMode, D: DBInner> {
130 pub(crate) inner: D,
131 cfs: T, path: PathBuf,
133 _outlive: Vec<OptionsMustOutliveDB>,
134}
135
136pub trait DBAccess {
139 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
140
141 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
142
143 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
144
145 unsafe fn create_iterator_cf(
146 &self,
147 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
148 readopts: &ReadOptions,
149 ) -> *mut ffi::rocksdb_iterator_t;
150
151 fn get_opt<K: AsRef<[u8]>>(
152 &self,
153 key: K,
154 readopts: &ReadOptions,
155 ) -> Result<Option<Vec<u8>>, Error>;
156
157 fn get_cf_opt<K: AsRef<[u8]>>(
158 &self,
159 cf: &impl AsColumnFamilyRef,
160 key: K,
161 readopts: &ReadOptions,
162 ) -> Result<Option<Vec<u8>>, Error>;
163
164 fn get_pinned_opt<K: AsRef<[u8]>>(
165 &self,
166 key: K,
167 readopts: &ReadOptions,
168 ) -> Result<Option<DBPinnableSlice>, Error>;
169
170 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
171 &self,
172 cf: &impl AsColumnFamilyRef,
173 key: K,
174 readopts: &ReadOptions,
175 ) -> Result<Option<DBPinnableSlice>, Error>;
176
177 fn multi_get_opt<K, I>(
178 &self,
179 keys: I,
180 readopts: &ReadOptions,
181 ) -> Vec<Result<Option<Vec<u8>>, Error>>
182 where
183 K: AsRef<[u8]>,
184 I: IntoIterator<Item = K>;
185
186 fn multi_get_cf_opt<'b, K, I, W>(
187 &self,
188 keys_cf: I,
189 readopts: &ReadOptions,
190 ) -> Vec<Result<Option<Vec<u8>>, Error>>
191 where
192 K: AsRef<[u8]>,
193 I: IntoIterator<Item = (&'b W, K)>,
194 W: AsColumnFamilyRef + 'b;
195}
196
197impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
198 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
199 ffi::rocksdb_create_snapshot(self.inner.inner())
200 }
201
202 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
203 ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
204 }
205
206 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
207 ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
208 }
209
210 unsafe fn create_iterator_cf(
211 &self,
212 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
213 readopts: &ReadOptions,
214 ) -> *mut ffi::rocksdb_iterator_t {
215 ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
216 }
217
218 fn get_opt<K: AsRef<[u8]>>(
219 &self,
220 key: K,
221 readopts: &ReadOptions,
222 ) -> Result<Option<Vec<u8>>, Error> {
223 self.get_opt(key, readopts)
224 }
225
226 fn get_cf_opt<K: AsRef<[u8]>>(
227 &self,
228 cf: &impl AsColumnFamilyRef,
229 key: K,
230 readopts: &ReadOptions,
231 ) -> Result<Option<Vec<u8>>, Error> {
232 self.get_cf_opt(cf, key, readopts)
233 }
234
235 fn get_pinned_opt<K: AsRef<[u8]>>(
236 &self,
237 key: K,
238 readopts: &ReadOptions,
239 ) -> Result<Option<DBPinnableSlice>, Error> {
240 self.get_pinned_opt(key, readopts)
241 }
242
243 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
244 &self,
245 cf: &impl AsColumnFamilyRef,
246 key: K,
247 readopts: &ReadOptions,
248 ) -> Result<Option<DBPinnableSlice>, Error> {
249 self.get_pinned_cf_opt(cf, key, readopts)
250 }
251
252 fn multi_get_opt<K, Iter>(
253 &self,
254 keys: Iter,
255 readopts: &ReadOptions,
256 ) -> Vec<Result<Option<Vec<u8>>, Error>>
257 where
258 K: AsRef<[u8]>,
259 Iter: IntoIterator<Item = K>,
260 {
261 self.multi_get_opt(keys, readopts)
262 }
263
264 fn multi_get_cf_opt<'b, K, Iter, W>(
265 &self,
266 keys_cf: Iter,
267 readopts: &ReadOptions,
268 ) -> Vec<Result<Option<Vec<u8>>, Error>>
269 where
270 K: AsRef<[u8]>,
271 Iter: IntoIterator<Item = (&'b W, K)>,
272 W: AsColumnFamilyRef + 'b,
273 {
274 self.multi_get_cf_opt(keys_cf, readopts)
275 }
276}
277
278pub struct DBWithThreadModeInner {
279 inner: *mut ffi::rocksdb_t,
280}
281
282impl DBInner for DBWithThreadModeInner {
283 fn inner(&self) -> *mut ffi::rocksdb_t {
284 self.inner
285 }
286}
287
288impl Drop for DBWithThreadModeInner {
289 fn drop(&mut self) {
290 unsafe {
291 ffi::rocksdb_close(self.inner);
292 }
293 }
294}
295
296pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
301
302#[cfg(not(feature = "multi-threaded-cf"))]
325pub type DB = DBWithThreadMode<SingleThreaded>;
326
327#[cfg(feature = "multi-threaded-cf")]
328pub type DB = DBWithThreadMode<MultiThreaded>;
329
330unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
334
335unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
338
/// How a database is opened; selects which FFI open function gets called.
enum AccessType<'a> {
    /// Regular read-write open.
    ReadWrite,
    /// Read-only open; the flag is forwarded to the read-only FFI open call.
    ReadOnly { error_if_log_file_exist: bool },
    /// Open as a secondary instance using the given secondary path.
    Secondary { secondary_path: &'a Path },
    /// Open with a time-to-live; passed to the FFI layer in whole seconds.
    WithTTL { ttl: Duration },
}
346
347impl<T: ThreadMode> DBWithThreadMode<T> {
349 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
351 let mut opts = Options::default();
352 opts.create_if_missing(true);
353 Self::open(&opts, path)
354 }
355
356 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
358 Self::open_cf(opts, path, None::<&str>)
359 }
360
361 pub fn open_for_read_only<P: AsRef<Path>>(
363 opts: &Options,
364 path: P,
365 error_if_log_file_exist: bool,
366 ) -> Result<Self, Error> {
367 Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
368 }
369
370 pub fn open_as_secondary<P: AsRef<Path>>(
372 opts: &Options,
373 primary_path: P,
374 secondary_path: P,
375 ) -> Result<Self, Error> {
376 Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
377 }
378
379 pub fn open_with_ttl<P: AsRef<Path>>(
381 opts: &Options,
382 path: P,
383 ttl: Duration,
384 ) -> Result<Self, Error> {
385 Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
386 }
387
388 pub fn open_cf_with_ttl<P, I, N>(
392 opts: &Options,
393 path: P,
394 cfs: I,
395 ttl: Duration,
396 ) -> Result<Self, Error>
397 where
398 P: AsRef<Path>,
399 I: IntoIterator<Item = N>,
400 N: AsRef<str>,
401 {
402 let cfs = cfs
403 .into_iter()
404 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
405
406 Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
407 }
408
409 pub fn open_cf_descriptors_with_ttl<P, I>(
412 opts: &Options,
413 path: P,
414 cfs: I,
415 ttl: Duration,
416 ) -> Result<Self, Error>
417 where
418 P: AsRef<Path>,
419 I: IntoIterator<Item = ColumnFamilyDescriptor>,
420 {
421 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
422 }
423
424 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
428 where
429 P: AsRef<Path>,
430 I: IntoIterator<Item = N>,
431 N: AsRef<str>,
432 {
433 let cfs = cfs
434 .into_iter()
435 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
436
437 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
438 }
439
440 pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
444 where
445 P: AsRef<Path>,
446 I: IntoIterator<Item = (N, Options)>,
447 N: AsRef<str>,
448 {
449 let cfs = cfs
450 .into_iter()
451 .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
452
453 Self::open_cf_descriptors(opts, path, cfs)
454 }
455
456 pub fn open_cf_for_read_only<P, I, N>(
458 opts: &Options,
459 path: P,
460 cfs: I,
461 error_if_log_file_exist: bool,
462 ) -> Result<Self, Error>
463 where
464 P: AsRef<Path>,
465 I: IntoIterator<Item = N>,
466 N: AsRef<str>,
467 {
468 let cfs = cfs
469 .into_iter()
470 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
471
472 Self::open_cf_descriptors_internal(
473 opts,
474 path,
475 cfs,
476 &AccessType::ReadOnly {
477 error_if_log_file_exist,
478 },
479 )
480 }
481
482 pub fn open_cf_with_opts_for_read_only<P, I, N>(
484 db_opts: &Options,
485 path: P,
486 cfs: I,
487 error_if_log_file_exist: bool,
488 ) -> Result<Self, Error>
489 where
490 P: AsRef<Path>,
491 I: IntoIterator<Item = (N, Options)>,
492 N: AsRef<str>,
493 {
494 let cfs = cfs
495 .into_iter()
496 .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
497
498 Self::open_cf_descriptors_internal(
499 db_opts,
500 path,
501 cfs,
502 &AccessType::ReadOnly {
503 error_if_log_file_exist,
504 },
505 )
506 }
507
508 pub fn open_cf_descriptors_read_only<P, I>(
511 opts: &Options,
512 path: P,
513 cfs: I,
514 error_if_log_file_exist: bool,
515 ) -> Result<Self, Error>
516 where
517 P: AsRef<Path>,
518 I: IntoIterator<Item = ColumnFamilyDescriptor>,
519 {
520 Self::open_cf_descriptors_internal(
521 opts,
522 path,
523 cfs,
524 &AccessType::ReadOnly {
525 error_if_log_file_exist,
526 },
527 )
528 }
529
530 pub fn open_cf_as_secondary<P, I, N>(
532 opts: &Options,
533 primary_path: P,
534 secondary_path: P,
535 cfs: I,
536 ) -> Result<Self, Error>
537 where
538 P: AsRef<Path>,
539 I: IntoIterator<Item = N>,
540 N: AsRef<str>,
541 {
542 let cfs = cfs
543 .into_iter()
544 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
545
546 Self::open_cf_descriptors_internal(
547 opts,
548 primary_path,
549 cfs,
550 &AccessType::Secondary {
551 secondary_path: secondary_path.as_ref(),
552 },
553 )
554 }
555
556 pub fn open_cf_descriptors_as_secondary<P, I>(
559 opts: &Options,
560 path: P,
561 secondary_path: P,
562 cfs: I,
563 ) -> Result<Self, Error>
564 where
565 P: AsRef<Path>,
566 I: IntoIterator<Item = ColumnFamilyDescriptor>,
567 {
568 Self::open_cf_descriptors_internal(
569 opts,
570 path,
571 cfs,
572 &AccessType::Secondary {
573 secondary_path: secondary_path.as_ref(),
574 },
575 )
576 }
577
578 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
580 where
581 P: AsRef<Path>,
582 I: IntoIterator<Item = ColumnFamilyDescriptor>,
583 {
584 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
585 }
586
587 fn open_cf_descriptors_internal<P, I>(
589 opts: &Options,
590 path: P,
591 cfs: I,
592 access_type: &AccessType,
593 ) -> Result<Self, Error>
594 where
595 P: AsRef<Path>,
596 I: IntoIterator<Item = ColumnFamilyDescriptor>,
597 {
598 let cfs: Vec<_> = cfs.into_iter().collect();
599 let outlive = iter::once(opts.outlive.clone())
600 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
601 .collect();
602
603 let cpath = to_cpath(&path)?;
604
605 if let Err(e) = fs::create_dir_all(&path) {
606 return Err(Error::new(format!(
607 "Failed to create RocksDB directory: `{e:?}`."
608 )));
609 }
610
611 let db: *mut ffi::rocksdb_t;
612 let mut cf_map = BTreeMap::new();
613
614 if cfs.is_empty() {
615 db = Self::open_raw(opts, &cpath, access_type)?;
616 } else {
617 let mut cfs_v = cfs;
618 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
620 cfs_v.push(ColumnFamilyDescriptor {
621 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
622 options: Options::default(),
623 });
624 }
625 let c_cfs: Vec<CString> = cfs_v
628 .iter()
629 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
630 .collect();
631
632 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
633
634 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
636
637 let cfopts: Vec<_> = cfs_v
638 .iter()
639 .map(|cf| cf.options.inner as *const _)
640 .collect();
641
642 db = Self::open_cf_raw(
643 opts,
644 &cpath,
645 &cfs_v,
646 &cfnames,
647 &cfopts,
648 &mut cfhandles,
649 access_type,
650 )?;
651 for handle in &cfhandles {
652 if handle.is_null() {
653 return Err(Error::new(
654 "Received null column family handle from DB.".to_owned(),
655 ));
656 }
657 }
658
659 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
660 cf_map.insert(cf_desc.name.clone(), inner);
661 }
662 }
663
664 if db.is_null() {
665 return Err(Error::new("Could not initialize database.".to_owned()));
666 }
667
668 Ok(Self {
669 inner: DBWithThreadModeInner { inner: db },
670 path: path.as_ref().to_path_buf(),
671 cfs: T::new_cf_map_internal(cf_map),
672 _outlive: outlive,
673 })
674 }
675
676 fn open_raw(
677 opts: &Options,
678 cpath: &CString,
679 access_type: &AccessType,
680 ) -> Result<*mut ffi::rocksdb_t, Error> {
681 let db = unsafe {
682 match *access_type {
683 AccessType::ReadOnly {
684 error_if_log_file_exist,
685 } => ffi_try!(ffi::rocksdb_open_for_read_only(
686 opts.inner,
687 cpath.as_ptr(),
688 c_uchar::from(error_if_log_file_exist),
689 )),
690 AccessType::ReadWrite => {
691 ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
692 }
693 AccessType::Secondary { secondary_path } => {
694 ffi_try!(ffi::rocksdb_open_as_secondary(
695 opts.inner,
696 cpath.as_ptr(),
697 to_cpath(secondary_path)?.as_ptr(),
698 ))
699 }
700 AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
701 opts.inner,
702 cpath.as_ptr(),
703 ttl.as_secs() as c_int,
704 )),
705 }
706 };
707 Ok(db)
708 }
709
710 #[allow(clippy::pedantic)]
711 fn open_cf_raw(
712 opts: &Options,
713 cpath: &CString,
714 cfs_v: &[ColumnFamilyDescriptor],
715 cfnames: &[*const c_char],
716 cfopts: &[*const ffi::rocksdb_options_t],
717 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
718 access_type: &AccessType,
719 ) -> Result<*mut ffi::rocksdb_t, Error> {
720 let db = unsafe {
721 match *access_type {
722 AccessType::ReadOnly {
723 error_if_log_file_exist,
724 } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
725 opts.inner,
726 cpath.as_ptr(),
727 cfs_v.len() as c_int,
728 cfnames.as_ptr(),
729 cfopts.as_ptr(),
730 cfhandles.as_mut_ptr(),
731 c_uchar::from(error_if_log_file_exist),
732 )),
733 AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
734 opts.inner,
735 cpath.as_ptr(),
736 cfs_v.len() as c_int,
737 cfnames.as_ptr(),
738 cfopts.as_ptr(),
739 cfhandles.as_mut_ptr(),
740 )),
741 AccessType::Secondary { secondary_path } => {
742 ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
743 opts.inner,
744 cpath.as_ptr(),
745 to_cpath(secondary_path)?.as_ptr(),
746 cfs_v.len() as c_int,
747 cfnames.as_ptr(),
748 cfopts.as_ptr(),
749 cfhandles.as_mut_ptr(),
750 ))
751 }
752 AccessType::WithTTL { ttl } => {
753 let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
754 ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
755 opts.inner,
756 cpath.as_ptr(),
757 cfs_v.len() as c_int,
758 cfnames.as_ptr(),
759 cfopts.as_ptr(),
760 cfhandles.as_mut_ptr(),
761 ttls_v.as_ptr(),
762 ))
763 }
764 }
765 };
766 Ok(db)
767 }
768
769 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
771 &self,
772 cf: &impl AsColumnFamilyRef,
773 from: K,
774 to: K,
775 writeopts: &WriteOptions,
776 ) -> Result<(), Error> {
777 let from = from.as_ref();
778 let to = to.as_ref();
779
780 unsafe {
781 ffi_try!(ffi::rocksdb_delete_range_cf(
782 self.inner.inner(),
783 writeopts.inner,
784 cf.inner(),
785 from.as_ptr() as *const c_char,
786 from.len() as size_t,
787 to.as_ptr() as *const c_char,
788 to.len() as size_t,
789 ));
790 Ok(())
791 }
792 }
793
794 pub fn delete_range_cf<K: AsRef<[u8]>>(
796 &self,
797 cf: &impl AsColumnFamilyRef,
798 from: K,
799 to: K,
800 ) -> Result<(), Error> {
801 self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
802 }
803
804 pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
805 unsafe {
806 ffi_try!(ffi::rocksdb_write(
807 self.inner.inner(),
808 writeopts.inner,
809 batch.inner
810 ));
811 }
812 Ok(())
813 }
814
815 pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
816 self.write_opt(batch, &WriteOptions::default())
817 }
818
819 pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
820 let mut wo = WriteOptions::new();
821 wo.disable_wal(true);
822 self.write_opt(batch, &wo)
823 }
824}
825
826impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
828 pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
829 Self {
830 inner,
831 cfs,
832 path,
833 _outlive: outlive,
834 }
835 }
836
837 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
838 let cpath = to_cpath(path)?;
839 let mut length = 0;
840
841 unsafe {
842 let ptr = ffi_try!(ffi::rocksdb_list_column_families(
843 opts.inner,
844 cpath.as_ptr(),
845 &mut length,
846 ));
847
848 let vec = slice::from_raw_parts(ptr, length)
849 .iter()
850 .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
851 .collect();
852 ffi::rocksdb_list_column_families_destroy(ptr, length);
853 Ok(vec)
854 }
855 }
856
857 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
858 let cpath = to_cpath(path)?;
859 unsafe {
860 ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
861 }
862 Ok(())
863 }
864
865 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
866 let cpath = to_cpath(path)?;
867 unsafe {
868 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
869 }
870 Ok(())
871 }
872
873 pub fn path(&self) -> &Path {
874 self.path.as_path()
875 }
876
877 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
880 unsafe {
881 ffi_try!(ffi::rocksdb_flush_wal(
882 self.inner.inner(),
883 c_uchar::from(sync)
884 ));
885 }
886 Ok(())
887 }
888
889 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
891 unsafe {
892 ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
893 }
894 Ok(())
895 }
896
897 pub fn flush(&self) -> Result<(), Error> {
899 self.flush_opt(&FlushOptions::default())
900 }
901
902 pub fn flush_cf_opt(
904 &self,
905 cf: &impl AsColumnFamilyRef,
906 flushopts: &FlushOptions,
907 ) -> Result<(), Error> {
908 unsafe {
909 ffi_try!(ffi::rocksdb_flush_cf(
910 self.inner.inner(),
911 flushopts.inner,
912 cf.inner()
913 ));
914 }
915 Ok(())
916 }
917
918 pub fn flush_cfs_opt(
924 &self,
925 cfs: &[&impl AsColumnFamilyRef],
926 opts: &FlushOptions,
927 ) -> Result<(), Error> {
928 let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
929 unsafe {
930 ffi_try!(ffi::rocksdb_flush_cfs(
931 self.inner.inner(),
932 opts.inner,
933 cfs.as_mut_ptr(),
934 cfs.len() as libc::c_int,
935 ));
936 }
937 Ok(())
938 }
939
940 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
943 self.flush_cf_opt(cf, &FlushOptions::default())
944 }
945
946 pub fn get_opt<K: AsRef<[u8]>>(
950 &self,
951 key: K,
952 readopts: &ReadOptions,
953 ) -> Result<Option<Vec<u8>>, Error> {
954 self.get_pinned_opt(key, readopts)
955 .map(|x| x.map(|v| v.as_ref().to_vec()))
956 }
957
958 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
962 self.get_opt(key.as_ref(), &ReadOptions::default())
963 }
964
965 pub fn get_cf_opt<K: AsRef<[u8]>>(
969 &self,
970 cf: &impl AsColumnFamilyRef,
971 key: K,
972 readopts: &ReadOptions,
973 ) -> Result<Option<Vec<u8>>, Error> {
974 self.get_pinned_cf_opt(cf, key, readopts)
975 .map(|x| x.map(|v| v.as_ref().to_vec()))
976 }
977
978 pub fn get_cf<K: AsRef<[u8]>>(
982 &self,
983 cf: &impl AsColumnFamilyRef,
984 key: K,
985 ) -> Result<Option<Vec<u8>>, Error> {
986 self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
987 }
988
989 pub fn get_pinned_opt<K: AsRef<[u8]>>(
992 &self,
993 key: K,
994 readopts: &ReadOptions,
995 ) -> Result<Option<DBPinnableSlice>, Error> {
996 if readopts.inner.is_null() {
997 return Err(Error::new(
998 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
999 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1000 .to_owned(),
1001 ));
1002 }
1003
1004 let key = key.as_ref();
1005 unsafe {
1006 let val = ffi_try!(ffi::rocksdb_get_pinned(
1007 self.inner.inner(),
1008 readopts.inner,
1009 key.as_ptr() as *const c_char,
1010 key.len() as size_t,
1011 ));
1012 if val.is_null() {
1013 Ok(None)
1014 } else {
1015 Ok(Some(DBPinnableSlice::from_c(val)))
1016 }
1017 }
1018 }
1019
1020 pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1024 self.get_pinned_opt(key, &ReadOptions::default())
1025 }
1026
1027 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1031 &self,
1032 cf: &impl AsColumnFamilyRef,
1033 key: K,
1034 readopts: &ReadOptions,
1035 ) -> Result<Option<DBPinnableSlice>, Error> {
1036 if readopts.inner.is_null() {
1037 return Err(Error::new(
1038 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1039 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1040 .to_owned(),
1041 ));
1042 }
1043
1044 let key = key.as_ref();
1045 unsafe {
1046 let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1047 self.inner.inner(),
1048 readopts.inner,
1049 cf.inner(),
1050 key.as_ptr() as *const c_char,
1051 key.len() as size_t,
1052 ));
1053 if val.is_null() {
1054 Ok(None)
1055 } else {
1056 Ok(Some(DBPinnableSlice::from_c(val)))
1057 }
1058 }
1059 }
1060
1061 pub fn get_pinned_cf<K: AsRef<[u8]>>(
1065 &self,
1066 cf: &impl AsColumnFamilyRef,
1067 key: K,
1068 ) -> Result<Option<DBPinnableSlice>, Error> {
1069 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1070 }
1071
1072 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1074 where
1075 K: AsRef<[u8]>,
1076 I: IntoIterator<Item = K>,
1077 {
1078 self.multi_get_opt(keys, &ReadOptions::default())
1079 }
1080
1081 pub fn multi_get_opt<K, I>(
1083 &self,
1084 keys: I,
1085 readopts: &ReadOptions,
1086 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1087 where
1088 K: AsRef<[u8]>,
1089 I: IntoIterator<Item = K>,
1090 {
1091 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1092 .into_iter()
1093 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1094 .unzip();
1095 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1096
1097 let mut values = vec![ptr::null_mut(); keys.len()];
1098 let mut values_sizes = vec![0_usize; keys.len()];
1099 let mut errors = vec![ptr::null_mut(); keys.len()];
1100 unsafe {
1101 ffi::rocksdb_multi_get(
1102 self.inner.inner(),
1103 readopts.inner,
1104 ptr_keys.len(),
1105 ptr_keys.as_ptr(),
1106 keys_sizes.as_ptr(),
1107 values.as_mut_ptr(),
1108 values_sizes.as_mut_ptr(),
1109 errors.as_mut_ptr(),
1110 );
1111 }
1112
1113 convert_values(values, values_sizes, errors)
1114 }
1115
1116 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1118 &'a self,
1119 keys: I,
1120 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1121 where
1122 K: AsRef<[u8]>,
1123 I: IntoIterator<Item = (&'b W, K)>,
1124 W: 'b + AsColumnFamilyRef,
1125 {
1126 self.multi_get_cf_opt(keys, &ReadOptions::default())
1127 }
1128
1129 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1131 &'a self,
1132 keys: I,
1133 readopts: &ReadOptions,
1134 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1135 where
1136 K: AsRef<[u8]>,
1137 I: IntoIterator<Item = (&'b W, K)>,
1138 W: 'b + AsColumnFamilyRef,
1139 {
1140 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1141 .into_iter()
1142 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1143 .unzip();
1144 let ptr_keys: Vec<_> = cfs_and_keys
1145 .iter()
1146 .map(|(_, k)| k.as_ptr() as *const c_char)
1147 .collect();
1148 let ptr_cfs: Vec<_> = cfs_and_keys
1149 .iter()
1150 .map(|(c, _)| c.inner() as *const _)
1151 .collect();
1152
1153 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1154 let mut values_sizes = vec![0_usize; ptr_keys.len()];
1155 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1156 unsafe {
1157 ffi::rocksdb_multi_get_cf(
1158 self.inner.inner(),
1159 readopts.inner,
1160 ptr_cfs.as_ptr(),
1161 ptr_keys.len(),
1162 ptr_keys.as_ptr(),
1163 keys_sizes.as_ptr(),
1164 values.as_mut_ptr(),
1165 values_sizes.as_mut_ptr(),
1166 errors.as_mut_ptr(),
1167 );
1168 }
1169
1170 convert_values(values, values_sizes, errors)
1171 }
1172
1173 pub fn batched_multi_get_cf<'a, K, I>(
1177 &self,
1178 cf: &impl AsColumnFamilyRef,
1179 keys: I,
1180 sorted_input: bool,
1181 ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1182 where
1183 K: AsRef<[u8]> + 'a + ?Sized,
1184 I: IntoIterator<Item = &'a K>,
1185 {
1186 self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1187 }
1188
1189 pub fn batched_multi_get_cf_opt<'a, K, I>(
1193 &self,
1194 cf: &impl AsColumnFamilyRef,
1195 keys: I,
1196 sorted_input: bool,
1197 readopts: &ReadOptions,
1198 ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1199 where
1200 K: AsRef<[u8]> + 'a + ?Sized,
1201 I: IntoIterator<Item = &'a K>,
1202 {
1203 let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1204 .into_iter()
1205 .map(|k| {
1206 let k = k.as_ref();
1207 (k.as_ptr() as *const c_char, k.len())
1208 })
1209 .unzip();
1210
1211 let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1212 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1213
1214 unsafe {
1215 ffi::rocksdb_batched_multi_get_cf(
1216 self.inner.inner(),
1217 readopts.inner,
1218 cf.inner(),
1219 ptr_keys.len(),
1220 ptr_keys.as_ptr(),
1221 keys_sizes.as_ptr(),
1222 pinned_values.as_mut_ptr(),
1223 errors.as_mut_ptr(),
1224 sorted_input,
1225 );
1226 pinned_values
1227 .into_iter()
1228 .zip(errors.into_iter())
1229 .map(|(v, e)| {
1230 if e.is_null() {
1231 if v.is_null() {
1232 Ok(None)
1233 } else {
1234 Ok(Some(DBPinnableSlice::from_c(v)))
1235 }
1236 } else {
1237 Err(Error::new(crate::ffi_util::error_message(e)))
1238 }
1239 })
1240 .collect()
1241 }
1242 }
1243
1244 pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1247 self.key_may_exist_opt(key, &ReadOptions::default())
1248 }
1249
1250 pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1253 let key = key.as_ref();
1254 unsafe {
1255 0 != ffi::rocksdb_key_may_exist(
1256 self.inner.inner(),
1257 readopts.inner,
1258 key.as_ptr() as *const c_char,
1259 key.len() as size_t,
1260 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1266 }
1267 }
1268
1269 pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1272 self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1273 }
1274
1275 pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1278 &self,
1279 cf: &impl AsColumnFamilyRef,
1280 key: K,
1281 readopts: &ReadOptions,
1282 ) -> bool {
1283 let key = key.as_ref();
1284 0 != unsafe {
1285 ffi::rocksdb_key_may_exist_cf(
1286 self.inner.inner(),
1287 readopts.inner,
1288 cf.inner(),
1289 key.as_ptr() as *const c_char,
1290 key.len() as size_t,
1291 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1297 }
1298 }
1299
1300 pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1307 &self,
1308 cf: &impl AsColumnFamilyRef,
1309 key: K,
1310 readopts: &ReadOptions,
1311 ) -> (bool, Option<CSlice>) {
1312 let key = key.as_ref();
1313 let mut val: *mut c_char = ptr::null_mut();
1314 let mut val_len: usize = 0;
1315 let mut value_found: c_uchar = 0;
1316 let may_exists = 0
1317 != unsafe {
1318 ffi::rocksdb_key_may_exist_cf(
1319 self.inner.inner(),
1320 readopts.inner,
1321 cf.inner(),
1322 key.as_ptr() as *const c_char,
1323 key.len() as size_t,
1324 &mut val, &mut val_len, ptr::null(), 0, &mut value_found, )
1330 };
1331 if may_exists && value_found != 0 {
1334 (
1335 may_exists,
1336 Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1337 )
1338 } else {
1339 (may_exists, None)
1340 }
1341 }
1342
1343 fn create_inner_cf_handle(
1344 &self,
1345 name: impl CStrLike,
1346 opts: &Options,
1347 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1348 let cf_name = name.bake().map_err(|err| {
1349 Error::new(format!(
1350 "Failed to convert path to CString when creating cf: {err}"
1351 ))
1352 })?;
1353 Ok(unsafe {
1354 ffi_try!(ffi::rocksdb_create_column_family(
1355 self.inner.inner(),
1356 opts.inner,
1357 cf_name.as_ptr(),
1358 ))
1359 })
1360 }
1361
1362 pub fn iterator<'a: 'b, 'b>(
1363 &'a self,
1364 mode: IteratorMode,
1365 ) -> DBIteratorWithThreadMode<'b, Self> {
1366 let readopts = ReadOptions::default();
1367 self.iterator_opt(mode, readopts)
1368 }
1369
1370 pub fn iterator_opt<'a: 'b, 'b>(
1371 &'a self,
1372 mode: IteratorMode,
1373 readopts: ReadOptions,
1374 ) -> DBIteratorWithThreadMode<'b, Self> {
1375 DBIteratorWithThreadMode::new(self, readopts, mode)
1376 }
1377
1378 pub fn iterator_cf_opt<'a: 'b, 'b>(
1381 &'a self,
1382 cf_handle: &impl AsColumnFamilyRef,
1383 readopts: ReadOptions,
1384 mode: IteratorMode,
1385 ) -> DBIteratorWithThreadMode<'b, Self> {
1386 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1387 }
1388
1389 pub fn full_iterator<'a: 'b, 'b>(
1393 &'a self,
1394 mode: IteratorMode,
1395 ) -> DBIteratorWithThreadMode<'b, Self> {
1396 let mut opts = ReadOptions::default();
1397 opts.set_total_order_seek(true);
1398 DBIteratorWithThreadMode::new(self, opts, mode)
1399 }
1400
1401 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1402 &'a self,
1403 prefix: P,
1404 ) -> DBIteratorWithThreadMode<'b, Self> {
1405 let mut opts = ReadOptions::default();
1406 opts.set_prefix_same_as_start(true);
1407 DBIteratorWithThreadMode::new(
1408 self,
1409 opts,
1410 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1411 )
1412 }
1413
1414 pub fn iterator_cf<'a: 'b, 'b>(
1415 &'a self,
1416 cf_handle: &impl AsColumnFamilyRef,
1417 mode: IteratorMode,
1418 ) -> DBIteratorWithThreadMode<'b, Self> {
1419 let opts = ReadOptions::default();
1420 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1421 }
1422
1423 pub fn full_iterator_cf<'a: 'b, 'b>(
1424 &'a self,
1425 cf_handle: &impl AsColumnFamilyRef,
1426 mode: IteratorMode,
1427 ) -> DBIteratorWithThreadMode<'b, Self> {
1428 let mut opts = ReadOptions::default();
1429 opts.set_total_order_seek(true);
1430 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1431 }
1432
1433 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1434 &'a self,
1435 cf_handle: &impl AsColumnFamilyRef,
1436 prefix: P,
1437 ) -> DBIteratorWithThreadMode<'a, Self> {
1438 let mut opts = ReadOptions::default();
1439 opts.set_prefix_same_as_start(true);
1440 DBIteratorWithThreadMode::<'a, Self>::new_cf(
1441 self,
1442 cf_handle.inner(),
1443 opts,
1444 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1445 )
1446 }
1447
1448 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1450 let opts = ReadOptions::default();
1451 DBRawIteratorWithThreadMode::new(self, opts)
1452 }
1453
1454 pub fn raw_iterator_cf<'a: 'b, 'b>(
1456 &'a self,
1457 cf_handle: &impl AsColumnFamilyRef,
1458 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1459 let opts = ReadOptions::default();
1460 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1461 }
1462
1463 pub fn raw_iterator_opt<'a: 'b, 'b>(
1465 &'a self,
1466 readopts: ReadOptions,
1467 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1468 DBRawIteratorWithThreadMode::new(self, readopts)
1469 }
1470
1471 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1473 &'a self,
1474 cf_handle: &impl AsColumnFamilyRef,
1475 readopts: ReadOptions,
1476 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1477 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1478 }
1479
1480 pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1481 SnapshotWithThreadMode::<Self>::new(self)
1482 }
1483
1484 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1485 where
1486 K: AsRef<[u8]>,
1487 V: AsRef<[u8]>,
1488 {
1489 let key = key.as_ref();
1490 let value = value.as_ref();
1491
1492 unsafe {
1493 ffi_try!(ffi::rocksdb_put(
1494 self.inner.inner(),
1495 writeopts.inner,
1496 key.as_ptr() as *const c_char,
1497 key.len() as size_t,
1498 value.as_ptr() as *const c_char,
1499 value.len() as size_t,
1500 ));
1501 Ok(())
1502 }
1503 }
1504
1505 pub fn put_cf_opt<K, V>(
1506 &self,
1507 cf: &impl AsColumnFamilyRef,
1508 key: K,
1509 value: V,
1510 writeopts: &WriteOptions,
1511 ) -> Result<(), Error>
1512 where
1513 K: AsRef<[u8]>,
1514 V: AsRef<[u8]>,
1515 {
1516 let key = key.as_ref();
1517 let value = value.as_ref();
1518
1519 unsafe {
1520 ffi_try!(ffi::rocksdb_put_cf(
1521 self.inner.inner(),
1522 writeopts.inner,
1523 cf.inner(),
1524 key.as_ptr() as *const c_char,
1525 key.len() as size_t,
1526 value.as_ptr() as *const c_char,
1527 value.len() as size_t,
1528 ));
1529 Ok(())
1530 }
1531 }
1532
1533 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1534 where
1535 K: AsRef<[u8]>,
1536 V: AsRef<[u8]>,
1537 {
1538 let key = key.as_ref();
1539 let value = value.as_ref();
1540
1541 unsafe {
1542 ffi_try!(ffi::rocksdb_merge(
1543 self.inner.inner(),
1544 writeopts.inner,
1545 key.as_ptr() as *const c_char,
1546 key.len() as size_t,
1547 value.as_ptr() as *const c_char,
1548 value.len() as size_t,
1549 ));
1550 Ok(())
1551 }
1552 }
1553
1554 pub fn merge_cf_opt<K, V>(
1555 &self,
1556 cf: &impl AsColumnFamilyRef,
1557 key: K,
1558 value: V,
1559 writeopts: &WriteOptions,
1560 ) -> Result<(), Error>
1561 where
1562 K: AsRef<[u8]>,
1563 V: AsRef<[u8]>,
1564 {
1565 let key = key.as_ref();
1566 let value = value.as_ref();
1567
1568 unsafe {
1569 ffi_try!(ffi::rocksdb_merge_cf(
1570 self.inner.inner(),
1571 writeopts.inner,
1572 cf.inner(),
1573 key.as_ptr() as *const c_char,
1574 key.len() as size_t,
1575 value.as_ptr() as *const c_char,
1576 value.len() as size_t,
1577 ));
1578 Ok(())
1579 }
1580 }
1581
1582 pub fn delete_opt<K: AsRef<[u8]>>(
1583 &self,
1584 key: K,
1585 writeopts: &WriteOptions,
1586 ) -> Result<(), Error> {
1587 let key = key.as_ref();
1588
1589 unsafe {
1590 ffi_try!(ffi::rocksdb_delete(
1591 self.inner.inner(),
1592 writeopts.inner,
1593 key.as_ptr() as *const c_char,
1594 key.len() as size_t,
1595 ));
1596 Ok(())
1597 }
1598 }
1599
1600 pub fn delete_cf_opt<K: AsRef<[u8]>>(
1601 &self,
1602 cf: &impl AsColumnFamilyRef,
1603 key: K,
1604 writeopts: &WriteOptions,
1605 ) -> Result<(), Error> {
1606 let key = key.as_ref();
1607
1608 unsafe {
1609 ffi_try!(ffi::rocksdb_delete_cf(
1610 self.inner.inner(),
1611 writeopts.inner,
1612 cf.inner(),
1613 key.as_ptr() as *const c_char,
1614 key.len() as size_t,
1615 ));
1616 Ok(())
1617 }
1618 }
1619
1620 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1621 where
1622 K: AsRef<[u8]>,
1623 V: AsRef<[u8]>,
1624 {
1625 self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1626 }
1627
1628 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1629 where
1630 K: AsRef<[u8]>,
1631 V: AsRef<[u8]>,
1632 {
1633 self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1634 }
1635
1636 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1637 where
1638 K: AsRef<[u8]>,
1639 V: AsRef<[u8]>,
1640 {
1641 self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1642 }
1643
1644 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1645 where
1646 K: AsRef<[u8]>,
1647 V: AsRef<[u8]>,
1648 {
1649 self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1650 }
1651
1652 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1653 self.delete_opt(key.as_ref(), &WriteOptions::default())
1654 }
1655
1656 pub fn delete_cf<K: AsRef<[u8]>>(
1657 &self,
1658 cf: &impl AsColumnFamilyRef,
1659 key: K,
1660 ) -> Result<(), Error> {
1661 self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1662 }
1663
1664 pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1666 unsafe {
1667 let start = start.as_ref().map(AsRef::as_ref);
1668 let end = end.as_ref().map(AsRef::as_ref);
1669
1670 ffi::rocksdb_compact_range(
1671 self.inner.inner(),
1672 opt_bytes_to_ptr(start),
1673 start.map_or(0, <[u8]>::len) as size_t,
1674 opt_bytes_to_ptr(end),
1675 end.map_or(0, <[u8]>::len) as size_t,
1676 );
1677 }
1678 }
1679
1680 pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1682 &self,
1683 start: Option<S>,
1684 end: Option<E>,
1685 opts: &CompactOptions,
1686 ) {
1687 unsafe {
1688 let start = start.as_ref().map(AsRef::as_ref);
1689 let end = end.as_ref().map(AsRef::as_ref);
1690
1691 ffi::rocksdb_compact_range_opt(
1692 self.inner.inner(),
1693 opts.inner,
1694 opt_bytes_to_ptr(start),
1695 start.map_or(0, <[u8]>::len) as size_t,
1696 opt_bytes_to_ptr(end),
1697 end.map_or(0, <[u8]>::len) as size_t,
1698 );
1699 }
1700 }
1701
1702 pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1705 &self,
1706 cf: &impl AsColumnFamilyRef,
1707 start: Option<S>,
1708 end: Option<E>,
1709 ) {
1710 unsafe {
1711 let start = start.as_ref().map(AsRef::as_ref);
1712 let end = end.as_ref().map(AsRef::as_ref);
1713
1714 ffi::rocksdb_compact_range_cf(
1715 self.inner.inner(),
1716 cf.inner(),
1717 opt_bytes_to_ptr(start),
1718 start.map_or(0, <[u8]>::len) as size_t,
1719 opt_bytes_to_ptr(end),
1720 end.map_or(0, <[u8]>::len) as size_t,
1721 );
1722 }
1723 }
1724
1725 pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1727 &self,
1728 cf: &impl AsColumnFamilyRef,
1729 start: Option<S>,
1730 end: Option<E>,
1731 opts: &CompactOptions,
1732 ) {
1733 unsafe {
1734 let start = start.as_ref().map(AsRef::as_ref);
1735 let end = end.as_ref().map(AsRef::as_ref);
1736
1737 ffi::rocksdb_compact_range_cf_opt(
1738 self.inner.inner(),
1739 cf.inner(),
1740 opts.inner,
1741 opt_bytes_to_ptr(start),
1742 start.map_or(0, <[u8]>::len) as size_t,
1743 opt_bytes_to_ptr(end),
1744 end.map_or(0, <[u8]>::len) as size_t,
1745 );
1746 }
1747 }
1748
1749 pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
1758 unsafe {
1759 ffi_try!(ffi::rocksdb_wait_for_compact(
1760 self.inner.inner(),
1761 opts.inner
1762 ));
1763 }
1764 Ok(())
1765 }
1766
1767 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1768 let copts = convert_options(opts)?;
1769 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1770 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1771 let count = opts.len() as i32;
1772 unsafe {
1773 ffi_try!(ffi::rocksdb_set_options(
1774 self.inner.inner(),
1775 count,
1776 cnames.as_ptr(),
1777 cvalues.as_ptr(),
1778 ));
1779 }
1780 Ok(())
1781 }
1782
1783 pub fn set_options_cf(
1784 &self,
1785 cf: &impl AsColumnFamilyRef,
1786 opts: &[(&str, &str)],
1787 ) -> Result<(), Error> {
1788 let copts = convert_options(opts)?;
1789 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1790 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1791 let count = opts.len() as i32;
1792 unsafe {
1793 ffi_try!(ffi::rocksdb_set_options_cf(
1794 self.inner.inner(),
1795 cf.inner(),
1796 count,
1797 cnames.as_ptr(),
1798 cvalues.as_ptr(),
1799 ));
1800 }
1801 Ok(())
1802 }
1803
1804 fn property_value_impl<R>(
1813 name: impl CStrLike,
1814 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1815 parse: impl FnOnce(&str) -> Result<R, Error>,
1816 ) -> Result<Option<R>, Error> {
1817 let value = match name.bake() {
1818 Ok(prop_name) => get_property(prop_name.as_ptr()),
1819 Err(e) => {
1820 return Err(Error::new(format!(
1821 "Failed to convert property name to CString: {e}"
1822 )));
1823 }
1824 };
1825 if value.is_null() {
1826 return Ok(None);
1827 }
1828 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1829 Ok(s) => parse(s).map(|value| Some(value)),
1830 Err(e) => Err(Error::new(format!(
1831 "Failed to convert property value to string: {e}"
1832 ))),
1833 };
1834 unsafe {
1835 ffi::rocksdb_free(value as *mut c_void);
1836 }
1837 result
1838 }
1839
1840 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1845 Self::property_value_impl(
1846 name,
1847 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1848 |str_value| Ok(str_value.to_owned()),
1849 )
1850 }
1851
1852 pub fn property_value_cf(
1857 &self,
1858 cf: &impl AsColumnFamilyRef,
1859 name: impl CStrLike,
1860 ) -> Result<Option<String>, Error> {
1861 Self::property_value_impl(
1862 name,
1863 |prop_name| unsafe {
1864 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1865 },
1866 |str_value| Ok(str_value.to_owned()),
1867 )
1868 }
1869
1870 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1871 value.parse::<u64>().map_err(|err| {
1872 Error::new(format!(
1873 "Failed to convert property value {value} to int: {err}"
1874 ))
1875 })
1876 }
1877
1878 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1883 Self::property_value_impl(
1884 name,
1885 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1886 Self::parse_property_int_value,
1887 )
1888 }
1889
1890 pub fn property_int_value_cf(
1895 &self,
1896 cf: &impl AsColumnFamilyRef,
1897 name: impl CStrLike,
1898 ) -> Result<Option<u64>, Error> {
1899 Self::property_value_impl(
1900 name,
1901 |prop_name| unsafe {
1902 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1903 },
1904 Self::parse_property_int_value,
1905 )
1906 }
1907
1908 pub fn latest_sequence_number(&self) -> u64 {
1910 unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1911 }
1912
1913 pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1924 unsafe {
1925 let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1929 let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1930 self.inner.inner(),
1931 seq_number,
1932 opts
1933 ));
1934 Ok(DBWALIterator {
1935 inner: iter,
1936 start_seq_number: seq_number,
1937 })
1938 }
1939 }
1940
1941 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1944 unsafe {
1945 ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1946 }
1947 Ok(())
1948 }
1949
1950 pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1952 let opts = IngestExternalFileOptions::default();
1953 self.ingest_external_file_opts(&opts, paths)
1954 }
1955
1956 pub fn ingest_external_file_opts<P: AsRef<Path>>(
1958 &self,
1959 opts: &IngestExternalFileOptions,
1960 paths: Vec<P>,
1961 ) -> Result<(), Error> {
1962 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1963 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1964
1965 self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1966 }
1967
1968 pub fn ingest_external_file_cf<P: AsRef<Path>>(
1971 &self,
1972 cf: &impl AsColumnFamilyRef,
1973 paths: Vec<P>,
1974 ) -> Result<(), Error> {
1975 let opts = IngestExternalFileOptions::default();
1976 self.ingest_external_file_cf_opts(cf, &opts, paths)
1977 }
1978
1979 pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1981 &self,
1982 cf: &impl AsColumnFamilyRef,
1983 opts: &IngestExternalFileOptions,
1984 paths: Vec<P>,
1985 ) -> Result<(), Error> {
1986 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1987 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1988
1989 self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1990 }
1991
1992 fn ingest_external_file_raw(
1993 &self,
1994 opts: &IngestExternalFileOptions,
1995 paths_v: &[CString],
1996 cpaths: &[*const c_char],
1997 ) -> Result<(), Error> {
1998 unsafe {
1999 ffi_try!(ffi::rocksdb_ingest_external_file(
2000 self.inner.inner(),
2001 cpaths.as_ptr(),
2002 paths_v.len(),
2003 opts.inner as *const _
2004 ));
2005 Ok(())
2006 }
2007 }
2008
2009 fn ingest_external_file_raw_cf(
2010 &self,
2011 cf: &impl AsColumnFamilyRef,
2012 opts: &IngestExternalFileOptions,
2013 paths_v: &[CString],
2014 cpaths: &[*const c_char],
2015 ) -> Result<(), Error> {
2016 unsafe {
2017 ffi_try!(ffi::rocksdb_ingest_external_file_cf(
2018 self.inner.inner(),
2019 cf.inner(),
2020 cpaths.as_ptr(),
2021 paths_v.len(),
2022 opts.inner as *const _
2023 ));
2024 Ok(())
2025 }
2026 }
2027
2028 pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
2030 unsafe {
2031 let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
2032
2033 let metadata = ColumnFamilyMetaData {
2034 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2035 name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2036 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2037 };
2038
2039 ffi::rocksdb_column_family_metadata_destroy(ptr);
2041
2042 metadata
2044 }
2045 }
2046
2047 pub fn get_column_family_metadata_cf(
2049 &self,
2050 cf: &impl AsColumnFamilyRef,
2051 ) -> ColumnFamilyMetaData {
2052 unsafe {
2053 let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
2054
2055 let metadata = ColumnFamilyMetaData {
2056 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2057 name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2058 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2059 };
2060
2061 ffi::rocksdb_column_family_metadata_destroy(ptr);
2063
2064 metadata
2066 }
2067 }
2068
2069 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
2072 unsafe {
2073 let files = ffi::rocksdb_livefiles(self.inner.inner());
2074 if files.is_null() {
2075 Err(Error::new("Could not get live files".to_owned()))
2076 } else {
2077 let n = ffi::rocksdb_livefiles_count(files);
2078
2079 let mut livefiles = Vec::with_capacity(n as usize);
2080 let mut key_size: usize = 0;
2081
2082 for i in 0..n {
2083 let column_family_name =
2084 from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2085 let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2086 let size = ffi::rocksdb_livefiles_size(files, i);
2087 let level = ffi::rocksdb_livefiles_level(files, i);
2088
2089 let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2091 let smallest_key = raw_data(smallest_key, key_size);
2092
2093 let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2095 let largest_key = raw_data(largest_key, key_size);
2096
2097 livefiles.push(LiveFile {
2098 column_family_name,
2099 name,
2100 size,
2101 level,
2102 start_key: smallest_key,
2103 end_key: largest_key,
2104 num_entries: ffi::rocksdb_livefiles_entries(files, i),
2105 num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2106 });
2107 }
2108
2109 ffi::rocksdb_livefiles_destroy(files);
2111
2112 Ok(livefiles)
2114 }
2115 }
2116 }
2117
2118 pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2127 let from = from.as_ref();
2128 let to = to.as_ref();
2129 unsafe {
2130 ffi_try!(ffi::rocksdb_delete_file_in_range(
2131 self.inner.inner(),
2132 from.as_ptr() as *const c_char,
2133 from.len() as size_t,
2134 to.as_ptr() as *const c_char,
2135 to.len() as size_t,
2136 ));
2137 Ok(())
2138 }
2139 }
2140
2141 pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2143 &self,
2144 cf: &impl AsColumnFamilyRef,
2145 from: K,
2146 to: K,
2147 ) -> Result<(), Error> {
2148 let from = from.as_ref();
2149 let to = to.as_ref();
2150 unsafe {
2151 ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2152 self.inner.inner(),
2153 cf.inner(),
2154 from.as_ptr() as *const c_char,
2155 from.len() as size_t,
2156 to.as_ptr() as *const c_char,
2157 to.len() as size_t,
2158 ));
2159 Ok(())
2160 }
2161 }
2162
2163 pub fn cancel_all_background_work(&self, wait: bool) {
2165 unsafe {
2166 ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2167 }
2168 }
2169
2170 fn drop_column_family<C>(
2171 &self,
2172 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2173 cf: C,
2174 ) -> Result<(), Error> {
2175 unsafe {
2176 ffi_try!(ffi::rocksdb_drop_column_family(
2178 self.inner.inner(),
2179 cf_inner
2180 ));
2181 }
2182 drop(cf);
2185 Ok(())
2186 }
2187}
2188
2189impl<I: DBInner> DBCommon<SingleThreaded, I> {
2190 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2192 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2193 self.cfs
2194 .cfs
2195 .insert(name.as_ref().to_string(), ColumnFamily { inner });
2196 Ok(())
2197 }
2198
2199 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2201 if let Some(cf) = self.cfs.cfs.remove(name) {
2202 self.drop_column_family(cf.inner, cf)
2203 } else {
2204 Err(Error::new(format!("Invalid column family: {name}")))
2205 }
2206 }
2207
2208 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2210 self.cfs.cfs.get(name)
2211 }
2212}
2213
2214impl<I: DBInner> DBCommon<MultiThreaded, I> {
2215 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2217 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2218 self.cfs.cfs.write().unwrap().insert(
2219 name.as_ref().to_string(),
2220 Arc::new(UnboundColumnFamily { inner }),
2221 );
2222 Ok(())
2223 }
2224
2225 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2228 if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2229 self.drop_column_family(cf.inner, cf)
2230 } else {
2231 Err(Error::new(format!("Invalid column family: {name}")))
2232 }
2233 }
2234
2235 pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2237 self.cfs
2238 .cfs
2239 .read()
2240 .unwrap()
2241 .get(name)
2242 .cloned()
2243 .map(UnboundColumnFamily::bound_column_family)
2244 }
2245}
2246
2247impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2248 fn drop(&mut self) {
2249 self.cfs.drop_all_cfs_internal();
2250 }
2251}
2252
2253impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2254 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2255 write!(f, "RocksDB {{ path: {:?} }}", self.path())
2256 }
2257}
2258
/// Owned copy of the column family metadata read from RocksDB.
///
/// Implements `PartialEq`/`Eq` so two values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnFamilyMetaData {
    /// Value of `rocksdb_column_family_metadata_get_size`
    /// (presumably bytes; confirm against the RocksDB C API).
    pub size: u64,
    /// Column family name, from `rocksdb_column_family_metadata_get_name`.
    pub name: String,
    /// Value of `rocksdb_column_family_metadata_get_file_count`.
    pub file_count: usize,
}
2270
/// Owned description of one live file, as collected from the
/// `rocksdb_livefiles_*` accessors.
///
/// Implements `PartialEq`/`Eq` so two values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LiveFile {
    /// Name of the column family the file is reported under.
    pub column_family_name: String,
    /// File name as reported by `rocksdb_livefiles_name`.
    pub name: String,
    /// Value of `rocksdb_livefiles_size`
    /// (presumably bytes; confirm against the RocksDB C API).
    pub size: usize,
    /// Value of `rocksdb_livefiles_level`.
    pub level: i32,
    /// Key data from `rocksdb_livefiles_smallestkey`, if any was returned.
    pub start_key: Option<Vec<u8>>,
    /// Key data from `rocksdb_livefiles_largestkey`, if any was returned.
    pub end_key: Option<Vec<u8>>,
    /// Value of `rocksdb_livefiles_entries`.
    pub num_entries: u64,
    /// Value of `rocksdb_livefiles_deletions`.
    pub num_deletions: u64,
}
2291
2292fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2293 opts.iter()
2294 .map(|(name, value)| {
2295 let cname = match CString::new(name.as_bytes()) {
2296 Ok(cname) => cname,
2297 Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2298 };
2299 let cvalue = match CString::new(value.as_bytes()) {
2300 Ok(cvalue) => cvalue,
2301 Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2302 };
2303 Ok((cname, cvalue))
2304 })
2305 .collect()
2306}
2307
2308pub(crate) fn convert_values(
2309 values: Vec<*mut c_char>,
2310 values_sizes: Vec<usize>,
2311 errors: Vec<*mut c_char>,
2312) -> Vec<Result<Option<Vec<u8>>, Error>> {
2313 values
2314 .into_iter()
2315 .zip(values_sizes.into_iter())
2316 .zip(errors.into_iter())
2317 .map(|((v, s), e)| {
2318 if e.is_null() {
2319 let value = unsafe { crate::ffi_util::raw_data(v, s) };
2320 unsafe {
2321 ffi::rocksdb_free(v as *mut c_void);
2322 }
2323 Ok(value)
2324 } else {
2325 Err(Error::new(crate::ffi_util::error_message(e)))
2326 }
2327 })
2328 .collect()
2329}