rocksdb/
merge_operator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! rustic merge operator
17//!
18//! ```
19//! use rocksdb::{Options, DB, MergeOperands};
20//!
21//! fn concat_merge(new_key: &[u8],
22//!                 existing_val: Option<&[u8]>,
23//!                 operands: &MergeOperands)
24//!                 -> Option<Vec<u8>> {
25//!
26//!    let mut result: Vec<u8> = Vec::with_capacity(operands.len());
27//!    existing_val.map(|v| {
28//!        for e in v {
29//!            result.push(*e)
30//!        }
31//!    });
32//!    for op in operands {
33//!        for e in op {
34//!            result.push(*e)
35//!        }
36//!    }
37//!    Some(result)
38//! }
39//!
40//!let path = "_rust_path_to_rocksdb";
41//!let mut opts = Options::default();
42//!
43//!opts.create_if_missing(true);
44//!opts.set_merge_operator_associative("test operator", concat_merge);
45//!{
46//!    let db = DB::open(&opts, path).unwrap();
47//!    let p = db.put(b"k1", b"a");
48//!    db.merge(b"k1", b"b");
49//!    db.merge(b"k1", b"c");
50//!    db.merge(b"k1", b"d");
51//!    db.merge(b"k1", b"efg");
52//!    let r = db.get(b"k1");
53//!    assert_eq!(r.unwrap().unwrap(), b"abcdefg");
54//!}
55//!let _ = DB::destroy(&opts, path);
56//! ```
57
58use libc::{self, c_char, c_int, c_void, size_t};
59use std::ffi::CString;
60use std::mem;
61use std::ptr;
62use std::slice;
63
64pub trait MergeFn:
65    Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
66{
67}
68impl<F> MergeFn for F where
69    F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
70{
71}
72
73pub struct MergeOperatorCallback<F: MergeFn, PF: MergeFn> {
74    pub name: CString,
75    pub full_merge_fn: F,
76    pub partial_merge_fn: PF,
77}
78
79pub unsafe extern "C" fn destructor_callback<F: MergeFn, PF: MergeFn>(raw_cb: *mut c_void) {
80    drop(Box::from_raw(raw_cb as *mut MergeOperatorCallback<F, PF>));
81}
82
83pub unsafe extern "C" fn delete_callback(
84    _raw_cb: *mut c_void,
85    value: *const c_char,
86    value_length: size_t,
87) {
88    if !value.is_null() {
89        drop(Box::from_raw(slice::from_raw_parts_mut(
90            value as *mut u8,
91            value_length,
92        )));
93    }
94}
95
96pub unsafe extern "C" fn name_callback<F: MergeFn, PF: MergeFn>(
97    raw_cb: *mut c_void,
98) -> *const c_char {
99    let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
100    cb.name.as_ptr()
101}
102
103pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
104    raw_cb: *mut c_void,
105    raw_key: *const c_char,
106    key_len: size_t,
107    existing_value: *const c_char,
108    existing_value_len: size_t,
109    operands_list: *const *const c_char,
110    operands_list_len: *const size_t,
111    num_operands: c_int,
112    success: *mut u8,
113    new_value_length: *mut size_t,
114) -> *mut c_char {
115    let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
116    let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
117    let key = slice::from_raw_parts(raw_key as *const u8, key_len);
118    let oldval = if existing_value.is_null() {
119        None
120    } else {
121        Some(slice::from_raw_parts(
122            existing_value as *const u8,
123            existing_value_len,
124        ))
125    };
126    (cb.full_merge_fn)(key, oldval, operands).map_or_else(
127        || {
128            *new_value_length = 0;
129            *success = 0_u8;
130            ptr::null_mut() as *mut c_char
131        },
132        |result| {
133            *new_value_length = result.len() as size_t;
134            *success = 1_u8;
135            Box::into_raw(result.into_boxed_slice()) as *mut c_char
136        },
137    )
138}
139
140pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
141    raw_cb: *mut c_void,
142    raw_key: *const c_char,
143    key_len: size_t,
144    operands_list: *const *const c_char,
145    operands_list_len: *const size_t,
146    num_operands: c_int,
147    success: *mut u8,
148    new_value_length: *mut size_t,
149) -> *mut c_char {
150    let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
151    let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
152    let key = slice::from_raw_parts(raw_key as *const u8, key_len);
153    (cb.partial_merge_fn)(key, None, operands).map_or_else(
154        || {
155            *new_value_length = 0;
156            *success = 0_u8;
157            ptr::null_mut::<c_char>()
158        },
159        |result| {
160            *new_value_length = result.len() as size_t;
161            *success = 1_u8;
162            Box::into_raw(result.into_boxed_slice()) as *mut c_char
163        },
164    )
165}
166
167pub struct MergeOperands {
168    operands_list: *const *const c_char,
169    operands_list_len: *const size_t,
170    num_operands: usize,
171}
172
173impl MergeOperands {
174    fn new(
175        operands_list: *const *const c_char,
176        operands_list_len: *const size_t,
177        num_operands: c_int,
178    ) -> MergeOperands {
179        assert!(num_operands >= 0);
180        MergeOperands {
181            operands_list,
182            operands_list_len,
183            num_operands: num_operands as usize,
184        }
185    }
186
187    pub fn len(&self) -> usize {
188        self.num_operands
189    }
190
191    pub fn is_empty(&self) -> bool {
192        self.num_operands == 0
193    }
194
195    pub fn iter(&self) -> MergeOperandsIter {
196        MergeOperandsIter {
197            operands: self,
198            cursor: 0,
199        }
200    }
201
202    fn get_operand(&self, index: usize) -> Option<&[u8]> {
203        if index >= self.num_operands {
204            None
205        } else {
206            unsafe {
207                let base = self.operands_list as usize;
208                let base_len = self.operands_list_len as usize;
209                let spacing = mem::size_of::<*const *const u8>();
210                let spacing_len = mem::size_of::<*const size_t>();
211                let len_ptr = (base_len + (spacing_len * index)) as *const size_t;
212                let len = *len_ptr;
213                let ptr = base + (spacing * index);
214                Some(slice::from_raw_parts(
215                    *(ptr as *const *const u8) as *const u8,
216                    len,
217                ))
218            }
219        }
220    }
221}
222
223pub struct MergeOperandsIter<'a> {
224    operands: &'a MergeOperands,
225    cursor: usize,
226}
227
228impl<'a> Iterator for MergeOperandsIter<'a> {
229    type Item = &'a [u8];
230
231    fn next(&mut self) -> Option<Self::Item> {
232        let operand = self.operands.get_operand(self.cursor)?;
233        self.cursor += 1;
234        Some(operand)
235    }
236
237    fn size_hint(&self) -> (usize, Option<usize>) {
238        let remaining = self.operands.num_operands - self.cursor;
239        (remaining, Some(remaining))
240    }
241}
242
243impl<'a> IntoIterator for &'a MergeOperands {
244    type Item = &'a [u8];
245    type IntoIter = MergeOperandsIter<'a>;
246
247    fn into_iter(self) -> Self::IntoIter {
248        Self::IntoIter {
249            operands: self,
250            cursor: 0,
251        }
252    }
253}