// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-memory implementations for testing and benchmarking.
12use std::collections::BTreeMap;
13use std::sync::{Arc, Mutex};
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use bytes::Bytes;
18use futures_util::{StreamExt, stream};
19use mz_ore::bytes::SegmentedBytes;
20use mz_ore::cast::CastFrom;
21use mz_ore::future::yield_now;
22
23use crate::error::Error;
24use crate::location::{
25    Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, ResultStream, SeqNo,
26    VersionedData,
27};
28
/// An in-memory representation of a set of [Log]s and [Blob]s that can be reused
/// across dataflows
#[cfg(test)]
#[derive(Debug)]
pub struct MemMultiRegistry {
    // Backing store shared by every [MemBlob] opened at the same path, so two
    // opens of one path observe each other's writes.
    blob_by_path: BTreeMap<String, Arc<tokio::sync::Mutex<MemBlobCore>>>,
    // Propagated into each newly created [MemBlobCore]: whether deletes leave
    // restorable tombstones instead of removing entries outright.
    tombstone: bool,
}
37
#[cfg(test)]
impl MemMultiRegistry {
    /// Constructs a new, empty [MemMultiRegistry].
    pub fn new(tombstone: bool) -> Self {
        MemMultiRegistry {
            blob_by_path: BTreeMap::new(),
            tombstone,
        }
    }

