rocksdb/
merge_operator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! rustic merge operator
17//!
18//! ```
19//! use rocksdb::{Options, DB, MergeOperands};
20//!
21//! fn concat_merge(new_key: &[u8],
22//!                 existing_val: Option<&[u8]>,
23//!                 operands: &MergeOperands)
24//!                 -> Option<Vec<u8>> {
25//!
26//!    let mut result: Vec<u8> = Vec::with_capacity(operands.len());
27//!    existing_val.map(|v| {
28//!        for e in v {
29//!            result.push(*e)
30//!        }
31//!    });
32//!    for op in operands {
33//!        for e in op {
34//!            result.push(*e)
35//!        }
36//!    }
37//!    Some(result)
38//! }
39//!
40//!let tempdir = tempfile::Builder::new()
41//!    .prefix("_rust_path_to_rocksdb")
42//!    .tempdir()
43//!    .expect("Failed to create temporary path for the _rust_path_to_rocksdb");
44//!let path = tempdir.path();
45//!let mut opts = Options::default();
46//!
47//!opts.create_if_missing(true);
48//!opts.set_merge_operator_associative("test operator", concat_merge);
49//!{
50//!    let db = DB::open(&opts, path).unwrap();
51//!    let p = db.put(b"k1", b"a");
52//!    db.merge(b"k1", b"b");
53//!    db.merge(b"k1", b"c");
54//!    db.merge(b"k1", b"d");
55//!    db.merge(b"k1", b"efg");
56//!    let r = db.get(b"k1");
57//!    assert_eq!(r.unwrap().unwrap(), b"abcdefg");
58//!}
59//!let _ = DB::destroy(&opts, path);
60//! ```
61
62use libc::{self, c_char, c_int, c_void, size_t};
63use std::ffi::CString;
64use std::mem;
65use std::ptr;
66use std::slice;
67
68pub trait MergeFn:
69    Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
70{
71}
72impl<F> MergeFn for F where
73    F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
74{
75}
76
77pub struct MergeOperatorCallback<F: MergeFn, PF: MergeFn> {
78    pub name: CString,
79    pub full_merge_fn: F,
80    pub partial_merge_fn: PF,
81}
82
83pub unsafe extern "C" fn destructor_callback<F: MergeFn, PF: MergeFn>(raw_cb: *mut c_void) {
84    drop(unsafe { Box::from_raw(raw_cb as *mut MergeOperatorCallback<F, PF>) });
85}
86
/// Frees a value buffer that was allocated as a `Box<[u8]>` and leaked with
/// `Box::into_raw`.
///
/// A null `value` is ignored. `_raw_cb` is unused.
///
/// # Safety
///
/// If `value` is non-null, it must point to the start of a leaked `Box<[u8]>`
/// of exactly `value_length` bytes, and the buffer must not be used afterwards.
pub unsafe extern "C" fn delete_callback(
    _raw_cb: *mut c_void,
    value: *const c_char,
    value_length: size_t,
) {
    if value.is_null() {
        return;
    }
    // SAFETY: per the contract above, (`value`, `value_length`) describe a
    // boxed byte slice whose ownership is being returned to Rust.
    let bytes: *mut [u8] = unsafe { slice::from_raw_parts_mut(value as *mut u8, value_length) };
    drop(unsafe { Box::from_raw(bytes) });
}
96
97pub unsafe extern "C" fn name_callback<F: MergeFn, PF: MergeFn>(
98    raw_cb: *mut c_void,
99) -> *const c_char {
100    let cb = unsafe { &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>) };
101    cb.name.as_ptr()
102}
103
104pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
105    raw_cb: *mut c_void,
106    raw_key: *const c_char,
107    key_len: size_t,
108    existing_value: *const c_char,
109    existing_value_len: size_t,
110    operands_list: *const *const c_char,
111    operands_list_len: *const size_t,
112    num_operands: c_int,
113    success: *mut u8,
114    new_value_length: *mut size_t,
115) -> *mut c_char {
116    let cb = unsafe { &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>) };
117    let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
118    let key = unsafe { slice::from_raw_parts(raw_key as *const u8, key_len) };
119    let oldval = if existing_value.is_null() {
120        None
121    } else {
122        Some(unsafe { slice::from_raw_parts(existing_value as *const u8, existing_value_len) })
123    };
124    (cb.full_merge_fn)(key, oldval, operands).map_or_else(
125        || {
126            unsafe { *new_value_length = 0 };
127            unsafe { *success = 0_u8 };
128            ptr::null_mut() as *mut c_char
129        },
130        |result| {
131            unsafe { *new_value_length = result.len() as size_t };
132            unsafe { *success = 1_u8 };
133            Box::into_raw(result.into_boxed_slice()) as *mut c_char
134        },
135    )
136}
137
138pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
139    raw_cb: *mut c_void,
140    raw_key: *const c_char,
141    key_len: size_t,
142    operands_list: *const *const c_char,
143    operands_list_len: *const size_t,
144    num_operands: c_int,
145    success: *mut u8,
146    new_value_length: *mut size_t,
147) -> *mut c_char {
148    let cb = unsafe { &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>) };
149    let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
150    let key = unsafe { slice::from_raw_parts(raw_key as *const u8, key_len) };
151    (cb.partial_merge_fn)(key, None, operands).map_or_else(
152        || {
153            unsafe { *new_value_length = 0 };
154            unsafe { *success = 0_u8 };
155            ptr::null_mut::<c_char>()
156        },
157        |result| {
158            unsafe { *new_value_length = result.len() as size_t };
159            unsafe { *success = 1_u8 };
160            Box::into_raw(result.into_boxed_slice()) as *mut c_char
161        },
162    )
163}
164
165pub struct MergeOperands {
166    operands_list: *const *const c_char,
167    operands_list_len: *const size_t,
168    num_operands: usize,
169}
170
171impl MergeOperands {
172    fn new(
173        operands_list: *const *const c_char,
174        operands_list_len: *const size_t,
175        num_operands: c_int,
176    ) -> MergeOperands {
177        assert!(num_operands >= 0);
178        MergeOperands {
179            operands_list,
180            operands_list_len,
181            num_operands: num_operands as usize,
182        }
183    }
184
185    pub fn len(&self) -> usize {
186        self.num_operands
187    }
188
189    pub fn is_empty(&self) -> bool {
190        self.num_operands == 0
191    }
192
193    pub fn iter(&self) -> MergeOperandsIter {
194        MergeOperandsIter {
195            operands: self,
196            cursor: 0,
197        }
198    }
199
200    fn get_operand(&self, index: usize) -> Option<&[u8]> {
201        if index >= self.num_operands {
202            None
203        } else {
204            unsafe {
205                let base = self.operands_list as usize;
206                let base_len = self.operands_list_len as usize;
207                let spacing = mem::size_of::<*const *const u8>();
208                let spacing_len = mem::size_of::<*const size_t>();
209                let len_ptr = (base_len + (spacing_len * index)) as *const size_t;
210                let len = *len_ptr;
211                let ptr = base + (spacing * index);
212                Some(slice::from_raw_parts(*(ptr as *const *const u8), len))
213            }
214        }
215    }
216}
217
218pub struct MergeOperandsIter<'a> {
219    operands: &'a MergeOperands,
220    cursor: usize,
221}
222
223impl<'a> Iterator for MergeOperandsIter<'a> {
224    type Item = &'a [u8];
225
226    fn next(&mut self) -> Option<Self::Item> {
227        let operand = self.operands.get_operand(self.cursor)?;
228        self.cursor += 1;
229        Some(operand)
230    }
231
232    fn size_hint(&self) -> (usize, Option<usize>) {
233        let remaining = self.operands.num_operands - self.cursor;
234        (remaining, Some(remaining))
235    }
236}
237
238impl<'a> IntoIterator for &'a MergeOperands {
239    type Item = &'a [u8];
240    type IntoIter = MergeOperandsIter<'a>;
241
242    fn into_iter(self) -> Self::IntoIter {
243        Self::IntoIter {
244            operands: self,
245            cursor: 0,
246        }
247    }
248}