persistcli/maelstrom/
services.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Implementations of persist's external durability traits ([Blob] and [Consensus]) backed by
//! Maelstrom services, plus timestamp oracles.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use bytes::Bytes;
18use mz_ore::bytes::SegmentedBytes;
19use mz_persist::location::{
20    Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, ResultStream, SeqNo,
21    VersionedData,
22};
23use mz_repr::TimestampManipulation;
24use mz_timestamp_oracle::{TimestampOracle, WriteTimestamp};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27use tokio::sync::Mutex;
28use tracing::debug;
29
30use crate::maelstrom::api::{ErrorCode, MaelstromError};
31use crate::maelstrom::node::Handle;
32
33/// An adaptor for [VersionedData] that implements [Serialize] and
34/// [Deserialize].
35#[derive(Clone, Debug, Serialize, Deserialize)]
36struct MaelstromVersionedData {
37    seqno: u64,
38    data: Vec<u8>,
39}
40
41/// Implementation of [Consensus] backed by the Maelstrom lin-kv service.
42#[derive(Debug)]
43pub struct MaelstromConsensus {
44    handle: Handle,
45    // A cache of SeqNo -> the data for that SeqNo. Here because the [Consensus]
46    // interface uses only a SeqNo for expected/from, but the lin-kv Maelstrom
47    // service requires the whole VersionedData. This is only used to hydrate
48    // the expected/from in the impl of `compare_and_set`.
49    //
50    // This cache exists primarily to keep our usage of maelstrom services
51    // "clean". Instead of the cache, we could use the consensus `head` call to
52    // hydrate SeqNos, but it's really nice to have a 1:1 relationship between
53    // Consensus calls and Maelstrom service calls when looking at the Lamport
54    // diagrams that Maelstrom emits.
55    //
56    // It also secondarily means that more stale expectations make it to the
57    // Maelstrom CaS call (with the `head` alternative we'd discover some then),
58    // but this is mostly a side benefit.
59    cache: Mutex<BTreeMap<(String, SeqNo), Vec<u8>>>,
60}
61
62impl MaelstromConsensus {
63    pub fn new(handle: Handle) -> Arc<dyn Consensus> {
64        Arc::new(MaelstromConsensus {
65            handle,
66            cache: Mutex::new(BTreeMap::new()),
67        })
68    }
69
70    pub async fn hydrate_seqno(
71        &self,
72        key: &str,
73        expected: SeqNo,
74    ) -> Result<Result<VersionedData, Option<VersionedData>>, ExternalError> {
75        if let Some(data) = self.cache.lock().await.get(&(key.to_string(), expected)) {
76            let value = VersionedData {
77                seqno: expected.clone(),
78                data: Bytes::from(data.clone()),
79            };
80            return Ok(Ok(value));
81        }
82
83        // It wasn't in the cache (must have been set by another process), fetch
84        // head and see if that matches.
85        match self.head(key).await? {
86            Some(current) if current.seqno == expected => Ok(Ok(current)),
87            x => Ok(Err(x)),
88        }
89    }
90}
91
92#[async_trait]
93impl Consensus for MaelstromConsensus {
94    fn list_keys(&self) -> ResultStream<String> {
95        unimplemented!("TODO")
96    }
97
98    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
99        let value = match self
100            .handle
101            .lin_kv_read(Value::from(format!("consensus/{}", key)))
102            .await
103            .map_err(anyhow::Error::new)?
104        {
105            Some(x) => x,
106            None => return Ok(None),
107        };
108        let value = VersionedData::from(MaelstromVersionedData::try_from(&value)?);
109        self.cache
110            .lock()
111            .await
112            .insert((key.to_string(), value.seqno), value.data.to_vec());
113        Ok(Some(value))
114    }
115
116    async fn compare_and_set(
117        &self,
118        key: &str,
119        expected: Option<SeqNo>,
120        new: VersionedData,
121    ) -> Result<CaSResult, ExternalError> {
122        let create_if_not_exists = expected.is_none();
123
124        let from = match expected {
125            Some(expected) => match self.hydrate_seqno(key, expected).await? {
126                Ok(x) => Value::from(&MaelstromVersionedData::from(x)),
127                Err(_) => {
128                    return Ok(CaSResult::ExpectationMismatch);
129                }
130            },
131            None => Value::Null,
132        };
133        let new = MaelstromVersionedData::from(new);
134        let to = Value::from(&new);
135        let cas_res = self
136            .handle
137            .lin_kv_compare_and_set(
138                Value::from(format!("consensus/{}", key)),
139                from,
140                to,
141                Some(create_if_not_exists),
142            )
143            .await;
144        match cas_res {
145            Ok(()) => {
146                self.cache
147                    .lock()
148                    .await
149                    .insert((key.to_string(), SeqNo(new.seqno)), new.data.clone());
150                Ok(CaSResult::Committed)
151            }
152            Err(MaelstromError {
153                code: ErrorCode::PreconditionFailed,
154                ..
155            }) => Ok(CaSResult::ExpectationMismatch),
156            Err(err) => Err(ExternalError::from(anyhow::Error::new(err))),
157        }
158    }
159
160    async fn scan(
161        &self,
162        _key: &str,
163        _from: SeqNo,
164        _limit: usize,
165    ) -> Result<Vec<VersionedData>, ExternalError> {
166        unimplemented!("TODO")
167    }
168
169    async fn truncate(&self, _key: &str, _seqno: SeqNo) -> Result<usize, ExternalError> {
170        unimplemented!("TODO")
171    }
172}
173
174/// Implementation of [Blob] backed by the Maelstrom lin-kv service.
175#[derive(Debug)]
176pub struct MaelstromBlob {
177    handle: Handle,
178}
179
180impl MaelstromBlob {
181    pub fn new(handle: Handle) -> Arc<dyn Blob> {
182        Arc::new(MaelstromBlob { handle })
183    }
184}
185
186#[async_trait]
187impl Blob for MaelstromBlob {
188    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
189        let value = match self
190            .handle
191            .lin_kv_read(Value::from(format!("blob/{}", key)))
192            .await
193            .map_err(anyhow::Error::new)?
194        {
195            Some(x) => x,
196            None => return Ok(None),
197        };
198        let value = value
199            .as_str()
200            .ok_or_else(|| anyhow!("invalid blob at {}: {:?}", key, value))?;
201        let value: Vec<u8> = serde_json::from_str(value)
202            .map_err(|err| anyhow!("invalid blob at {}: {}", key, err))?;
203        Ok(Some(SegmentedBytes::from(value)))
204    }
205
206    async fn list_keys_and_metadata(
207        &self,
208        _key_prefix: &str,
209        _f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
210    ) -> Result<(), ExternalError> {
211        unimplemented!("not yet used")
212    }
213
214    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
215        let value = serde_json::to_string(value.as_ref()).expect("failed to serialize value");
216        self.handle
217            .lin_kv_write(Value::from(format!("blob/{}", key)), Value::from(value))
218            .await
219            .map_err(anyhow::Error::new)?;
220        Ok(())
221    }
222
223    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
224        // Setting the value to Null is as close as we can get with lin_kv.
225        self.handle
226            .lin_kv_write(Value::from(format!("blob/{}", key)), Value::Null)
227            .await
228            .map_err(anyhow::Error::new)?;
229        // The "existed" return value only controls our metrics, so I suppose
230        // it's okay if its wrong in Maelstrom.
231        Ok(Some(0))
232    }
233
234    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
235        let read = self
236            .handle
237            .lin_kv_read(Value::from(format!("blob/{}", key)))
238            .await
239            .map_err(anyhow::Error::new)?;
240        if read.is_none() {
241            return Err(
242                Determinate::new(anyhow!("key {key} not present in the maelstrom store")).into(),
243            );
244        }
245        Ok(())
246    }
247}
248
249/// Implementation of [Blob] that caches get calls.
250///
251/// NB: It intentionally does not store successful set calls in the cache, so
252/// that a blob gets fetched at least once (exercising those code paths).
253#[derive(Debug)]
254pub struct CachingBlob {
255    blob: Arc<dyn Blob>,
256    cache: Mutex<BTreeMap<String, SegmentedBytes>>,
257}
258
259impl CachingBlob {
260    pub fn new(blob: Arc<dyn Blob>) -> Arc<dyn Blob> {
261        Arc::new(CachingBlob {
262            blob,
263            cache: Mutex::new(BTreeMap::new()),
264        })
265    }
266}
267
268#[async_trait]
269impl Blob for CachingBlob {
270    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
271        // Fetch the cached value if there is one.
272        let cache = self.cache.lock().await;
273        if let Some(value) = cache.get(key) {
274            return Ok(Some(value.clone()));
275        }
276        // Make sure not to hold the cache lock across the forwarded `get` call.
277        drop(cache);
278
279        // We didn't get a cache hit, fetch the value and update the cache.
280        let value = self.blob.get(key).await?;
281        if let Some(value) = &value {
282            // Everything in persist is write-once modify-never, so until we add
283            // support for deletions to CachingBlob, we're free to blindly
284            // overwrite whatever is in the cache.
285            self.cache
286                .lock()
287                .await
288                .insert(key.to_owned(), value.clone());
289        }
290
291        Ok(value)
292    }
293
294    async fn list_keys_and_metadata(
295        &self,
296        key_prefix: &str,
297        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
298    ) -> Result<(), ExternalError> {
299        self.blob.list_keys_and_metadata(key_prefix, f).await
300    }
301
302    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
303        // Intentionally don't put this in the cache on set, so that this blob
304        // gets fetched at least once (exercising those code paths).
305        self.blob.set(key, value).await
306    }
307
308    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
309        self.cache.lock().await.remove(key);
310        self.blob.delete(key).await
311    }
312
313    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
314        self.blob.restore(key).await
315    }
316}
317
318#[derive(Debug)]
319pub struct MaelstromOracle {
320    read_ts: MaelstromOracleKey,
321    write_ts: MaelstromOracleKey,
322}
323
324#[derive(Debug)]
325pub struct MaelstromOracleKey {
326    handle: Handle,
327    key: String,
328    expected: u64,
329}
330
331// TODO: Make this implement TimestampOracle.
332impl MaelstromOracle {
333    pub async fn new(handle: Handle) -> Result<Self, ExternalError> {
334        let read_ts = MaelstromOracleKey::new(handle.clone(), "tso_read", 0).await?;
335        let write_ts = MaelstromOracleKey::new(handle.clone(), "tso_write", 0).await?;
336        Ok(MaelstromOracle { read_ts, write_ts })
337    }
338
339    pub async fn write_ts(&mut self) -> Result<u64, ExternalError> {
340        let prev = self.write_ts.cas(|expected| Some(expected + 1)).await?;
341        Ok(prev)
342    }
343
344    pub async fn peek_write_ts(&mut self) -> Result<u64, ExternalError> {
345        self.write_ts.peek().await
346    }
347
348    pub async fn read_ts(&mut self) -> Result<u64, ExternalError> {
349        self.read_ts.peek().await
350    }
351
352    pub async fn apply_write(&mut self, lower_bound: u64) -> Result<(), ExternalError> {
353        let write_prev = self
354            .write_ts
355            .cas(|expected| (expected < lower_bound).then_some(lower_bound))
356            .await?;
357        let read_prev = self
358            .read_ts
359            .cas(|expected| (expected < lower_bound).then_some(lower_bound))
360            .await?;
361        debug!(
362            "apply_write {} write_prev={} write_new={} read_prev={} read_new={}",
363            lower_bound, write_prev, self.write_ts.expected, read_prev, self.read_ts.expected
364        );
365        Ok(())
366    }
367}
368
369impl MaelstromOracleKey {
370    async fn new(handle: Handle, key: &str, init_ts: u64) -> Result<Self, ExternalError> {
371        let res = handle
372            .lin_kv_compare_and_set(
373                Value::from(key),
374                Value::Null,
375                Value::from(init_ts),
376                Some(true),
377            )
378            .await;
379        // Either of these answers indicate that we created the key.
380        match Self::cas_res(res)? {
381            CaSResult::Committed => {}
382            CaSResult::ExpectationMismatch => {}
383        }
384        Ok(MaelstromOracleKey {
385            handle,
386            key: key.to_owned(),
387            expected: init_ts,
388        })
389    }
390
391    fn cas_res(res: Result<(), MaelstromError>) -> Result<CaSResult, ExternalError> {
392        match res {
393            Ok(()) => Ok(CaSResult::Committed),
394            Err(MaelstromError {
395                code: ErrorCode::PreconditionFailed,
396                ..
397            }) => Ok(CaSResult::ExpectationMismatch),
398            Err(err) => Err(anyhow::Error::new(err).into()),
399        }
400    }
401
402    async fn peek(&mut self) -> Result<u64, ExternalError> {
403        let value = self
404            .handle
405            .lin_kv_read(Value::from(self.key.as_str()))
406            .await
407            .map_err(anyhow::Error::new)?
408            .expect("ts oracle should be initialized");
409        let current = value
410            .as_u64()
411            .ok_or_else(|| anyhow!("invalid {} value: {:?}", self.key, value))?;
412        assert!(self.expected <= current);
413        self.expected = current;
414        Ok(current)
415    }
416
417    async fn cas(&mut self, new_fn: impl Fn(u64) -> Option<u64>) -> Result<u64, ExternalError> {
418        loop {
419            let new = match new_fn(self.expected) {
420                Some(x) => x,
421                None => {
422                    // The latest cached value is good enough, early exit.
423                    return Ok(self.expected);
424                }
425            };
426            let res = self
427                .handle
428                .lin_kv_compare_and_set(
429                    Value::from(self.key.as_str()),
430                    Value::from(self.expected),
431                    Value::from(new),
432                    None,
433                )
434                .await;
435            match Self::cas_res(res)? {
436                CaSResult::Committed => {
437                    let prev = self.expected;
438                    self.expected = new;
439                    return Ok(prev);
440                }
441                CaSResult::ExpectationMismatch => {
442                    self.expected = self.peek().await?;
443                    continue;
444                }
445            };
446        }
447    }
448}
449
/// In-memory [TimestampOracle] that keeps its read and write timestamps
/// behind a shared mutex.
#[derive(Debug, Default)]
pub struct MemTimestampOracle<T> {
    /// The `(read_ts, write_ts)` pair, guarded together.
    read_write_ts: Arc<std::sync::Mutex<(T, T)>>,
}
454
455#[async_trait]
456impl<T: TimestampManipulation> TimestampOracle<T> for MemTimestampOracle<T> {
457    async fn write_ts(&self) -> WriteTimestamp<T> {
458        let (read_ts, write_ts) = &mut *self.read_write_ts.lock().expect("lock poisoned");
459        let new_write_ts = TimestampManipulation::step_forward(std::cmp::max(read_ts, write_ts));
460        write_ts.clone_from(&new_write_ts);
461        WriteTimestamp {
462            advance_to: TimestampManipulation::step_forward(&new_write_ts),
463            timestamp: new_write_ts,
464        }
465    }
466
467    async fn peek_write_ts(&self) -> T {
468        let (_, write_ts) = &*self.read_write_ts.lock().expect("lock poisoned");
469        write_ts.clone()
470    }
471
472    async fn read_ts(&self) -> T {
473        let (read_ts, _) = &*self.read_write_ts.lock().expect("lock poisoned");
474        read_ts.clone()
475    }
476
477    async fn apply_write(&self, lower_bound: T) {
478        let (read_ts, write_ts) = &mut *self.read_write_ts.lock().expect("lock poisoned");
479        *read_ts = std::cmp::max(read_ts.clone(), lower_bound);
480        *write_ts = std::cmp::max(read_ts, write_ts).clone();
481    }
482}
483
484mod from_impls {
485    use bytes::Bytes;
486    use mz_persist::location::{ExternalError, SeqNo, VersionedData};
487    use serde_json::Value;
488
489    use crate::maelstrom::services::MaelstromVersionedData;
490
491    impl From<VersionedData> for MaelstromVersionedData {
492        fn from(x: VersionedData) -> Self {
493            MaelstromVersionedData {
494                seqno: x.seqno.0,
495                data: x.data.to_vec(),
496            }
497        }
498    }
499
500    impl From<MaelstromVersionedData> for VersionedData {
501        fn from(x: MaelstromVersionedData) -> Self {
502            VersionedData {
503                seqno: SeqNo(x.seqno),
504                data: Bytes::from(x.data),
505            }
506        }
507    }
508
509    impl From<&MaelstromVersionedData> for Value {
510        fn from(x: &MaelstromVersionedData) -> Self {
511            let json = serde_json::to_string(x).expect("MaelstromVersionedData wasn't valid json");
512            serde_json::from_str(&json).expect("MaelstromVersionedData wasn't valid json")
513        }
514    }
515
516    impl TryFrom<&Value> for MaelstromVersionedData {
517        type Error = ExternalError;
518
519        fn try_from(x: &Value) -> Result<Self, Self::Error> {
520            let json = serde_json::to_string(x)
521                .map_err(|err| ExternalError::from(anyhow::Error::new(err)))?;
522            serde_json::from_str(&json).map_err(|err| ExternalError::from(anyhow::Error::new(err)))
523        }
524    }
525}