persistcli/maelstrom/
txn_list_append_single.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An implementation of the Maelstrom txn-list-append workload using a single
11//! persist shard.
12
13use std::collections::BTreeMap;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use async_trait::async_trait;
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::lattice::Lattice;
20use mz_ore::metrics::MetricsRegistry;
21use mz_ore::now::SYSTEM_TIME;
22use mz_persist::cfg::{BlobConfig, ConsensusConfig};
23use mz_persist::location::{Blob, Consensus, ExternalError};
24use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
25use mz_persist_client::async_runtime::IsolatedRuntime;
26use mz_persist_client::cache::StateCache;
27use mz_persist_client::cfg::PersistConfig;
28use mz_persist_client::critical::SinceHandle;
29use mz_persist_client::metrics::Metrics;
30use mz_persist_client::read::{Listen, ListenEvent};
31use mz_persist_client::rpc::PubSubClientConnection;
32use mz_persist_client::write::WriteHandle;
33use mz_persist_client::{Diagnostics, PersistClient, ShardId};
34use timely::PartialOrder;
35use timely::order::TotalOrder;
36use timely::progress::{Antichain, Timestamp};
37use tokio::sync::Mutex;
38use tracing::{debug, info, trace};
39
40use crate::maelstrom::Args;
41use crate::maelstrom::api::{Body, ErrorCode, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
42use crate::maelstrom::node::{Handle, Service};
43use crate::maelstrom::services::{CachingBlob, MaelstromBlob, MaelstromConsensus};
44use crate::maelstrom::txn_list_append_single::codec_impls::{
45    MaelstromKeySchema, MaelstromValSchema,
46};
47
/// Key of the persist shard used by [Transactor].
///
/// Wraps the integer key named by a Maelstrom txn-list-append operation.
#[derive(Debug, Clone, Default)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaelstromKey(u64);
51
/// Val of the persist shard used by [Transactor].
///
/// Holds the list of integers appended to a key so far, in append order.
#[derive(Debug, Clone, Default)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct MaelstromVal(Vec<u64>);
55
56/// An implementation of read-write transactions on top of persist
57///
58/// This executes Maelstrom [txn-list-append] transactions. The timestamp
59/// selection is modeled after Materialize's SQL implementation.
60///
61/// [txn-list-append]: https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/workloads.md#workload-txn-list-append
62///
63/// A persist shard is maintained that directly represents a key-value map as it
64/// evolves over time. (Our real SQL implementation would likely instead
65/// maintain a WAL of updates to multiple tables.) Each transaction has
66/// associated timestamps:
67///
68/// - `read_ts`: at which the contents of the map are read (inclusive)
69/// - `write_ts`: at which txn mutations (if any) are written
70/// - `expected_upper`: the upper bound of the previous txn
71/// - `new_upper`: the upper bound after this transaction commits, if it does
72///
73/// To keep things simple, `write_ts` is always `read_ts+1`, `expected_upper` is
74/// `antichain[write_ts]`, and `new_upper` is `antichain[write_ts+1]`.
75///
76/// Transactions are "committed" by using `[WriteHandle::compare_and_append]`,
77/// which atomically:
78/// - verifies that `expected_upper` matches
79/// - writes the updates
80/// - advances the upper to `new_upper`.
81///
82/// This guarantees that all transactions are linearized with each txn's
83/// `read_ts` equal to the previous txn's `write_ts`. If two transactions race
84/// by reading at the same `read_ts` and writing at the same `write_ts`, the
85/// `compare_and_append` guarantees that only one of them succeeds and the other
86/// must retry with new timestamps.
87///
88/// Upon first use, the persist shard is initialized by advancing the upper to
89/// `antichain[T::minimum() + 1]`. This allows the first txn to use 0 as the
90/// `read_ts` and 1 as the `write_ts`.
91///
92/// Similarly modeling Materialize, the since of the shard is kept following the
93/// upper. To keep things simple, this is done by a fixed offset. This exercises
94/// persist compaction.
95///
96/// To ensure that both [mz_persist_client::read::ReadHandle::snapshot] and
97/// [mz_persist_client::read::ReadHandle::listen] are exercised, when a txn
98/// reads the state at `read_ts`, it artificially picks an `as_of` timestamp in
99/// `[since, read_ts]` and splits up the read data between snapshot and listen
100/// along this timestamp.
101#[derive(Debug)]
102pub struct Transactor {
103    cads_token: u64,
104    shard_id: ShardId,
105    client: PersistClient,
106    since: SinceHandle<MaelstromKey, MaelstromVal, u64, i64, u64>,
107    write: WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
108
109    read_ts: u64,
110
111    // Keep a long-lived listen, which is incrementally read as we go. Then
112    // assert that it has the same data as the short-lived snapshot+listen in
113    // `read`. This hopefully stresses slightly different parts of the system.
114    long_lived_updates: Vec<(
115        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
116        u64,
117        i64,
118    )>,
119    long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
120}
121
122impl Transactor {
123    pub async fn new(
124        client: &PersistClient,
125        node_id: NodeId,
126        shard_id: ShardId,
127    ) -> Result<Self, MaelstromError> {
128        let cads_token = node_id
129            .0
130            .trim_start_matches('n')
131            .parse::<u64>()
132            .expect("maelstrom node_id should be n followed by an integer");
133
134        let (mut write, mut read) = client
135            .open(
136                shard_id,
137                Arc::new(MaelstromKeySchema),
138                Arc::new(MaelstromValSchema),
139                Diagnostics::from_purpose("maelstrom long-lived"),
140                true,
141            )
142            .await?;
143        // Use the CONTROLLER_CRITICAL_SINCE id for all nodes so we get coverage
144        // of contending traffic.
145        let since = client
146            .open_critical_since(
147                shard_id,
148                PersistClient::CONTROLLER_CRITICAL_SINCE,
149                Diagnostics::from_purpose("maelstrom since"),
150            )
151            .await?;
152        let read_ts = Self::maybe_init_shard(&mut write).await?;
153
154        let mut long_lived_updates = Vec::new();
155        let as_of = since.since().clone();
156        let mut updates = read
157            .snapshot_and_fetch(as_of.clone())
158            .await
159            .expect("as_of unexpectedly unavailable");
160        long_lived_updates.append(&mut updates);
161        let long_lived_listen = read
162            .listen(as_of.clone())
163            .await
164            .expect("as_of unexpectedly unavailable");
165
166        Ok(Transactor {
167            client: client.clone(),
168            cads_token,
169            shard_id,
170            since,
171            write,
172            read_ts,
173            long_lived_updates,
174            long_lived_listen,
175        })
176    }
177
178    /// Initializes the shard, if it hasn't been already, and returns the read
179    /// timestamp.
180    async fn maybe_init_shard(
181        write: &mut WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
182    ) -> Result<u64, MaelstromError> {
183        debug!("Transactor::maybe_init");
184        const EMPTY_UPDATES: &[((MaelstromKey, MaelstromVal), u64, i64)] = &[];
185        let ts_min = u64::minimum();
186        let initial_upper = Antichain::from_elem(ts_min);
187        let new_upper = Antichain::from_elem(ts_min + 1);
188        let cas_res = write
189            .compare_and_append(EMPTY_UPDATES, initial_upper.clone(), new_upper.clone())
190            .await;
191        let read_ts = match cas_res? {
192            Ok(()) => 0,
193            Err(mismatch) => Self::extract_ts(&mismatch.current)? - 1,
194        };
195        Ok(read_ts)
196    }
197
198    pub async fn transact(
199        &mut self,
200        req_ops: &[ReqTxnOp],
201    ) -> Result<Vec<ResTxnOp>, MaelstromError> {
202        loop {
203            trace!("transact req={:?}", req_ops);
204            let state = self.read().await?;
205            debug!("transact req={:?} state={:?}", req_ops, state);
206            let (writes, res_ops) = Self::eval_txn(&state, req_ops);
207
208            // NB: We do the CaS even if writes is empty, so that read-only txns
209            // are also linearizable.
210            let write_ts = self.read_ts + 1;
211            let updates = writes
212                .into_iter()
213                .map(|(k, v, diff)| ((k, v), write_ts, diff));
214            let expected_upper = Antichain::from_elem(write_ts);
215            let new_upper = Antichain::from_elem(write_ts + 1);
216            let cas_res = self
217                .write
218                .compare_and_append(updates, expected_upper.clone(), new_upper)
219                .await?;
220            match cas_res {
221                Ok(()) => {
222                    self.read_ts = write_ts;
223                    self.advance_since().await?;
224                    return Ok(res_ops);
225                }
226                // We lost the CaS race, try again.
227                Err(mismatch) => {
228                    info!(
229                        "transact lost the CaS race, retrying: {:?} vs {:?}",
230                        mismatch.expected, mismatch.current
231                    );
232                    self.read_ts = Self::extract_ts(&mismatch.current)? - 1;
233                    continue;
234                }
235            }
236        }
237    }
238
239    async fn read_short_lived(
240        &mut self,
241    ) -> Result<
242        (
243            Vec<(
244                (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
245                u64,
246                i64,
247            )>,
248            Antichain<u64>,
249        ),
250        MaelstromError,
251    > {
252        loop {
253            // We're reading as of read_ts, but we can split the read between the
254            // snapshot and listen at any ts in `[since_ts, read_ts]`. Intentionally
255            // pick one that uses a combination of both to get coverage.
256            let as_of = Antichain::from_elem(self.read_ts);
257            let since_ts = Self::extract_ts(self.since.since())?;
258            assert!(self.read_ts >= since_ts, "{} vs {}", self.read_ts, since_ts);
259            let snap_ts = since_ts + (self.read_ts - since_ts) / 2;
260            let snap_as_of = Antichain::from_elem(snap_ts);
261
262            // Intentionally create this from scratch so we get a brand new copy of
263            // state and exercise some more code paths.
264            let mut read = self
265                .client
266                .open_leased_reader(
267                    self.shard_id,
268                    Arc::new(MaelstromKeySchema),
269                    Arc::new(MaelstromValSchema),
270                    Diagnostics::from_purpose("maelstrom short-lived"),
271                    true,
272                )
273                .await
274                .expect("codecs should match");
275
276            let updates_res = read.snapshot_and_fetch(snap_as_of.clone()).await;
277            let mut updates = match updates_res {
278                Ok(x) => x,
279                Err(since) => {
280                    // Because we artificially share the same CriticalReaderId
281                    // between nodes, it doesn't quite act like a capability.
282                    // Prod doesn't have this issue, because a new one will
283                    // fence out old ones, but it's done here to stress edge
284                    // cases.
285                    //
286                    // If we did succeed in this read, we'd anyway just find out
287                    // from compare_and_append that our read_ts was out of date,
288                    // so proceed by fetching a new one and trying again. We
289                    // also re-register the since to get an updated since value
290                    // for the snap_ts calculation above.
291                    info!(
292                        "snapshot cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
293                        snap_ts,
294                        since.0.as_option()
295                    );
296                    self.advance_since().await?;
297
298                    let recent_upper = self.write.fetch_recent_upper().await;
299                    self.read_ts = Self::extract_ts(recent_upper)? - 1;
300                    continue;
301                }
302            };
303
304            let listen_res = read.listen(snap_as_of).await;
305            let listen = match listen_res {
306                Ok(x) => x,
307                Err(since) => {
308                    // Because we artificially share the same CriticalReaderId
309                    // between nodes, it doesn't quite act like a capability.
310                    // Prod doesn't have this issue, because a new one will
311                    // fence out old ones, but it's done here to stress edge
312                    // cases.
313                    //
314                    // If we did succeed in this read, we'd anyway just find out
315                    // from compare_and_append that our read_ts was out of date,
316                    // so proceed by fetching a new one and trying again. We
317                    // also re-register the since to get an updated since value
318                    // for the snap_ts calculation above.
319                    info!(
320                        "listen cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
321                        snap_ts,
322                        since.0.as_option(),
323                    );
324                    self.advance_since().await?;
325                    let recent_upper = self.write.fetch_recent_upper().await;
326                    self.read_ts = Self::extract_ts(recent_upper)? - 1;
327                    assert!(
328                        PartialOrder::less_than(self.since.since(), recent_upper),
329                        "invariant: since {:?} should be held behind the recent upper {:?}",
330                        &**self.since.since(),
331                        &**recent_upper
332                    );
333                    continue;
334                }
335            };
336
337            trace!(
338                "read updates from snapshot as_of {}: {:?}",
339                snap_ts, updates
340            );
341            let listen_updates = Self::listen_through(listen, &as_of).await?;
342            trace!(
343                "read updates from listener as_of {} through {}: {:?}",
344                snap_ts, self.read_ts, listen_updates
345            );
346            updates.extend(listen_updates);
347
348            // Compute the contents of the collection as of `as_of`.
349            for (_, t, _) in updates.iter_mut() {
350                t.advance_by(as_of.borrow());
351            }
352            consolidate_updates(&mut updates);
353            return Ok((updates, as_of));
354        }
355    }
356
357    async fn read(&mut self) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
358        let (updates, as_of) = self.read_short_lived().await?;
359
360        let long_lived = self.read_long_lived(&as_of).await;
361        assert_eq!(&updates, &long_lived);
362
363        Self::extract_state_map(self.read_ts, updates)
364    }
365
366    async fn listen_through(
367        mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
368        frontier: &Antichain<u64>,
369    ) -> Result<
370        Vec<(
371            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
372            u64,
373            i64,
374        )>,
375        ExternalError,
376    > {
377        let mut ret = Vec::new();
378        loop {
379            for event in listen.fetch_next().await {
380                match event {
381                    ListenEvent::Progress(x) => {
382                        // NB: Unlike the snapshot as_of, a listener frontier is
383                        // not inclusive, so we have to wait until it's > our
384                        // as_of to be sure we have everything.
385                        if PartialOrder::less_than(frontier, &x) {
386                            return Ok(ret);
387                        }
388                    }
389                    ListenEvent::Updates(x) => {
390                        // We want the collection at as_of, so skip anything
391                        // past that.
392                        ret.extend(x.into_iter().filter(|(_, ts, _)| !frontier.less_than(ts)));
393                    }
394                }
395            }
396        }
397    }
398
399    async fn read_long_lived(
400        &mut self,
401        as_of: &Antichain<u64>,
402    ) -> Vec<(
403        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
404        u64,
405        i64,
406    )> {
407        while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
408            for event in self.long_lived_listen.fetch_next().await {
409                match event {
410                    ListenEvent::Updates(mut updates) => {
411                        self.long_lived_updates.append(&mut updates)
412                    }
413                    ListenEvent::Progress(_) => {} // No-op.
414                }
415            }
416        }
417        for (_, t, _) in self.long_lived_updates.iter_mut() {
418            t.advance_by(as_of.borrow());
419        }
420        consolidate_updates(&mut self.long_lived_updates);
421
422        // If as_of is less_than the frontier, we may have ended up with updates
423        // that we didn't want yet. We can't remove them from
424        // `self.long_lived_updates` because the long lived listener will only
425        // emit them once and we'll want them later. If performance was
426        // important, we could sort them to the end and return a subset, but
427        // it's not, so do the easy thing and copy the Vec.
428        self.long_lived_updates
429            .iter()
430            .filter(|(_, t, _)| !as_of.less_than(t))
431            .cloned()
432            .collect()
433    }
434
435    fn extract_state_map(
436        read_ts: u64,
437        updates: Vec<(
438            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
439            u64,
440            i64,
441        )>,
442    ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
443        let mut ret = BTreeMap::new();
444        for ((k, v), _, d) in updates {
445            if d != 1 {
446                return Err(MaelstromError {
447                    code: ErrorCode::Crash,
448                    text: format!("invalid read at time {}", read_ts),
449                });
450            }
451            let k = k.map_err(|err| MaelstromError {
452                code: ErrorCode::Crash,
453                text: format!("invalid key {}", err),
454            })?;
455            let v = v.map_err(|err| MaelstromError {
456                code: ErrorCode::Crash,
457                text: format!("invalid val {}", err),
458            })?;
459            if ret.contains_key(&k) {
460                return Err(MaelstromError {
461                    code: ErrorCode::Crash,
462                    text: format!("unexpected duplicate key {:?}", k),
463                });
464            }
465            ret.insert(k, v);
466        }
467        Ok(ret)
468    }
469
470    fn eval_txn(
471        state: &BTreeMap<MaelstromKey, MaelstromVal>,
472        req_ops: &[ReqTxnOp],
473    ) -> (Vec<(MaelstromKey, MaelstromVal, i64)>, Vec<ResTxnOp>) {
474        let mut res_ops = Vec::new();
475        let mut updates = Vec::new();
476        let mut txn_state = BTreeMap::new();
477
478        for req_op in req_ops.iter() {
479            match req_op {
480                ReqTxnOp::Read { key } => {
481                    let current = txn_state
482                        .get(&MaelstromKey(*key))
483                        .or_else(|| state.get(&MaelstromKey(*key)));
484                    let val = current.cloned().unwrap_or_default().0;
485                    res_ops.push(ResTxnOp::Read { key: *key, val })
486                }
487                ReqTxnOp::Append { key, val } => {
488                    let current = txn_state
489                        .get(&MaelstromKey(*key))
490                        .or_else(|| state.get(&MaelstromKey(*key)));
491                    let mut vals = match current {
492                        Some(val) => {
493                            // Retract the value we're about to overwrite.
494                            updates.push((MaelstromKey(*key), val.clone(), -1));
495                            val.clone()
496                        }
497                        None => MaelstromVal::default(),
498                    };
499                    vals.0.push(*val);
500                    txn_state.insert(MaelstromKey(*key), vals.clone());
501                    updates.push((MaelstromKey(*key), vals, 1));
502                    res_ops.push(ResTxnOp::Append {
503                        key: key.clone(),
504                        val: *val,
505                    })
506                }
507            }
508        }
509
510        debug!(
511            "eval_txn\n  req={:?}\n  res={:?}\n  updates={:?}\n  state={:?}\n  txn_state={:?}",
512            req_ops, res_ops, updates, state, txn_state
513        );
514        (updates, res_ops)
515    }
516
517    async fn advance_since(&mut self) -> Result<(), MaelstromError> {
518        // To keep things interesting, advance the since.
519        const SINCE_LAG: u64 = 10;
520        let new_since = Antichain::from_elem(self.read_ts.saturating_sub(SINCE_LAG));
521
522        let mut expected_token = self.cads_token;
523        loop {
524            let res = self
525                .since
526                .maybe_compare_and_downgrade_since(&expected_token, (&self.cads_token, &new_since))
527                .await;
528            match res {
529                Some(Ok(latest_since)) => {
530                    // Success! Another process might have advanced past our read_ts, so
531                    // forward read_ts to since_ts.
532                    let since_ts = Self::extract_ts(&latest_since)?;
533                    if since_ts > self.read_ts {
534                        info!(
535                            "since was last updated by {}, forwarding our read_ts from {} to {}",
536                            expected_token, self.read_ts, since_ts
537                        );
538                        self.read_ts = since_ts;
539                    }
540
541                    return Ok(());
542                }
543                Some(Err(actual_token)) => {
544                    debug!(
545                        "actual downgrade_since token {} didn't match expected {}, retrying",
546                        actual_token, expected_token,
547                    );
548                    expected_token = actual_token;
549                }
550                None => {
551                    panic!("should not no-op `maybe_compare_and_downgrade_since` during testing");
552                }
553            }
554        }
555    }
556
557    fn extract_ts<T: TotalOrder + Copy>(frontier: &Antichain<T>) -> Result<T, MaelstromError> {
558        frontier.as_option().copied().ok_or_else(|| MaelstromError {
559            code: ErrorCode::Crash,
560            text: "shard unexpectedly closed".into(),
561        })
562    }
563}
564
565/// An adaptor to implement [Service] using [Transactor]
566#[derive(Debug)]
567pub struct TransactorService(pub Arc<Mutex<Transactor>>);
568
569#[async_trait]
570impl Service for TransactorService {
571    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
572        // Use the Maelstrom services to initialize a new random ShardId (so we
573        // can repeatedly run tests against the same Blob and Consensus without
574        // conflicting) and communicate it between processes.
575        let shard_id = handle.maybe_init_shard_id().await?;
576
577        // Make sure the seed is recomputed each time through the retry
578        // closure, so we don't retry the same deterministic timeouts.
579        let seed: u64 = SystemTime::now()
580            .duration_since(UNIX_EPOCH)
581            .unwrap_or_default()
582            .subsec_nanos()
583            .into();
584        // It doesn't particularly matter what we set should_happen to, so we do
585        // this to have a convenient single tunable param.
586        let should_happen = 1.0 - args.unreliability;
587        // For consensus, set should_timeout to `args.unreliability` so that once we split
588        // ExternalErrors into determinate vs indeterminate, then
589        // `args.unreliability` will also be the fraction of txns that it's
590        // not save for Maelstrom to retry (b/c indeterminate error in
591        // Consensus CaS).
592        let should_timeout = args.unreliability;
593        // It doesn't particularly matter what we set should_happen and
594        // should_timeout to for blobs, so use the same handle for both.
595        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);
596
597        let mut config =
598            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
599        let metrics = Arc::new(Metrics::new(&config, &MetricsRegistry::new()));
600
601        // Construct requested Blob.
602        let blob = match &args.blob_uri {
603            Some(blob_uri) => {
604                let cfg = BlobConfig::try_from(
605                    blob_uri,
606                    Box::new(config.clone()),
607                    metrics.s3_blob.clone(),
608                    Arc::clone(&config.configs),
609                )
610                .await
611                .expect("blob_uri should be valid");
612                loop {
613                    match cfg.clone().open().await {
614                        Ok(x) => break x,
615                        Err(err) => {
616                            info!("failed to open blob, trying again: {}", err);
617                        }
618                    }
619                }
620            }
621            None => MaelstromBlob::new(handle.clone()),
622        };
623        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
624        // Normal production persist usage (even including a real SQL txn impl)
625        // isn't particularly benefitted by a cache, so we don't have one baked
626        // into persist. In contrast, our Maelstrom transaction model
627        // intentionally exercises both a new snapshot and new listener on each
628        // txn. As a result, without a cache, things would be terribly slow,
629        // unreliable would cause more retries than are interesting, and the
630        // Lamport diagrams that Maelstrom generates would be noisy.
631        let blob = CachingBlob::new(blob);
632        // to simplify some downstream logic (+ a bit more stress testing),
633        // always downgrade the since of critical handles when asked
634        config.critical_downgrade_interval = Duration::from_secs(0);
635        // set a live diff scan limit such that we'll explore both the fast and slow paths
636        config.set_state_versions_recent_live_diffs_limit(5);
637        let consensus = match &args.consensus_uri {
638            Some(consensus_uri) => {
639                let cfg = ConsensusConfig::try_from(
640                    consensus_uri,
641                    Box::new(config.clone()),
642                    metrics.postgres_consensus.clone(),
643                )
644                .expect("consensus_uri should be valid");
645                loop {
646                    match cfg.clone().open().await {
647                        Ok(x) => break x,
648                        Err(err) => {
649                            info!("failed to open consensus, trying again: {}", err);
650                        }
651                    }
652                }
653            }
654            None => MaelstromConsensus::new(handle.clone()),
655        };
656        let consensus: Arc<dyn Consensus> =
657            Arc::new(UnreliableConsensus::new(consensus, unreliable));
658
659        // Wire up the TransactorService.
660        let isolated_runtime = Arc::new(IsolatedRuntime::default());
661        let pubsub_sender = PubSubClientConnection::noop().sender;
662        let shared_states = Arc::new(StateCache::new(
663            &config,
664            Arc::clone(&metrics),
665            Arc::clone(&pubsub_sender),
666        ));
667        let client = PersistClient::new(
668            config,
669            blob,
670            consensus,
671            metrics,
672            isolated_runtime,
673            shared_states,
674            pubsub_sender,
675        )?;
676        let transactor = Transactor::new(&client, handle.node_id(), shard_id).await?;
677        let service = TransactorService(Arc::new(Mutex::new(transactor)));
678        Ok(service)
679    }
680
681    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
682        match req {
683            Body::ReqTxn { msg_id, txn } => {
684                let in_reply_to = msg_id;
685                match self.0.lock().await.transact(&txn).await {
686                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
687                        msg_id,
688                        in_reply_to,
689                        txn,
690                    }),
691                    Err(MaelstromError { code, text }) => {
692                        handle.send_res(src, |msg_id| Body::Error {
693                            msg_id: Some(msg_id),
694                            in_reply_to,
695                            code,
696                            text,
697                        })
698                    }
699                }
700            }
701            req => unimplemented!("unsupported req: {:?}", req),
702        }
703    }
704}
705
706mod codec_impls {
707    use arrow::array::{BinaryArray, BinaryBuilder, UInt64Array, UInt64Builder};
708    use arrow::datatypes::ToByteSlice;
709    use bytes::Bytes;
710    use mz_persist_types::Codec;
711    use mz_persist_types::codec_impls::{
712        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
713    };
714    use mz_persist_types::columnar::Schema;
715    use mz_persist_types::stats::NoneStats;
716
717    use crate::maelstrom::txn_list_append_single::{MaelstromKey, MaelstromVal};
718
719    impl Codec for MaelstromKey {
720        type Storage = ();
721        type Schema = MaelstromKeySchema;
722
723        fn codec_name() -> String {
724            "MaelstromKey".into()
725        }
726
727        fn encode<B>(&self, buf: &mut B)
728        where
729            B: bytes::BufMut,
730        {
731            let bytes = serde_json::to_vec(&self.0).expect("failed to encode key");
732            buf.put(bytes.as_slice());
733        }
734
735        fn decode<'a>(buf: &'a [u8], _schema: &MaelstromKeySchema) -> Result<Self, String> {
736            Ok(MaelstromKey(
737                serde_json::from_slice(buf).map_err(|err| err.to_string())?,
738            ))
739        }
740
741        fn encode_schema(_schema: &Self::Schema) -> Bytes {
742            Bytes::new()
743        }
744
745        fn decode_schema(buf: &Bytes) -> Self::Schema {
746            assert_eq!(*buf, Bytes::new());
747            MaelstromKeySchema
748        }
749    }
750
751    impl SimpleColumnarData for MaelstromKey {
752        type ArrowBuilder = UInt64Builder;
753        type ArrowColumn = UInt64Array;
754
755        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
756            builder.values_slice().to_byte_slice().len()
757        }
758
759        fn push(&self, builder: &mut Self::ArrowBuilder) {
760            builder.append_value(self.0);
761        }
762        fn push_null(builder: &mut Self::ArrowBuilder) {
763            builder.append_null();
764        }
765
766        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
767            *self = MaelstromKey(column.value(idx));
768        }
769    }
770
771    #[derive(Debug, PartialEq)]
772    pub struct MaelstromKeySchema;
773
774    impl Schema<MaelstromKey> for MaelstromKeySchema {
775        type ArrowColumn = UInt64Array;
776        type Statistics = NoneStats;
777
778        type Decoder = SimpleColumnarDecoder<MaelstromKey>;
779        type Encoder = SimpleColumnarEncoder<MaelstromKey>;
780
781        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
782            Ok(SimpleColumnarEncoder::default())
783        }
784
785        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
786            Ok(SimpleColumnarDecoder::new(col))
787        }
788    }
789
790    impl Codec for MaelstromVal {
791        type Storage = ();
792        type Schema = MaelstromValSchema;
793
794        fn codec_name() -> String {
795            "MaelstromVal".into()
796        }
797
798        fn encode<B>(&self, buf: &mut B)
799        where
800            B: bytes::BufMut,
801        {
802            let bytes = serde_json::to_vec(&self.0).expect("failed to encode val");
803            buf.put(bytes.as_slice());
804        }
805
806        fn decode<'a>(buf: &'a [u8], _schema: &MaelstromValSchema) -> Result<Self, String> {
807            Ok(MaelstromVal(
808                serde_json::from_slice(buf).map_err(|err| err.to_string())?,
809            ))
810        }
811
812        fn encode_schema(_schema: &Self::Schema) -> Bytes {
813            Bytes::new()
814        }
815
816        fn decode_schema(buf: &Bytes) -> Self::Schema {
817            assert_eq!(*buf, Bytes::new());
818            MaelstromValSchema
819        }
820    }
821
822    impl SimpleColumnarData for MaelstromVal {
823        type ArrowBuilder = BinaryBuilder;
824        type ArrowColumn = BinaryArray;
825
826        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
827            builder.values_slice().to_byte_slice().len()
828        }
829
830        fn push(&self, builder: &mut Self::ArrowBuilder) {
831            builder.append_value(&self.encode_to_vec());
832        }
833        fn push_null(builder: &mut Self::ArrowBuilder) {
834            builder.append_null()
835        }
836
837        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
838            *self = MaelstromVal::decode(column.value(idx), &MaelstromValSchema)
839                .expect("should be valid MaelstromVal");
840        }
841    }
842
843    #[derive(Debug, PartialEq)]
844    pub struct MaelstromValSchema;
845
846    impl Schema<MaelstromVal> for MaelstromValSchema {
847        type ArrowColumn = BinaryArray;
848        type Statistics = NoneStats;
849
850        type Decoder = SimpleColumnarDecoder<MaelstromVal>;
851        type Encoder = SimpleColumnarEncoder<MaelstromVal>;
852
853        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
854            Ok(SimpleColumnarEncoder::default())
855        }
856
857        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
858            Ok(SimpleColumnarDecoder::new(col))
859        }
860    }
861}