    /// Opens a [MemBlob] associated with `path`.
    ///
    /// Repeated calls with the same `path` share one underlying
    /// [MemBlobCore], so all returned blobs see the same data.
    ///
    /// TODO: Replace this with PersistClientCache once they're in the same
    /// crate.
    pub fn blob(&mut self, path: &str) -> MemBlob {
        let tombstone = self.tombstone;
        // Entry API: a single map lookup instead of the previous
        // `get`-then-`insert` pair.
        let core = self.blob_by_path.entry(path.to_string()).or_insert_with(|| {
            Arc::new(tokio::sync::Mutex::new(MemBlobCore {
                dataz: Default::default(),
                tombstone,
            }))
        });
        MemBlob::open(MemBlobConfig {
            core: Arc::clone(core),
        })
    }
}
68
// Shared state behind a [MemBlob]: a key/value map plus the delete policy.
#[derive(Debug, Default)]
struct MemBlobCore {
    // Maps key -> (contents, exists). When `tombstone` is set, deleted keys
    // are kept with `exists == false` rather than being removed, so they can
    // be brought back by `restore`.
    dataz: BTreeMap<String, (Bytes, bool)>,
    // If true, `delete` marks entries dead instead of removing them.
    tombstone: bool,
}
74
75impl MemBlobCore {
76    fn get(&self, key: &str) -> Result<Option<Bytes>, ExternalError> {
77        Ok(self
78            .dataz
79            .get(key)
80            .and_then(|(x, exists)| exists.then(|| Bytes::clone(x))))
81    }
82
83    fn set(&mut self, key: &str, value: Bytes) -> Result<(), ExternalError> {
84        self.dataz.insert(key.to_owned(), (value, true));
85        Ok(())
86    }
87
88    fn list_keys_and_metadata(
89        &self,
90        key_prefix: &str,
91        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
92    ) -> Result<(), ExternalError> {
93        for (key, (value, exists)) in &self.dataz {
94            if !*exists || !key.starts_with(key_prefix) {
95                continue;
96            }
97
98            f(BlobMetadata {
99                key,
100                size_in_bytes: u64::cast_from(value.len()),
101            });
102        }
103
104        Ok(())
105    }
106
107    fn delete(&mut self, key: &str) -> Result<Option<usize>, ExternalError> {
108        let bytes = if self.tombstone {
109            self.dataz.get_mut(key).and_then(|(x, exists)| {
110                let deleted_size = exists.then(|| x.len());
111                *exists = false;
112                deleted_size
113            })
114        } else {
115            self.dataz.remove(key).map(|(x, _)| x.len())
116        };
117        Ok(bytes)
118    }
119
120    fn restore(&mut self, key: &str) -> Result<(), ExternalError> {
121        match self.dataz.get_mut(key) {
122            None => Err(
123                Determinate::new(anyhow!("unable to restore {key} from in-memory state")).into(),
124            ),
125            Some((_, exists)) => {
126                *exists = true;
127                Ok(())
128            }
129        }
130    }
131}
132
/// Configuration for opening a [MemBlob].
#[derive(Debug, Default)]
pub struct MemBlobConfig {
    // Shared handle to the backing store; every [MemBlob] opened from configs
    // holding a clone of this `Arc` sees the same data.
    core: Arc<tokio::sync::Mutex<MemBlobCore>>,
}
138
139impl MemBlobConfig {
140    /// Create a new instance.
141    pub fn new(tombstone: bool) -> Self {
142        Self {
143            core: Arc::new(tokio::sync::Mutex::new(MemBlobCore {
144                dataz: Default::default(),
145                tombstone,
146            })),
147        }
148    }
149}
150
/// An in-memory implementation of [Blob].
#[derive(Clone, Debug)]
pub struct MemBlob {
    // Shared with any other [MemBlob] opened from the same [MemBlobConfig];
    // cloning a MemBlob clones this handle, not the data.
    core: Arc<tokio::sync::Mutex<MemBlobCore>>,
}
156
157impl MemBlob {
158    /// Opens the given location for non-exclusive read-write access.
159    pub fn open(config: MemBlobConfig) -> Self {
160        MemBlob { core: config.core }
161    }
162}
163
164#[async_trait]
165impl Blob for MemBlob {
166    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
167        // Yield to maximize our chances for getting interesting orderings.
168        let () = yield_now().await;
169        let maybe_bytes = self.core.lock().await.get(key)?;
170        Ok(maybe_bytes.map(SegmentedBytes::from))
171    }
172
173    async fn list_keys_and_metadata(
174        &self,
175        key_prefix: &str,
176        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
177    ) -> Result<(), ExternalError> {
178        // Yield to maximize our chances for getting interesting orderings.
179        let () = yield_now().await;
180        self.core.lock().await.list_keys_and_metadata(key_prefix, f)
181    }
182
183    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
184        // Yield to maximize our chances for getting interesting orderings.
185        let () = yield_now().await;
186        // NB: This is always atomic, so we're free to ignore the atomic param.
187        self.core.lock().await.set(key, value)
188    }
189
190    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
191        // Yield to maximize our chances for getting interesting orderings.
192        let () = yield_now().await;
193        self.core.lock().await.delete(key)
194    }
195
196    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
197        // Yield to maximize our chances for getting interesting orderings.
198        let () = yield_now().await;
199        self.core.lock().await.restore(key)
200    }
201}
202
/// An in-memory implementation of [Consensus].
#[derive(Clone, Debug)]
pub struct MemConsensus {
    // TODO: This was intended to be a tokio::sync::Mutex but that seems to
    // regularly deadlock in the `concurrency` test.
    //
    // Maps key -> the retained history of versioned values; entries are
    // appended by `compare_and_set` in ascending seqno order and trimmed by
    // `truncate`.
    data: Arc<Mutex<BTreeMap<String, Vec<VersionedData>>>>,
}
210
211impl Default for MemConsensus {
212    fn default() -> Self {
213        Self {
214            data: Arc::new(Mutex::new(BTreeMap::new())),
215        }
216    }
217}
218
219impl MemConsensus {
220    fn scan_store(
221        store: &BTreeMap<String, Vec<VersionedData>>,
222        key: &str,
223        from: SeqNo,
224        limit: usize,
225    ) -> Result<Vec<VersionedData>, ExternalError> {
226        let results = if let Some(values) = store.get(key) {
227            let from_idx = values.partition_point(|x| x.seqno < from);
228            let from_values = &values[from_idx..];
229            let from_values = &from_values[..usize::min(limit, from_values.len())];
230            from_values.to_vec()
231        } else {
232            Vec::new()
233        };
234        Ok(results)
235    }
236}
237
238#[async_trait]
239impl Consensus for MemConsensus {
240    fn list_keys(&self) -> ResultStream<'_, String> {
241        // Yield to maximize our chances for getting interesting orderings.
242        let store = self.data.lock().expect("lock poisoned");
243        let keys: Vec<_> = store.keys().cloned().collect();
244        Box::pin(stream::iter(keys).map(Ok))
245    }
246
247    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
248        // Yield to maximize our chances for getting interesting orderings.
249        let () = yield_now().await;
250        let store = self.data.lock().map_err(Error::from)?;
251        let values = match store.get(key) {
252            None => return Ok(None),
253            Some(values) => values,
254        };
255
256        Ok(values.last().cloned())
257    }
258
259    async fn compare_and_set(
260        &self,
261        key: &str,
262        new: VersionedData,
263    ) -> Result<CaSResult, ExternalError> {
264        let expected = new.seqno.previous();
265        // Yield to maximize our chances for getting interesting orderings.
266        let () = yield_now().await;
267
268        if new.seqno.0 > i64::MAX.try_into().expect("i64::MAX known to fit in u64") {
269            return Err(ExternalError::from(anyhow!(
270                "sequence numbers must fit within [0, i64::MAX], received: {:?}",
271                new.seqno
272            )));
273        }
274        let mut store = self.data.lock().map_err(Error::from)?;
275
276        let data = match store.get(key) {
277            None => None,
278            Some(values) => values.last(),
279        };
280
281        let seqno = data.as_ref().map(|data| data.seqno);
282
283        if seqno != expected {
284            return Ok(CaSResult::ExpectationMismatch);
285        }
286
287        store.entry(key.to_string()).or_default().push(new);
288
289        Ok(CaSResult::Committed)
290    }
291
292    async fn scan(
293        &self,
294        key: &str,
295        from: SeqNo,
296        limit: usize,
297    ) -> Result<Vec<VersionedData>, ExternalError> {
298        // Yield to maximize our chances for getting interesting orderings.
299        let () = yield_now().await;
300        let store = self.data.lock().map_err(Error::from)?;
301        Self::scan_store(&store, key, from, limit)
302    }
303
304    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
305        // Yield to maximize our chances for getting interesting orderings.
306        let () = yield_now().await;
307        let current = self.head(key).await?;
308        if current.map_or(true, |data| data.seqno < seqno) {
309            return Err(ExternalError::from(anyhow!(
310                "upper bound too high for truncate: {:?}",
311                seqno
312            )));
313        }
314
315        let mut store = self.data.lock().map_err(Error::from)?;
316
317        let mut deleted = 0;
318        if let Some(values) = store.get_mut(key) {
319            let count_before = values.len();
320            values.retain(|val| val.seqno >= seqno);
321            deleted += count_before - values.len();
322        }
323
324        Ok(Some(deleted))
325    }
326}
327
#[cfg(test)]
mod tests {
    use crate::location::tests::{blob_impl_test, consensus_impl_test};

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn mem_blob() -> Result<(), ExternalError> {
        // Run the generic blob test suite against both registry flavors:
        // hard deletes and tombstoned deletes.
        for tombstone in [false, true] {
            let registry = Arc::new(tokio::sync::Mutex::new(MemMultiRegistry::new(tombstone)));
            blob_impl_test(move |path| {
                let path = path.to_owned();
                let registry = Arc::clone(&registry);
                async move { Ok(registry.lock().await.blob(&path)) }
            })
            .await?;
        }
        Ok(())
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn mem_consensus() -> Result<(), ExternalError> {
        consensus_impl_test(|| async { Ok(MemConsensus::default()) }).await
    }
}