1use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int, size_t};
19
20use crate::column_family::ColumnFamilyTtl;
21use crate::{
22 db::{DBCommon, DBInner},
23 ffi,
24 ffi_util::to_cpath,
25 write_batch::WriteBatchWithTransaction,
26 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options,
27 ThreadMode, Transaction, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
28};
29
30#[cfg(not(feature = "multi-threaded-cf"))]
65pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
66 DBCommon<T, OptimisticTransactionDBInner>;
67#[cfg(feature = "multi-threaded-cf")]
68pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
69 DBCommon<T, OptimisticTransactionDBInner>;
70
71pub struct OptimisticTransactionDBInner {
72 base: *mut ffi::rocksdb_t,
73 db: *mut ffi::rocksdb_optimistictransactiondb_t,
74}
75
76impl DBInner for OptimisticTransactionDBInner {
77 fn inner(&self) -> *mut ffi::rocksdb_t {
78 self.base
79 }
80}
81
82impl Drop for OptimisticTransactionDBInner {
83 fn drop(&mut self) {
84 unsafe {
85 ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
86 ffi::rocksdb_optimistictransactiondb_close(self.db);
87 }
88 }
89}
90
91impl<T: ThreadMode> OptimisticTransactionDB<T> {
93 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
95 let mut opts = Options::default();
96 opts.create_if_missing(true);
97 Self::open(&opts, path)
98 }
99
100 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
102 Self::open_cf(opts, path, None::<&str>)
103 }
104
105 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
112 where
113 P: AsRef<Path>,
114 I: IntoIterator<Item = N>,
115 N: AsRef<str>,
116 {
117 let cfs = cfs
118 .into_iter()
119 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
120
121 Self::open_cf_descriptors_internal(opts, path, cfs)
122 }
123
124 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
126 where
127 P: AsRef<Path>,
128 I: IntoIterator<Item = ColumnFamilyDescriptor>,
129 {
130 Self::open_cf_descriptors_internal(opts, path, cfs)
131 }
132
133 fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
135 where
136 P: AsRef<Path>,
137 I: IntoIterator<Item = ColumnFamilyDescriptor>,
138 {
139 let cfs: Vec<_> = cfs.into_iter().collect();
140 let outlive = iter::once(opts.outlive.clone())
141 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
142 .collect();
143
144 let cpath = to_cpath(&path)?;
145
146 if let Err(e) = fs::create_dir_all(&path) {
147 return Err(Error::new(format!(
148 "Failed to create RocksDB directory: `{e:?}`."
149 )));
150 }
151
152 let db: *mut ffi::rocksdb_optimistictransactiondb_t;
153 let mut cf_map = BTreeMap::new();
154
155 if cfs.is_empty() {
156 db = Self::open_raw(opts, &cpath)?;
157 } else {
158 let mut cfs_v = cfs;
159 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
161 cfs_v.push(ColumnFamilyDescriptor {
162 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
163 options: Options::default(),
164 ttl: ColumnFamilyTtl::SameAsDb,
165 });
166 }
167 let c_cfs: Vec<CString> = cfs_v
170 .iter()
171 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
172 .collect();
173
174 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
175
176 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
178
179 let cfopts: Vec<_> = cfs_v
180 .iter()
181 .map(|cf| cf.options.inner.cast_const())
182 .collect();
183
184 db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
185
186 for handle in &cfhandles {
187 if handle.is_null() {
188 return Err(Error::new(
189 "Received null column family handle from DB.".to_owned(),
190 ));
191 }
192 }
193
194 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
195 cf_map.insert(cf_desc.name.clone(), inner);
196 }
197 }
198
199 if db.is_null() {
200 return Err(Error::new("Could not initialize database.".to_owned()));
201 }
202
203 let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
204 if base.is_null() {
205 unsafe {
206 ffi::rocksdb_optimistictransactiondb_close(db);
207 }
208 return Err(Error::new("Could not initialize database.".to_owned()));
209 }
210 let inner = OptimisticTransactionDBInner { base, db };
211
212 Ok(Self::new(
213 inner,
214 T::new_cf_map_internal(cf_map),
215 path.as_ref().to_path_buf(),
216 outlive,
217 ))
218 }
219
220 fn open_raw(
221 opts: &Options,
222 cpath: &CString,
223 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
224 unsafe {
225 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
226 opts.inner,
227 cpath.as_ptr()
228 ));
229 Ok(db)
230 }
231 }
232
233 fn open_cf_raw(
234 opts: &Options,
235 cpath: &CString,
236 cfs_v: &[ColumnFamilyDescriptor],
237 cfnames: &[*const c_char],
238 cfopts: &[*const ffi::rocksdb_options_t],
239 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
240 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
241 unsafe {
242 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
243 opts.inner,
244 cpath.as_ptr(),
245 cfs_v.len() as c_int,
246 cfnames.as_ptr(),
247 cfopts.as_ptr(),
248 cfhandles.as_mut_ptr(),
249 ));
250 Ok(db)
251 }
252 }
253
254 pub fn transaction(&self) -> Transaction<Self> {
256 self.transaction_opt(
257 &WriteOptions::default(),
258 &OptimisticTransactionOptions::default(),
259 )
260 }
261
262 pub fn transaction_opt(
264 &self,
265 writeopts: &WriteOptions,
266 otxn_opts: &OptimisticTransactionOptions,
267 ) -> Transaction<Self> {
268 Transaction {
269 inner: unsafe {
270 ffi::rocksdb_optimistictransaction_begin(
271 self.inner.db,
272 writeopts.inner,
273 otxn_opts.inner,
274 std::ptr::null_mut(),
275 )
276 },
277 _marker: PhantomData,
278 }
279 }
280
281 pub fn write_opt(
282 &self,
283 batch: WriteBatchWithTransaction<true>,
284 writeopts: &WriteOptions,
285 ) -> Result<(), Error> {
286 unsafe {
287 ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
288 self.inner.db,
289 writeopts.inner,
290 batch.inner
291 ));
292 }
293 Ok(())
294 }
295
296 pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
297 self.write_opt(batch, &WriteOptions::default())
298 }
299
300 pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
301 let mut wo = WriteOptions::new();
302 wo.disable_wal(true);
303 self.write_opt(batch, &wo)
304 }
305
306 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
308 &self,
309 cf: &impl AsColumnFamilyRef,
310 from: K,
311 to: K,
312 writeopts: &WriteOptions,
313 ) -> Result<(), Error> {
314 let from = from.as_ref();
315 let to = to.as_ref();
316
317 unsafe {
318 ffi_try!(ffi::rocksdb_delete_range_cf(
319 self.inner.inner(),
320 writeopts.inner,
321 cf.inner(),
322 from.as_ptr() as *const c_char,
323 from.len() as size_t,
324 to.as_ptr() as *const c_char,
325 to.len() as size_t,
326 ));
327 Ok(())
328 }
329 }
330
331 pub fn delete_range_cf<K: AsRef<[u8]>>(
333 &self,
334 cf: &impl AsColumnFamilyRef,
335 from: K,
336 to: K,
337 ) -> Result<(), Error> {
338 self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
339 }
340}