1use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use bytes::Bytes;
18use mz_ore::bytes::SegmentedBytes;
19use mz_persist::location::{
20 Blob, BlobMetadata, CaSResult, Consensus, Determinate, ExternalError, ResultStream, SeqNo,
21 VersionedData,
22};
23use mz_repr::TimestampManipulation;
24use mz_timestamp_oracle::{TimestampOracle, WriteTimestamp};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27use tokio::sync::Mutex;
28use tracing::debug;
29
30use crate::maelstrom::api::{ErrorCode, MaelstromError};
31use crate::maelstrom::node::Handle;
32
/// The JSON shape in which a [`VersionedData`] is stored in Maelstrom's lin-kv
/// service (see `MaelstromConsensus`).
///
/// The field names and types are what `serde` serializes, so they define the
/// stored representation; renaming or retyping them changes the stored format.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct MaelstromVersionedData {
    /// The raw `u64` inside the [`SeqNo`] this data was written at.
    seqno: u64,
    /// The opaque payload bytes.
    data: Vec<u8>,
}
40
/// A [`Consensus`] implementation backed by Maelstrom's lin-kv service.
///
/// Each consensus key is stored under the lin-kv key `consensus/{key}` as a
/// JSON-encoded `MaelstromVersionedData`. lin-kv's compare-and-set needs the
/// full expected value, not just a sequence number. A local cache of data
/// seen at each `(key, seqno)` lets `compare_and_set` rebuild that value
/// without a read on cache hits.
#[derive(Debug)]
pub struct MaelstromConsensus {
    handle: Handle,
    // Data this node has read (via `head`) or successfully written (via
    // `compare_and_set`), keyed by (key, seqno). Nothing in this file evicts
    // entries.
    cache: Mutex<BTreeMap<(String, SeqNo), Vec<u8>>>,
}
61
62impl MaelstromConsensus {
63 pub fn new(handle: Handle) -> Arc<dyn Consensus> {
64 Arc::new(MaelstromConsensus {
65 handle,
66 cache: Mutex::new(BTreeMap::new()),
67 })
68 }
69
70 pub async fn hydrate_seqno(
71 &self,
72 key: &str,
73 expected: SeqNo,
74 ) -> Result<Result<VersionedData, Option<VersionedData>>, ExternalError> {
75 if let Some(data) = self.cache.lock().await.get(&(key.to_string(), expected)) {
76 let value = VersionedData {
77 seqno: expected.clone(),
78 data: Bytes::from(data.clone()),
79 };
80 return Ok(Ok(value));
81 }
82
83 match self.head(key).await? {
86 Some(current) if current.seqno == expected => Ok(Ok(current)),
87 x => Ok(Err(x)),
88 }
89 }
90}
91
92#[async_trait]
93impl Consensus for MaelstromConsensus {
94 fn list_keys(&self) -> ResultStream<String> {
95 unimplemented!("TODO")
96 }
97
98 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
99 let value = match self
100 .handle
101 .lin_kv_read(Value::from(format!("consensus/{}", key)))
102 .await
103 .map_err(anyhow::Error::new)?
104 {
105 Some(x) => x,
106 None => return Ok(None),
107 };
108 let value = VersionedData::from(MaelstromVersionedData::try_from(&value)?);
109 self.cache
110 .lock()
111 .await
112 .insert((key.to_string(), value.seqno), value.data.to_vec());
113 Ok(Some(value))
114 }
115
116 async fn compare_and_set(
117 &self,
118 key: &str,
119 expected: Option<SeqNo>,
120 new: VersionedData,
121 ) -> Result<CaSResult, ExternalError> {
122 let create_if_not_exists = expected.is_none();
123
124 let from = match expected {
125 Some(expected) => match self.hydrate_seqno(key, expected).await? {
126 Ok(x) => Value::from(&MaelstromVersionedData::from(x)),
127 Err(_) => {
128 return Ok(CaSResult::ExpectationMismatch);
129 }
130 },
131 None => Value::Null,
132 };
133 let new = MaelstromVersionedData::from(new);
134 let to = Value::from(&new);
135 let cas_res = self
136 .handle
137 .lin_kv_compare_and_set(
138 Value::from(format!("consensus/{}", key)),
139 from,
140 to,
141 Some(create_if_not_exists),
142 )
143 .await;
144 match cas_res {
145 Ok(()) => {
146 self.cache
147 .lock()
148 .await
149 .insert((key.to_string(), SeqNo(new.seqno)), new.data.clone());
150 Ok(CaSResult::Committed)
151 }
152 Err(MaelstromError {
153 code: ErrorCode::PreconditionFailed,
154 ..
155 }) => Ok(CaSResult::ExpectationMismatch),
156 Err(err) => Err(ExternalError::from(anyhow::Error::new(err))),
157 }
158 }
159
160 async fn scan(
161 &self,
162 _key: &str,
163 _from: SeqNo,
164 _limit: usize,
165 ) -> Result<Vec<VersionedData>, ExternalError> {
166 unimplemented!("TODO")
167 }
168
169 async fn truncate(&self, _key: &str, _seqno: SeqNo) -> Result<usize, ExternalError> {
170 unimplemented!("TODO")
171 }
172}
173
/// A [`Blob`] implementation backed by Maelstrom's lin-kv service.
///
/// Each blob is stored under the lin-kv key `blob/{key}`. The value is a JSON
/// string whose contents are the JSON array encoding of the blob's bytes.
#[derive(Debug)]
pub struct MaelstromBlob {
    handle: Handle,
}
179
180impl MaelstromBlob {
181 pub fn new(handle: Handle) -> Arc<dyn Blob> {
182 Arc::new(MaelstromBlob { handle })
183 }
184}
185
186#[async_trait]
187impl Blob for MaelstromBlob {
188 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
189 let value = match self
190 .handle
191 .lin_kv_read(Value::from(format!("blob/{}", key)))
192 .await
193 .map_err(anyhow::Error::new)?
194 {
195 Some(x) => x,
196 None => return Ok(None),
197 };
198 let value = value
199 .as_str()
200 .ok_or_else(|| anyhow!("invalid blob at {}: {:?}", key, value))?;
201 let value: Vec<u8> = serde_json::from_str(value)
202 .map_err(|err| anyhow!("invalid blob at {}: {}", key, err))?;
203 Ok(Some(SegmentedBytes::from(value)))
204 }
205
206 async fn list_keys_and_metadata(
207 &self,
208 _key_prefix: &str,
209 _f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
210 ) -> Result<(), ExternalError> {
211 unimplemented!("not yet used")
212 }
213
214 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
215 let value = serde_json::to_string(value.as_ref()).expect("failed to serialize value");
216 self.handle
217 .lin_kv_write(Value::from(format!("blob/{}", key)), Value::from(value))
218 .await
219 .map_err(anyhow::Error::new)?;
220 Ok(())
221 }
222
223 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
224 self.handle
226 .lin_kv_write(Value::from(format!("blob/{}", key)), Value::Null)
227 .await
228 .map_err(anyhow::Error::new)?;
229 Ok(Some(0))
232 }
233
234 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
235 let read = self
236 .handle
237 .lin_kv_read(Value::from(format!("blob/{}", key)))
238 .await
239 .map_err(anyhow::Error::new)?;
240 if read.is_none() {
241 return Err(
242 Determinate::new(anyhow!("key {key} not present in the maelstrom store")).into(),
243 );
244 }
245 Ok(())
246 }
247}
248
/// A [`Blob`] wrapper that keeps an in-memory copy of blobs read through `get`.
///
/// The cache is populated only by `get`, and `delete` evicts the entry before
/// forwarding. All other operations go straight to the wrapped blob.
#[derive(Debug)]
pub struct CachingBlob {
    blob: Arc<dyn Blob>,
    // Blobs previously returned by `get`, keyed by blob key.
    cache: Mutex<BTreeMap<String, SegmentedBytes>>,
}
258
259impl CachingBlob {
260 pub fn new(blob: Arc<dyn Blob>) -> Arc<dyn Blob> {
261 Arc::new(CachingBlob {
262 blob,
263 cache: Mutex::new(BTreeMap::new()),
264 })
265 }
266}
267
268#[async_trait]
269impl Blob for CachingBlob {
270 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
271 let cache = self.cache.lock().await;
273 if let Some(value) = cache.get(key) {
274 return Ok(Some(value.clone()));
275 }
276 drop(cache);
278
279 let value = self.blob.get(key).await?;
281 if let Some(value) = &value {
282 self.cache
286 .lock()
287 .await
288 .insert(key.to_owned(), value.clone());
289 }
290
291 Ok(value)
292 }
293
294 async fn list_keys_and_metadata(
295 &self,
296 key_prefix: &str,
297 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
298 ) -> Result<(), ExternalError> {
299 self.blob.list_keys_and_metadata(key_prefix, f).await
300 }
301
302 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
303 self.blob.set(key, value).await
306 }
307
308 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
309 self.cache.lock().await.remove(key);
310 self.blob.delete(key).await
311 }
312
313 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
314 self.blob.restore(key).await
315 }
316}
317
/// A `u64` timestamp oracle whose read and write timestamps are stored in
/// Maelstrom's lin-kv service, under the keys `tso_read` and `tso_write`
/// (see `MaelstromOracle::new`).
#[derive(Debug)]
pub struct MaelstromOracle {
    // Holds the value returned by `read_ts`.
    read_ts: MaelstromOracleKey,
    // Holds the next write timestamp to hand out: `write_ts` returns the
    // stored value and bumps it by one.
    write_ts: MaelstromOracleKey,
}
323
/// A single `u64` counter stored in lin-kv under `key` and updated with
/// compare-and-set.
#[derive(Debug)]
pub struct MaelstromOracleKey {
    handle: Handle,
    // The lin-kv key that holds the counter.
    key: String,
    // The last value this node read or wrote. It may lag the stored value,
    // which is why a failed compare-and-set re-reads it.
    expected: u64,
}
330
331impl MaelstromOracle {
333 pub async fn new(handle: Handle) -> Result<Self, ExternalError> {
334 let read_ts = MaelstromOracleKey::new(handle.clone(), "tso_read", 0).await?;
335 let write_ts = MaelstromOracleKey::new(handle.clone(), "tso_write", 0).await?;
336 Ok(MaelstromOracle { read_ts, write_ts })
337 }
338
339 pub async fn write_ts(&mut self) -> Result<u64, ExternalError> {
340 let prev = self.write_ts.cas(|expected| Some(expected + 1)).await?;
341 Ok(prev)
342 }
343
344 pub async fn peek_write_ts(&mut self) -> Result<u64, ExternalError> {
345 self.write_ts.peek().await
346 }
347
348 pub async fn read_ts(&mut self) -> Result<u64, ExternalError> {
349 self.read_ts.peek().await
350 }
351
352 pub async fn apply_write(&mut self, lower_bound: u64) -> Result<(), ExternalError> {
353 let write_prev = self
354 .write_ts
355 .cas(|expected| (expected < lower_bound).then_some(lower_bound))
356 .await?;
357 let read_prev = self
358 .read_ts
359 .cas(|expected| (expected < lower_bound).then_some(lower_bound))
360 .await?;
361 debug!(
362 "apply_write {} write_prev={} write_new={} read_prev={} read_new={}",
363 lower_bound, write_prev, self.write_ts.expected, read_prev, self.read_ts.expected
364 );
365 Ok(())
366 }
367}
368
369impl MaelstromOracleKey {
370 async fn new(handle: Handle, key: &str, init_ts: u64) -> Result<Self, ExternalError> {
371 let res = handle
372 .lin_kv_compare_and_set(
373 Value::from(key),
374 Value::Null,
375 Value::from(init_ts),
376 Some(true),
377 )
378 .await;
379 match Self::cas_res(res)? {
381 CaSResult::Committed => {}
382 CaSResult::ExpectationMismatch => {}
383 }
384 Ok(MaelstromOracleKey {
385 handle,
386 key: key.to_owned(),
387 expected: init_ts,
388 })
389 }
390
391 fn cas_res(res: Result<(), MaelstromError>) -> Result<CaSResult, ExternalError> {
392 match res {
393 Ok(()) => Ok(CaSResult::Committed),
394 Err(MaelstromError {
395 code: ErrorCode::PreconditionFailed,
396 ..
397 }) => Ok(CaSResult::ExpectationMismatch),
398 Err(err) => Err(anyhow::Error::new(err).into()),
399 }
400 }
401
402 async fn peek(&mut self) -> Result<u64, ExternalError> {
403 let value = self
404 .handle
405 .lin_kv_read(Value::from(self.key.as_str()))
406 .await
407 .map_err(anyhow::Error::new)?
408 .expect("ts oracle should be initialized");
409 let current = value
410 .as_u64()
411 .ok_or_else(|| anyhow!("invalid {} value: {:?}", self.key, value))?;
412 assert!(self.expected <= current);
413 self.expected = current;
414 Ok(current)
415 }
416
417 async fn cas(&mut self, new_fn: impl Fn(u64) -> Option<u64>) -> Result<u64, ExternalError> {
418 loop {
419 let new = match new_fn(self.expected) {
420 Some(x) => x,
421 None => {
422 return Ok(self.expected);
424 }
425 };
426 let res = self
427 .handle
428 .lin_kv_compare_and_set(
429 Value::from(self.key.as_str()),
430 Value::from(self.expected),
431 Value::from(new),
432 None,
433 )
434 .await;
435 match Self::cas_res(res)? {
436 CaSResult::Committed => {
437 let prev = self.expected;
438 self.expected = new;
439 return Ok(prev);
440 }
441 CaSResult::ExpectationMismatch => {
442 self.expected = self.peek().await?;
443 continue;
444 }
445 };
446 }
447 }
448}
449
/// An in-process [`TimestampOracle`] that keeps its read and write timestamps
/// together behind a single `std::sync::Mutex`.
#[derive(Debug, Default)]
pub struct MemTimestampOracle<T> {
    // (read_ts, write_ts). Every method reads or updates them under the lock.
    read_write_ts: Arc<std::sync::Mutex<(T, T)>>,
}
454
455#[async_trait]
456impl<T: TimestampManipulation> TimestampOracle<T> for MemTimestampOracle<T> {
457 async fn write_ts(&self) -> WriteTimestamp<T> {
458 let (read_ts, write_ts) = &mut *self.read_write_ts.lock().expect("lock poisoned");
459 let new_write_ts = TimestampManipulation::step_forward(std::cmp::max(read_ts, write_ts));
460 write_ts.clone_from(&new_write_ts);
461 WriteTimestamp {
462 advance_to: TimestampManipulation::step_forward(&new_write_ts),
463 timestamp: new_write_ts,
464 }
465 }
466
467 async fn peek_write_ts(&self) -> T {
468 let (_, write_ts) = &*self.read_write_ts.lock().expect("lock poisoned");
469 write_ts.clone()
470 }
471
472 async fn read_ts(&self) -> T {
473 let (read_ts, _) = &*self.read_write_ts.lock().expect("lock poisoned");
474 read_ts.clone()
475 }
476
477 async fn apply_write(&self, lower_bound: T) {
478 let (read_ts, write_ts) = &mut *self.read_write_ts.lock().expect("lock poisoned");
479 *read_ts = std::cmp::max(read_ts.clone(), lower_bound);
480 *write_ts = std::cmp::max(read_ts, write_ts).clone();
481 }
482}
483
484mod from_impls {
485 use bytes::Bytes;
486 use mz_persist::location::{ExternalError, SeqNo, VersionedData};
487 use serde_json::Value;
488
489 use crate::maelstrom::services::MaelstromVersionedData;
490
491 impl From<VersionedData> for MaelstromVersionedData {
492 fn from(x: VersionedData) -> Self {
493 MaelstromVersionedData {
494 seqno: x.seqno.0,
495 data: x.data.to_vec(),
496 }
497 }
498 }
499
500 impl From<MaelstromVersionedData> for VersionedData {
501 fn from(x: MaelstromVersionedData) -> Self {
502 VersionedData {
503 seqno: SeqNo(x.seqno),
504 data: Bytes::from(x.data),
505 }
506 }
507 }
508
509 impl From<&MaelstromVersionedData> for Value {
510 fn from(x: &MaelstromVersionedData) -> Self {
511 let json = serde_json::to_string(x).expect("MaelstromVersionedData wasn't valid json");
512 serde_json::from_str(&json).expect("MaelstromVersionedData wasn't valid json")
513 }
514 }
515
516 impl TryFrom<&Value> for MaelstromVersionedData {
517 type Error = ExternalError;
518
519 fn try_from(x: &Value) -> Result<Self, Self::Error> {
520 let json = serde_json::to_string(x)
521 .map_err(|err| ExternalError::from(anyhow::Error::new(err)))?;
522 serde_json::from_str(&json).map_err(|err| ExternalError::from(anyhow::Error::new(err)))
523 }
524 }
525}