rocksdb/
write_batch.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{ffi, AsColumnFamilyRef};
16use libc::{c_char, c_void, size_t};
17use std::slice;
18
/// A write batch with the `TRANSACTION` parameter fixed to `false`.
///
/// This alias is kept for compatibility, so code that names `WriteBatch`
/// keeps compiling. See [`WriteBatchWithTransaction`] for details.
pub type WriteBatch = WriteBatchWithTransaction<false>;
21
/// An atomic batch of write operations.
///
/// The `TRANSACTION` const parameter selects which operations are available:
/// [`delete_range`](WriteBatchWithTransaction::delete_range) is only implemented
/// for `WriteBatchWithTransaction<false>`, because it is not supported in
/// [`Transaction`].
///
/// Making an atomic commit of several writes:
///
/// ```
/// use rocksdb::{DB, Options, WriteBatchWithTransaction};
///
/// let path = "_path_for_rocksdb_storage1";
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut batch = WriteBatchWithTransaction::<false>::default();
///     batch.put(b"my key", b"my value");
///     batch.put(b"key2", b"value2");
///     batch.put(b"key3", b"value3");
///
///     // delete_range is supported when used without a transaction
///     batch.delete_range(b"key2", b"key3");
///
///     db.write(batch); // Atomically commits the batch
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
/// [`Transaction`]: crate::Transaction
pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
    // Raw handle to the underlying RocksDB write batch. It is created in
    // `Default::default` / `from_data` and released in `Drop`.
    pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
}
51
/// Receives the puts and deletes of a write batch.
///
/// The application must provide an implementation of this trait when
/// iterating the operations within a `WriteBatch` (see
/// `WriteBatchWithTransaction::iterate`). Keys and values are handed over as
/// owned boxed slices, so an implementation may keep them.
pub trait WriteBatchIterator {
    /// Called with a key and value that were `put` into the batch.
    fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>);
    /// Called with a key that was `delete`d from the batch.
    fn delete(&mut self, key: Box<[u8]>);
}
62
63unsafe extern "C" fn writebatch_put_callback(
64    state: *mut c_void,
65    k: *const c_char,
66    klen: usize,
67    v: *const c_char,
68    vlen: usize,
69) {
70    // coerce the raw pointer back into a box, but "leak" it so we prevent
71    // freeing the resource before we are done with it
72    let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
73    let leaked_cb = Box::leak(boxed_cb);
74    let key = slice::from_raw_parts(k as *const u8, klen);
75    let value = slice::from_raw_parts(v as *const u8, vlen);
76    leaked_cb.put(
77        key.to_vec().into_boxed_slice(),
78        value.to_vec().into_boxed_slice(),
79    );
80}
81
82unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) {
83    // coerce the raw pointer back into a box, but "leak" it so we prevent
84    // freeing the resource before we are done with it
85    let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
86    let leaked_cb = Box::leak(boxed_cb);
87    let key = slice::from_raw_parts(k as *const u8, klen);
88    leaked_cb.delete(key.to_vec().into_boxed_slice());
89}
90
91impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
92    /// Construct with a reference to a byte array serialized by [`WriteBatch`].
93    pub fn from_data(data: &[u8]) -> Self {
94        unsafe {
95            let ptr = data.as_ptr();
96            let len = data.len();
97            Self {
98                inner: ffi::rocksdb_writebatch_create_from(
99                    ptr as *const libc::c_char,
100                    len as size_t,
101                ),
102            }
103        }
104    }
105
106    pub fn len(&self) -> usize {
107        unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize }
108    }
109
110    /// Return WriteBatch serialized size (in bytes).
111    pub fn size_in_bytes(&self) -> usize {
112        unsafe {
113            let mut batch_size: size_t = 0;
114            ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
115            batch_size
116        }
117    }
118
119    /// Return a reference to a byte array which represents a serialized version of the batch.
120    pub fn data(&self) -> &[u8] {
121        unsafe {
122            let mut batch_size: size_t = 0;
123            let batch_data = ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
124            std::slice::from_raw_parts(batch_data as _, batch_size)
125        }
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.len() == 0
130    }
131
132    /// Iterate the put and delete operations within this write batch. Note that
133    /// this does _not_ return an `Iterator` but instead will invoke the `put()`
134    /// and `delete()` member functions of the provided `WriteBatchIterator`
135    /// trait implementation.
136    pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) {
137        let state = Box::into_raw(Box::new(callbacks));
138        unsafe {
139            ffi::rocksdb_writebatch_iterate(
140                self.inner,
141                state as *mut c_void,
142                Some(writebatch_put_callback),
143                Some(writebatch_delete_callback),
144            );
145            // we must manually set the raw box free since there is no
146            // associated "destroy" callback for this object
147            drop(Box::from_raw(state));
148        }
149    }
150
151    /// Insert a value into the database under the given key.
152    pub fn put<K, V>(&mut self, key: K, value: V)
153    where
154        K: AsRef<[u8]>,
155        V: AsRef<[u8]>,
156    {
157        let key = key.as_ref();
158        let value = value.as_ref();
159
160        unsafe {
161            ffi::rocksdb_writebatch_put(
162                self.inner,
163                key.as_ptr() as *const c_char,
164                key.len() as size_t,
165                value.as_ptr() as *const c_char,
166                value.len() as size_t,
167            );
168        }
169    }
170
171    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
172    where
173        K: AsRef<[u8]>,
174        V: AsRef<[u8]>,
175    {
176        let key = key.as_ref();
177        let value = value.as_ref();
178
179        unsafe {
180            ffi::rocksdb_writebatch_put_cf(
181                self.inner,
182                cf.inner(),
183                key.as_ptr() as *const c_char,
184                key.len() as size_t,
185                value.as_ptr() as *const c_char,
186                value.len() as size_t,
187            );
188        }
189    }
190
191    pub fn merge<K, V>(&mut self, key: K, value: V)
192    where
193        K: AsRef<[u8]>,
194        V: AsRef<[u8]>,
195    {
196        let key = key.as_ref();
197        let value = value.as_ref();
198
199        unsafe {
200            ffi::rocksdb_writebatch_merge(
201                self.inner,
202                key.as_ptr() as *const c_char,
203                key.len() as size_t,
204                value.as_ptr() as *const c_char,
205                value.len() as size_t,
206            );
207        }
208    }
209
210    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
211    where
212        K: AsRef<[u8]>,
213        V: AsRef<[u8]>,
214    {
215        let key = key.as_ref();
216        let value = value.as_ref();
217
218        unsafe {
219            ffi::rocksdb_writebatch_merge_cf(
220                self.inner,
221                cf.inner(),
222                key.as_ptr() as *const c_char,
223                key.len() as size_t,
224                value.as_ptr() as *const c_char,
225                value.len() as size_t,
226            );
227        }
228    }
229
230    /// Removes the database entry for key. Does nothing if the key was not found.
231    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
232        let key = key.as_ref();
233
234        unsafe {
235            ffi::rocksdb_writebatch_delete(
236                self.inner,
237                key.as_ptr() as *const c_char,
238                key.len() as size_t,
239            );
240        }
241    }
242
243    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
244        let key = key.as_ref();
245
246        unsafe {
247            ffi::rocksdb_writebatch_delete_cf(
248                self.inner,
249                cf.inner(),
250                key.as_ptr() as *const c_char,
251                key.len() as size_t,
252            );
253        }
254    }
255
256    /// Clear all updates buffered in this batch.
257    pub fn clear(&mut self) {
258        unsafe {
259            ffi::rocksdb_writebatch_clear(self.inner);
260        }
261    }
262}
263
264impl WriteBatchWithTransaction<false> {
265    /// Remove database entries from start key to end key.
266    ///
267    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
268    /// including "begin_key" and excluding "end_key". It is not an error if no
269    /// keys exist in the range ["begin_key", "end_key").
270    pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) {
271        let (start_key, end_key) = (from.as_ref(), to.as_ref());
272
273        unsafe {
274            ffi::rocksdb_writebatch_delete_range(
275                self.inner,
276                start_key.as_ptr() as *const c_char,
277                start_key.len() as size_t,
278                end_key.as_ptr() as *const c_char,
279                end_key.len() as size_t,
280            );
281        }
282    }
283
284    /// Remove database entries in column family from start key to end key.
285    ///
286    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
287    /// including "begin_key" and excluding "end_key". It is not an error if no
288    /// keys exist in the range ["begin_key", "end_key").
289    pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
290        let (start_key, end_key) = (from.as_ref(), to.as_ref());
291
292        unsafe {
293            ffi::rocksdb_writebatch_delete_range_cf(
294                self.inner,
295                cf.inner(),
296                start_key.as_ptr() as *const c_char,
297                start_key.len() as size_t,
298                end_key.as_ptr() as *const c_char,
299                end_key.len() as size_t,
300            );
301        }
302    }
303}
304
305impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
306    fn default() -> Self {
307        Self {
308            inner: unsafe { ffi::rocksdb_writebatch_create() },
309        }
310    }
311}
312
313impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
314    fn drop(&mut self) {
315        unsafe {
316            ffi::rocksdb_writebatch_destroy(self.inner);
317        }
318    }
319}
320
// The raw `inner` pointer makes this type `!Send` by default; this impl opts
// back in so a batch can be moved to another thread.
// NOTE(review): soundness assumes the handle is exclusively owned by this value
// and that the underlying batch has no thread affinity - confirm against RocksDB.
unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}