persistcli/maelstrom/
txn_list_append_single.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An implementation of the Maelstrom txn-list-append workload using a single
11//! persist shard.
12
13use std::collections::BTreeMap;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use async_trait::async_trait;
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::lattice::Lattice;
20use mz_dyncfg::ConfigUpdates;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_persist::cfg::{BlobConfig, ConsensusConfig};
24use mz_persist::location::{Blob, Consensus, ExternalError};
25use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
26use mz_persist_client::async_runtime::IsolatedRuntime;
27use mz_persist_client::cache::StateCache;
28use mz_persist_client::cfg::PersistConfig;
29use mz_persist_client::critical::SinceHandle;
30use mz_persist_client::metrics::Metrics;
31use mz_persist_client::read::{Listen, ListenEvent};
32use mz_persist_client::rpc::PubSubClientConnection;
33use mz_persist_client::write::WriteHandle;
34use mz_persist_client::{Diagnostics, PersistClient, ShardId};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tokio::sync::Mutex;
39use tracing::{debug, info, trace};
40
41use crate::maelstrom::Args;
42use crate::maelstrom::api::{Body, ErrorCode, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
43use crate::maelstrom::node::{Handle, Service};
44use crate::maelstrom::services::{CachingBlob, MaelstromBlob, MaelstromConsensus};
45use crate::maelstrom::txn_list_append_single::codec_impls::{
46    MaelstromKeySchema, MaelstromValSchema,
47};
48
/// Key of the persist shard used by [Transactor].
///
/// Wraps the integer key that a txn-list-append operation reads or appends to.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MaelstromKey(u64);
52
/// Val of the persist shard used by [Transactor].
///
/// Holds the list of values appended so far to a single key, in append order.
#[derive(Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct MaelstromVal(Vec<u64>);
56
57/// An implementation of read-write transactions on top of persist
58///
59/// This executes Maelstrom [txn-list-append] transactions. The timestamp
60/// selection is modeled after Materialize's SQL implementation.
61///
62/// [txn-list-append]: https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/workloads.md#workload-txn-list-append
63///
64/// A persist shard is maintained that directly represents a key-value map as it
65/// evolves over time. (Our real SQL implementation would likely instead
66/// maintain a WAL of updates to multiple tables.) Each transaction has
67/// associated timestamps:
68///
69/// - `read_ts`: at which the contents of the map are read (inclusive)
70/// - `write_ts`: at which txn mutations (if any) are written
71/// - `expected_upper`: the upper bound of the previous txn
72/// - `new_upper`: the upper bound after this transaction commits, if it does
73///
74/// To keep things simple, `write_ts` is always `read_ts+1`, `expected_upper` is
75/// `antichain[write_ts]`, and `new_upper` is `antichain[write_ts+1]`.
76///
77/// Transactions are "committed" by using `[WriteHandle::compare_and_append]`,
78/// which atomically:
79/// - verifies that `expected_upper` matches
80/// - writes the updates
81/// - advances the upper to `new_upper`.
82///
83/// This guarantees that all transactions are linearized with each txn's
84/// `read_ts` equal to the previous txn's `write_ts`. If two transactions race
85/// by reading at the same `read_ts` and writing at the same `write_ts`, the
86/// `compare_and_append` guarantees that only one of them succeeds and the other
87/// must retry with new timestamps.
88///
89/// Upon first use, the persist shard is initialized by advancing the upper to
90/// `antichain[T::minimum() + 1]`. This allows the first txn to use 0 as the
91/// `read_ts` and 1 as the `write_ts`.
92///
93/// Similarly modeling Materialize, the since of the shard is kept following the
94/// upper. To keep things simple, this is done by a fixed offset. This exercises
95/// persist compaction.
96///
97/// To ensure that both [mz_persist_client::read::ReadHandle::snapshot] and
98/// [mz_persist_client::read::ReadHandle::listen] are exercised, when a txn
99/// reads the state at `read_ts`, it artificially picks an `as_of` timestamp in
100/// `[since, read_ts]` and splits up the read data between snapshot and listen
101/// along this timestamp.
102#[derive(Debug)]
103pub struct Transactor {
104    cads_token: u64,
105    shard_id: ShardId,
106    client: PersistClient,
107    since: SinceHandle<MaelstromKey, MaelstromVal, u64, i64, u64>,
108    write: WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
109
110    read_ts: u64,
111
112    // Keep a long-lived listen, which is incrementally read as we go. Then
113    // assert that it has the same data as the short-lived snapshot+listen in
114    // `read`. This hopefully stresses slightly different parts of the system.
115    long_lived_updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
116    long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
117}
118
119impl Transactor {
120    pub async fn new(
121        client: &PersistClient,
122        node_id: NodeId,
123        shard_id: ShardId,
124    ) -> Result<Self, MaelstromError> {
125        let cads_token = node_id
126            .0
127            .trim_start_matches('n')
128            .parse::<u64>()
129            .expect("maelstrom node_id should be n followed by an integer");
130
131        let (mut write, mut read) = client
132            .open(
133                shard_id,
134                Arc::new(MaelstromKeySchema),
135                Arc::new(MaelstromValSchema),
136                Diagnostics::from_purpose("maelstrom long-lived"),
137                true,
138            )
139            .await?;
140        // Use the CONTROLLER_CRITICAL_SINCE id for all nodes so we get coverage
141        // of contending traffic.
142        let since = client
143            .open_critical_since(
144                shard_id,
145                PersistClient::CONTROLLER_CRITICAL_SINCE,
146                Diagnostics::from_purpose("maelstrom since"),
147            )
148            .await?;
149        let read_ts = Self::maybe_init_shard(&mut write).await?;
150
151        let mut long_lived_updates = Vec::new();
152        let as_of = since.since().clone();
153        let mut updates = read
154            .snapshot_and_fetch(as_of.clone())
155            .await
156            .expect("as_of unexpectedly unavailable");
157        long_lived_updates.append(&mut updates);
158        let long_lived_listen = read
159            .listen(as_of.clone())
160            .await
161            .expect("as_of unexpectedly unavailable");
162
163        Ok(Transactor {
164            client: client.clone(),
165            cads_token,
166            shard_id,
167            since,
168            write,
169            read_ts,
170            long_lived_updates,
171            long_lived_listen,
172        })
173    }
174
175    /// Initializes the shard, if it hasn't been already, and returns the read
176    /// timestamp.
177    async fn maybe_init_shard(
178        write: &mut WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
179    ) -> Result<u64, MaelstromError> {
180        debug!("Transactor::maybe_init");
181        const EMPTY_UPDATES: &[((MaelstromKey, MaelstromVal), u64, i64)] = &[];
182        let ts_min = u64::minimum();
183        let initial_upper = Antichain::from_elem(ts_min);
184        let new_upper = Antichain::from_elem(ts_min + 1);
185        let cas_res = write
186            .compare_and_append(EMPTY_UPDATES, initial_upper.clone(), new_upper.clone())
187            .await;
188        let read_ts = match cas_res? {
189            Ok(()) => 0,
190            Err(mismatch) => Self::extract_ts(&mismatch.current)? - 1,
191        };
192        Ok(read_ts)
193    }
194
195    pub async fn transact(
196        &mut self,
197        req_ops: &[ReqTxnOp],
198    ) -> Result<Vec<ResTxnOp>, MaelstromError> {
199        loop {
200            trace!("transact req={:?}", req_ops);
201            let state = self.read().await?;
202            debug!("transact req={:?} state={:?}", req_ops, state);
203            let (writes, res_ops) = Self::eval_txn(&state, req_ops);
204
205            // NB: We do the CaS even if writes is empty, so that read-only txns
206            // are also linearizable.
207            let write_ts = self.read_ts + 1;
208            let updates = writes
209                .into_iter()
210                .map(|(k, v, diff)| ((k, v), write_ts, diff));
211            let expected_upper = Antichain::from_elem(write_ts);
212            let new_upper = Antichain::from_elem(write_ts + 1);
213            let cas_res = self
214                .write
215                .compare_and_append(updates, expected_upper.clone(), new_upper)
216                .await?;
217            match cas_res {
218                Ok(()) => {
219                    self.read_ts = write_ts;
220                    self.advance_since().await?;
221                    return Ok(res_ops);
222                }
223                // We lost the CaS race, try again.
224                Err(mismatch) => {
225                    info!(
226                        "transact lost the CaS race, retrying: {:?} vs {:?}",
227                        mismatch.expected, mismatch.current
228                    );
229                    self.read_ts = Self::extract_ts(&mismatch.current)? - 1;
230                    continue;
231                }
232            }
233        }
234    }
235
236    async fn read_short_lived(
237        &mut self,
238    ) -> Result<
239        (
240            Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
241            Antichain<u64>,
242        ),
243        MaelstromError,
244    > {
245        loop {
246            // We're reading as of read_ts, but we can split the read between the
247            // snapshot and listen at any ts in `[since_ts, read_ts]`. Intentionally
248            // pick one that uses a combination of both to get coverage.
249            let as_of = Antichain::from_elem(self.read_ts);
250            let since_ts = Self::extract_ts(self.since.since())?;
251            assert!(self.read_ts >= since_ts, "{} vs {}", self.read_ts, since_ts);
252            let snap_ts = since_ts + (self.read_ts - since_ts) / 2;
253            let snap_as_of = Antichain::from_elem(snap_ts);
254
255            // Intentionally create this from scratch so we get a brand new copy of
256            // state and exercise some more code paths.
257            let mut read = self
258                .client
259                .open_leased_reader(
260                    self.shard_id,
261                    Arc::new(MaelstromKeySchema),
262                    Arc::new(MaelstromValSchema),
263                    Diagnostics::from_purpose("maelstrom short-lived"),
264                    true,
265                )
266                .await
267                .expect("codecs should match");
268
269            if !PartialOrder::less_equal(read.since(), &snap_as_of) {
270                // The desired snapshot as-of is before the actual critical since;
271                // refresh our state and try again.
272                info!(
273                    "read handle since {:?} is not before as_of {}, fetching a new read_ts and trying again",
274                    read.since().elements(),
275                    snap_ts,
276                );
277                self.advance_since().await?;
278
279                let recent_upper = self.write.fetch_recent_upper().await;
280                self.read_ts = Self::extract_ts(recent_upper)? - 1;
281                continue;
282            }
283
284            let updates_res = read.snapshot_and_fetch(snap_as_of.clone()).await;
285            let mut updates = match updates_res {
286                Ok(x) => x,
287                Err(since) => {
288                    // Because we artificially share the same CriticalReaderId
289                    // between nodes, it doesn't quite act like a capability.
290                    // Prod doesn't have this issue, because a new one will
291                    // fence out old ones, but it's done here to stress edge
292                    // cases.
293                    //
294                    // If we did succeed in this read, we'd anyway just find out
295                    // from compare_and_append that our read_ts was out of date,
296                    // so proceed by fetching a new one and trying again. We
297                    // also re-register the since to get an updated since value
298                    // for the snap_ts calculation above.
299                    info!(
300                        "snapshot cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
301                        snap_ts,
302                        since.0.as_option()
303                    );
304                    self.advance_since().await?;
305
306                    let recent_upper = self.write.fetch_recent_upper().await;
307                    self.read_ts = Self::extract_ts(recent_upper)? - 1;
308                    continue;
309                }
310            };
311
312            let listen_res = read.listen(snap_as_of).await;
313            let listen = match listen_res {
314                Ok(x) => x,
315                Err(since) => {
316                    // Because we artificially share the same CriticalReaderId
317                    // between nodes, it doesn't quite act like a capability.
318                    // Prod doesn't have this issue, because a new one will
319                    // fence out old ones, but it's done here to stress edge
320                    // cases.
321                    //
322                    // If we did succeed in this read, we'd anyway just find out
323                    // from compare_and_append that our read_ts was out of date,
324                    // so proceed by fetching a new one and trying again. We
325                    // also re-register the since to get an updated since value
326                    // for the snap_ts calculation above.
327                    info!(
328                        "listen cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
329                        snap_ts,
330                        since.0.as_option(),
331                    );
332                    self.advance_since().await?;
333                    let recent_upper = self.write.fetch_recent_upper().await;
334                    self.read_ts = Self::extract_ts(recent_upper)? - 1;
335                    assert!(
336                        PartialOrder::less_than(self.since.since(), recent_upper),
337                        "invariant: since {:?} should be held behind the recent upper {:?}",
338                        &**self.since.since(),
339                        &**recent_upper
340                    );
341                    continue;
342                }
343            };
344
345            trace!(
346                "read updates from snapshot as_of {}: {:?}",
347                snap_ts, updates
348            );
349            let listen_updates = Self::listen_through(listen, &as_of).await?;
350            trace!(
351                "read updates from listener as_of {} through {}: {:?}",
352                snap_ts, self.read_ts, listen_updates
353            );
354            updates.extend(listen_updates);
355
356            // Compute the contents of the collection as of `as_of`.
357            for (_, t, _) in updates.iter_mut() {
358                t.advance_by(as_of.borrow());
359            }
360            consolidate_updates(&mut updates);
361            return Ok((updates, as_of));
362        }
363    }
364
365    async fn read(&mut self) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
366        let (updates, as_of) = self.read_short_lived().await?;
367
368        let long_lived = self.read_long_lived(&as_of).await;
369        assert_eq!(&updates, &long_lived);
370
371        Self::extract_state_map(self.read_ts, updates)
372    }
373
374    async fn listen_through(
375        mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
376        frontier: &Antichain<u64>,
377    ) -> Result<Vec<((MaelstromKey, MaelstromVal), u64, i64)>, ExternalError> {
378        let mut ret = Vec::new();
379        loop {
380            for event in listen.fetch_next().await {
381                match event {
382                    ListenEvent::Progress(x) => {
383                        // NB: Unlike the snapshot as_of, a listener frontier is
384                        // not inclusive, so we have to wait until it's > our
385                        // as_of to be sure we have everything.
386                        if PartialOrder::less_than(frontier, &x) {
387                            return Ok(ret);
388                        }
389                    }
390                    ListenEvent::Updates(x) => {
391                        // We want the collection at as_of, so skip anything
392                        // past that.
393                        ret.extend(x.into_iter().filter(|(_, ts, _)| !frontier.less_than(ts)));
394                    }
395                }
396            }
397        }
398    }
399
400    async fn read_long_lived(
401        &mut self,
402        as_of: &Antichain<u64>,
403    ) -> Vec<((MaelstromKey, MaelstromVal), u64, i64)> {
404        while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
405            for event in self.long_lived_listen.fetch_next().await {
406                match event {
407                    ListenEvent::Updates(mut updates) => {
408                        self.long_lived_updates.append(&mut updates)
409                    }
410                    ListenEvent::Progress(_) => {} // No-op.
411                }
412            }
413        }
414        for (_, t, _) in self.long_lived_updates.iter_mut() {
415            t.advance_by(as_of.borrow());
416        }
417        consolidate_updates(&mut self.long_lived_updates);
418
419        // If as_of is less_than the frontier, we may have ended up with updates
420        // that we didn't want yet. We can't remove them from
421        // `self.long_lived_updates` because the long lived listener will only
422        // emit them once and we'll want them later. If performance was
423        // important, we could sort them to the end and return a subset, but
424        // it's not, so do the easy thing and copy the Vec.
425        self.long_lived_updates
426            .iter()
427            .filter(|(_, t, _)| !as_of.less_than(t))
428            .cloned()
429            .collect()
430    }
431
432    fn extract_state_map(
433        read_ts: u64,
434        updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
435    ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
436        let mut ret = BTreeMap::new();
437        for ((k, v), _, d) in updates {
438            if d != 1 {
439                return Err(MaelstromError {
440                    code: ErrorCode::Crash,
441                    text: format!("invalid read at time {}", read_ts),
442                });
443            }
444            if ret.contains_key(&k) {
445                return Err(MaelstromError {
446                    code: ErrorCode::Crash,
447                    text: format!("unexpected duplicate key {:?}", k),
448                });
449            }
450            ret.insert(k, v);
451        }
452        Ok(ret)
453    }
454
455    fn eval_txn(
456        state: &BTreeMap<MaelstromKey, MaelstromVal>,
457        req_ops: &[ReqTxnOp],
458    ) -> (Vec<(MaelstromKey, MaelstromVal, i64)>, Vec<ResTxnOp>) {
459        let mut res_ops = Vec::new();
460        let mut updates = Vec::new();
461        let mut txn_state = BTreeMap::new();
462
463        for req_op in req_ops.iter() {
464            match req_op {
465                ReqTxnOp::Read { key } => {
466                    let current = txn_state
467                        .get(&MaelstromKey(*key))
468                        .or_else(|| state.get(&MaelstromKey(*key)));
469                    let val = current.cloned().unwrap_or_default().0;
470                    res_ops.push(ResTxnOp::Read { key: *key, val })
471                }
472                ReqTxnOp::Append { key, val } => {
473                    let current = txn_state
474                        .get(&MaelstromKey(*key))
475                        .or_else(|| state.get(&MaelstromKey(*key)));
476                    let mut vals = match current {
477                        Some(val) => {
478                            // Retract the value we're about to overwrite.
479                            updates.push((MaelstromKey(*key), val.clone(), -1));
480                            val.clone()
481                        }
482                        None => MaelstromVal::default(),
483                    };
484                    vals.0.push(*val);
485                    txn_state.insert(MaelstromKey(*key), vals.clone());
486                    updates.push((MaelstromKey(*key), vals, 1));
487                    res_ops.push(ResTxnOp::Append {
488                        key: key.clone(),
489                        val: *val,
490                    })
491                }
492            }
493        }
494
495        debug!(
496            "eval_txn\n  req={:?}\n  res={:?}\n  updates={:?}\n  state={:?}\n  txn_state={:?}",
497            req_ops, res_ops, updates, state, txn_state
498        );
499        (updates, res_ops)
500    }
501
502    async fn advance_since(&mut self) -> Result<(), MaelstromError> {
503        // To keep things interesting, advance the since.
504        const SINCE_LAG: u64 = 10;
505        let new_since = Antichain::from_elem(self.read_ts.saturating_sub(SINCE_LAG));
506
507        let mut expected_token = self.cads_token;
508        loop {
509            let res = self
510                .since
511                .maybe_compare_and_downgrade_since(&expected_token, (&self.cads_token, &new_since))
512                .await;
513            match res {
514                Some(Ok(latest_since)) => {
515                    // Success! Another process might have advanced past our read_ts, so
516                    // forward read_ts to since_ts.
517                    let since_ts = Self::extract_ts(&latest_since)?;
518                    if since_ts > self.read_ts {
519                        info!(
520                            "since was last updated by {}, forwarding our read_ts from {} to {}",
521                            expected_token, self.read_ts, since_ts
522                        );
523                        self.read_ts = since_ts;
524                    }
525
526                    return Ok(());
527                }
528                Some(Err(actual_token)) => {
529                    debug!(
530                        "actual downgrade_since token {} didn't match expected {}, retrying",
531                        actual_token, expected_token,
532                    );
533                    expected_token = actual_token;
534                }
535                None => {
536                    panic!("should not no-op `maybe_compare_and_downgrade_since` during testing");
537                }
538            }
539        }
540    }
541
542    fn extract_ts<T: TotalOrder + Copy>(frontier: &Antichain<T>) -> Result<T, MaelstromError> {
543        frontier.as_option().copied().ok_or_else(|| MaelstromError {
544            code: ErrorCode::Crash,
545            text: "shard unexpectedly closed".into(),
546        })
547    }
548}
549
550/// An adaptor to implement [Service] using [Transactor]
551#[derive(Debug)]
552pub struct TransactorService(pub Arc<Mutex<Transactor>>);
553
554#[async_trait]
555impl Service for TransactorService {
556    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
557        // Use the Maelstrom services to initialize a new random ShardId (so we
558        // can repeatedly run tests against the same Blob and Consensus without
559        // conflicting) and communicate it between processes.
560        let shard_id = handle.maybe_init_shard_id().await?;
561
562        // Make sure the seed is recomputed each time through the retry
563        // closure, so we don't retry the same deterministic timeouts.
564        let seed: u64 = SystemTime::now()
565            .duration_since(UNIX_EPOCH)
566            .unwrap_or_default()
567            .subsec_nanos()
568            .into();
569        // It doesn't particularly matter what we set should_happen to, so we do
570        // this to have a convenient single tunable param.
571        let should_happen = 1.0 - args.unreliability;
572        // For consensus, set should_timeout to `args.unreliability` so that once we split
573        // ExternalErrors into determinate vs indeterminate, then
574        // `args.unreliability` will also be the fraction of txns that it's
575        // not save for Maelstrom to retry (b/c indeterminate error in
576        // Consensus CaS).
577        let should_timeout = args.unreliability;
578        // It doesn't particularly matter what we set should_happen and
579        // should_timeout to for blobs, so use the same handle for both.
580        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);
581
582        let mut config =
583            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
584        {
585            // We only use the Postgres tuned queries when connected to vanilla
586            // Postgres, so we always want to enable them for testing.
587            let mut updates = ConfigUpdates::default();
588            updates.add(&mz_persist::postgres::USE_POSTGRES_TUNED_QUERIES, true);
589            config.apply_from(&updates);
590        }
591
592        let metrics = Arc::new(Metrics::new(&config, &MetricsRegistry::new()));
593
594        // Construct requested Blob.
595        let blob = match &args.blob_uri {
596            Some(blob_uri) => {
597                let cfg = BlobConfig::try_from(
598                    blob_uri,
599                    Box::new(config.clone()),
600                    metrics.s3_blob.clone(),
601                    Arc::clone(&config.configs),
602                )
603                .await
604                .expect("blob_uri should be valid");
605                loop {
606                    match cfg.clone().open().await {
607                        Ok(x) => break x,
608                        Err(err) => {
609                            info!("failed to open blob, trying again: {}", err);
610                        }
611                    }
612                }
613            }
614            None => MaelstromBlob::new(handle.clone()),
615        };
616        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
617        // Normal production persist usage (even including a real SQL txn impl)
618        // isn't particularly benefitted by a cache, so we don't have one baked
619        // into persist. In contrast, our Maelstrom transaction model
620        // intentionally exercises both a new snapshot and new listener on each
621        // txn. As a result, without a cache, things would be terribly slow,
622        // unreliable would cause more retries than are interesting, and the
623        // Lamport diagrams that Maelstrom generates would be noisy.
624        let blob = CachingBlob::new(blob);
625        // to simplify some downstream logic (+ a bit more stress testing),
626        // always downgrade the since of critical handles when asked
627        config.critical_downgrade_interval = Duration::from_secs(0);
628        // set a live diff scan limit such that we'll explore both the fast and slow paths
629        config.set_state_versions_recent_live_diffs_limit(5);
630        let consensus = match &args.consensus_uri {
631            Some(consensus_uri) => {
632                let cfg = ConsensusConfig::try_from(
633                    consensus_uri,
634                    Box::new(config.clone()),
635                    metrics.postgres_consensus.clone(),
636                    Arc::clone(&config.configs),
637                )
638                .expect("consensus_uri should be valid");
639                loop {
640                    match cfg.clone().open().await {
641                        Ok(x) => break x,
642                        Err(err) => {
643                            info!("failed to open consensus, trying again: {}", err);
644                        }
645                    }
646                }
647            }
648            None => MaelstromConsensus::new(handle.clone()),
649        };
650        let consensus: Arc<dyn Consensus> =
651            Arc::new(UnreliableConsensus::new(consensus, unreliable));
652
653        // Wire up the TransactorService.
654        let isolated_runtime = Arc::new(IsolatedRuntime::new_for_tests());
655        let pubsub_sender = PubSubClientConnection::noop().sender;
656        let shared_states = Arc::new(StateCache::new(
657            &config,
658            Arc::clone(&metrics),
659            Arc::clone(&pubsub_sender),
660        ));
661        let client = PersistClient::new(
662            config,
663            blob,
664            consensus,
665            metrics,
666            isolated_runtime,
667            shared_states,
668            pubsub_sender,
669        )?;
670        let transactor = Transactor::new(&client, handle.node_id(), shard_id).await?;
671        let service = TransactorService(Arc::new(Mutex::new(transactor)));
672        Ok(service)
673    }
674
675    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
676        match req {
677            Body::ReqTxn { msg_id, txn } => {
678                let in_reply_to = msg_id;
679                match self.0.lock().await.transact(&txn).await {
680                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
681                        msg_id,
682                        in_reply_to,
683                        txn,
684                    }),
685                    Err(MaelstromError { code, text }) => {
686                        handle.send_res(src, |msg_id| Body::Error {
687                            msg_id: Some(msg_id),
688                            in_reply_to,
689                            code,
690                            text,
691                        })
692                    }
693                }
694            }
695            req => unimplemented!("unsupported req: {:?}", req),
696        }
697    }
698}
699
700mod codec_impls {
701    use arrow::array::{BinaryArray, BinaryBuilder, UInt64Array, UInt64Builder};
702    use arrow::datatypes::ToByteSlice;
703    use bytes::Bytes;
704    use mz_persist_types::Codec;
705    use mz_persist_types::codec_impls::{
706        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
707    };
708    use mz_persist_types::columnar::Schema;
709    use mz_persist_types::stats::NoneStats;
710
711    use crate::maelstrom::txn_list_append_single::{MaelstromKey, MaelstromVal};
712
713    impl Codec for MaelstromKey {
714        type Storage = ();
715        type Schema = MaelstromKeySchema;
716
717        fn codec_name() -> String {
718            "MaelstromKey".into()
719        }
720
721        fn encode<B>(&self, buf: &mut B)
722        where
723            B: bytes::BufMut,
724        {
725            let bytes = serde_json::to_vec(&self.0).expect("failed to encode key");
726            buf.put(bytes.as_slice());
727        }
728
729        fn decode<'a>(buf: &'a [u8], _schema: &MaelstromKeySchema) -> Result<Self, String> {
730            Ok(MaelstromKey(
731                serde_json::from_slice(buf).map_err(|err| err.to_string())?,
732            ))
733        }
734
735        fn encode_schema(_schema: &Self::Schema) -> Bytes {
736            Bytes::new()
737        }
738
739        fn decode_schema(buf: &Bytes) -> Self::Schema {
740            assert_eq!(*buf, Bytes::new());
741            MaelstromKeySchema
742        }
743    }
744
745    impl SimpleColumnarData for MaelstromKey {
746        type ArrowBuilder = UInt64Builder;
747        type ArrowColumn = UInt64Array;
748
749        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
750            builder.values_slice().to_byte_slice().len()
751        }
752
753        fn push(&self, builder: &mut Self::ArrowBuilder) {
754            builder.append_value(self.0);
755        }
756        fn push_null(builder: &mut Self::ArrowBuilder) {
757            builder.append_null();
758        }
759
760        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
761            *self = MaelstromKey(column.value(idx));
762        }
763    }
764
765    #[derive(Debug, PartialEq)]
766    pub struct MaelstromKeySchema;
767
768    impl Schema<MaelstromKey> for MaelstromKeySchema {
769        type ArrowColumn = UInt64Array;
770        type Statistics = NoneStats;
771
772        type Decoder = SimpleColumnarDecoder<MaelstromKey>;
773        type Encoder = SimpleColumnarEncoder<MaelstromKey>;
774
775        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
776            Ok(SimpleColumnarEncoder::default())
777        }
778
779        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
780            Ok(SimpleColumnarDecoder::new(col))
781        }
782    }
783
784    impl Codec for MaelstromVal {
785        type Storage = ();
786        type Schema = MaelstromValSchema;
787
788        fn codec_name() -> String {
789            "MaelstromVal".into()
790        }
791
792        fn encode<B>(&self, buf: &mut B)
793        where
794            B: bytes::BufMut,
795        {
796            let bytes = serde_json::to_vec(&self.0).expect("failed to encode val");
797            buf.put(bytes.as_slice());
798        }
799
800        fn decode<'a>(buf: &'a [u8], _schema: &MaelstromValSchema) -> Result<Self, String> {
801            Ok(MaelstromVal(
802                serde_json::from_slice(buf).map_err(|err| err.to_string())?,
803            ))
804        }
805
806        fn encode_schema(_schema: &Self::Schema) -> Bytes {
807            Bytes::new()
808        }
809
810        fn decode_schema(buf: &Bytes) -> Self::Schema {
811            assert_eq!(*buf, Bytes::new());
812            MaelstromValSchema
813        }
814    }
815
816    impl SimpleColumnarData for MaelstromVal {
817        type ArrowBuilder = BinaryBuilder;
818        type ArrowColumn = BinaryArray;
819
820        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
821            builder.values_slice().to_byte_slice().len()
822        }
823
824        fn push(&self, builder: &mut Self::ArrowBuilder) {
825            builder.append_value(&self.encode_to_vec());
826        }
827        fn push_null(builder: &mut Self::ArrowBuilder) {
828            builder.append_null()
829        }
830
831        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
832            *self = MaelstromVal::decode(column.value(idx), &MaelstromValSchema)
833                .expect("should be valid MaelstromVal");
834        }
835    }
836
837    #[derive(Debug, PartialEq)]
838    pub struct MaelstromValSchema;
839
840    impl Schema<MaelstromVal> for MaelstromValSchema {
841        type ArrowColumn = BinaryArray;
842        type Statistics = NoneStats;
843
844        type Decoder = SimpleColumnarDecoder<MaelstromVal>;
845        type Encoder = SimpleColumnarEncoder<MaelstromVal>;
846
847        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
848            Ok(SimpleColumnarEncoder::default())
849        }
850
851        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
852            Ok(SimpleColumnarDecoder::new(col))
853        }
854    }
855}