persistcli/maelstrom/txn_list_append_single.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! An implementation of the Maelstrom txn-list-append workload using a single
//! persist shard.
12
13use std::collections::BTreeMap;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use async_trait::async_trait;
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::lattice::Lattice;
20use mz_dyncfg::ConfigUpdates;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_persist::cfg::{BlobConfig, ConsensusConfig};
24use mz_persist::location::{Blob, Consensus, ExternalError};
25use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
26use mz_persist_client::async_runtime::IsolatedRuntime;
27use mz_persist_client::cache::StateCache;
28use mz_persist_client::cfg::PersistConfig;
29use mz_persist_client::critical::SinceHandle;
30use mz_persist_client::metrics::Metrics;
31use mz_persist_client::read::{Listen, ListenEvent};
32use mz_persist_client::rpc::PubSubClientConnection;
33use mz_persist_client::write::WriteHandle;
34use mz_persist_client::{Diagnostics, PersistClient, ShardId};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tokio::sync::Mutex;
39use tracing::{debug, info, trace};
40
41use crate::maelstrom::Args;
42use crate::maelstrom::api::{Body, ErrorCode, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
43use crate::maelstrom::node::{Handle, Service};
44use crate::maelstrom::services::{CachingBlob, MaelstromBlob, MaelstromConsensus};
45use crate::maelstrom::txn_list_append_single::codec_impls::{
46    MaelstromKeySchema, MaelstromValSchema,
47};
48
/// Key type of the persist shard driven by [Transactor]; it wraps the integer
/// key of a Maelstrom txn op.
#[derive(Debug, Clone, Default)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaelstromKey(u64);
52
/// Value type of the persist shard driven by [Transactor]; it holds the list
/// of integers appended to a key so far.
#[derive(Debug, Clone, Default)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct MaelstromVal(Vec<u64>);
56
57/// An implementation of read-write transactions on top of persist
58///
59/// This executes Maelstrom [txn-list-append] transactions. The timestamp
60/// selection is modeled after Materialize's SQL implementation.
61///
62/// [txn-list-append]: https://github.com/jepsen-io/maelstrom/blob/v0.2.1/doc/workloads.md#workload-txn-list-append
63///
64/// A persist shard is maintained that directly represents a key-value map as it
65/// evolves over time. (Our real SQL implementation would likely instead
66/// maintain a WAL of updates to multiple tables.) Each transaction has
67/// associated timestamps:
68///
69/// - `read_ts`: at which the contents of the map are read (inclusive)
70/// - `write_ts`: at which txn mutations (if any) are written
71/// - `expected_upper`: the upper bound of the previous txn
72/// - `new_upper`: the upper bound after this transaction commits, if it does
73///
74/// To keep things simple, `write_ts` is always `read_ts+1`, `expected_upper` is
75/// `antichain[write_ts]`, and `new_upper` is `antichain[write_ts+1]`.
76///
77/// Transactions are "committed" by using `[WriteHandle::compare_and_append]`,
78/// which atomically:
79/// - verifies that `expected_upper` matches
80/// - writes the updates
81/// - advances the upper to `new_upper`.
82///
83/// This guarantees that all transactions are linearized with each txn's
84/// `read_ts` equal to the previous txn's `write_ts`. If two transactions race
85/// by reading at the same `read_ts` and writing at the same `write_ts`, the
86/// `compare_and_append` guarantees that only one of them succeeds and the other
87/// must retry with new timestamps.
88///
89/// Upon first use, the persist shard is initialized by advancing the upper to
90/// `antichain[T::minimum() + 1]`. This allows the first txn to use 0 as the
91/// `read_ts` and 1 as the `write_ts`.
92///
93/// Similarly modeling Materialize, the since of the shard is kept following the
94/// upper. To keep things simple, this is done by a fixed offset. This exercises
95/// persist compaction.
96///
97/// To ensure that both [mz_persist_client::read::ReadHandle::snapshot] and
98/// [mz_persist_client::read::ReadHandle::listen] are exercised, when a txn
99/// reads the state at `read_ts`, it artificially picks an `as_of` timestamp in
100/// `[since, read_ts]` and splits up the read data between snapshot and listen
101/// along this timestamp.
102#[derive(Debug)]
103pub struct Transactor {
104    cads_token: u64,
105    shard_id: ShardId,
106    client: PersistClient,
107    since: SinceHandle<MaelstromKey, MaelstromVal, u64, i64, u64>,
108    write: WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
109
110    read_ts: u64,
111
112    // Keep a long-lived listen, which is incrementally read as we go. Then
113    // assert that it has the same data as the short-lived snapshot+listen in
114    // `read`. This hopefully stresses slightly different parts of the system.
115    long_lived_updates: Vec<(
116        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
117        u64,
118        i64,
119    )>,
120    long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
121}
122
123impl Transactor {
124    pub async fn new(
125        client: &PersistClient,
126        node_id: NodeId,
127        shard_id: ShardId,
128    ) -> Result<Self, MaelstromError> {
129        let cads_token = node_id
130            .0
131            .trim_start_matches('n')
132            .parse::<u64>()
133            .expect("maelstrom node_id should be n followed by an integer");
134
135        let (mut write, mut read) = client
136            .open(
137                shard_id,
138                Arc::new(MaelstromKeySchema),
139                Arc::new(MaelstromValSchema),
140                Diagnostics::from_purpose("maelstrom long-lived"),
141                true,
142            )
143            .await?;
144        // Use the CONTROLLER_CRITICAL_SINCE id for all nodes so we get coverage
145        // of contending traffic.
146        let since = client
147            .open_critical_since(
148                shard_id,
149                PersistClient::CONTROLLER_CRITICAL_SINCE,
150                Diagnostics::from_purpose("maelstrom since"),
151            )
152            .await?;
153        let read_ts = Self::maybe_init_shard(&mut write).await?;
154
155        let mut long_lived_updates = Vec::new();
156        let as_of = since.since().clone();
157        let mut updates = read
158            .snapshot_and_fetch(as_of.clone())
159            .await
160            .expect("as_of unexpectedly unavailable");
161        long_lived_updates.append(&mut updates);
162        let long_lived_listen = read
163            .listen(as_of.clone())
164            .await
165            .expect("as_of unexpectedly unavailable");
166
167        Ok(Transactor {
168            client: client.clone(),
169            cads_token,
170            shard_id,
171            since,
172            write,
173            read_ts,
174            long_lived_updates,
175            long_lived_listen,
176        })
177    }
178
179    /// Initializes the shard, if it hasn't been already, and returns the read
180    /// timestamp.
181    async fn maybe_init_shard(
182        write: &mut WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
183    ) -> Result<u64, MaelstromError> {
184        debug!("Transactor::maybe_init");
185        const EMPTY_UPDATES: &[((MaelstromKey, MaelstromVal), u64, i64)] = &[];
186        let ts_min = u64::minimum();
187        let initial_upper = Antichain::from_elem(ts_min);
188        let new_upper = Antichain::from_elem(ts_min + 1);
189        let cas_res = write
190            .compare_and_append(EMPTY_UPDATES, initial_upper.clone(), new_upper.clone())
191            .await;
192        let read_ts = match cas_res? {
193            Ok(()) => 0,
194            Err(mismatch) => Self::extract_ts(&mismatch.current)? - 1,
195        };
196        Ok(read_ts)
197    }
198
199    pub async fn transact(
200        &mut self,
201        req_ops: &[ReqTxnOp],
202    ) -> Result<Vec<ResTxnOp>, MaelstromError> {
203        loop {
204            trace!("transact req={:?}", req_ops);
205            let state = self.read().await?;
206            debug!("transact req={:?} state={:?}", req_ops, state);
207            let (writes, res_ops) = Self::eval_txn(&state, req_ops);
208
209            // NB: We do the CaS even if writes is empty, so that read-only txns
210            // are also linearizable.
211            let write_ts = self.read_ts + 1;
212            let updates = writes
213                .into_iter()
214                .map(|(k, v, diff)| ((k, v), write_ts, diff));
215            let expected_upper = Antichain::from_elem(write_ts);
216            let new_upper = Antichain::from_elem(write_ts + 1);
217            let cas_res = self
218                .write
219                .compare_and_append(updates, expected_upper.clone(), new_upper)
220                .await?;
221            match cas_res {
222                Ok(()) => {
223                    self.read_ts = write_ts;
224                    self.advance_since().await?;
225                    return Ok(res_ops);
226                }
227                // We lost the CaS race, try again.
228                Err(mismatch) => {
229                    info!(
230                        "transact lost the CaS race, retrying: {:?} vs {:?}",
231                        mismatch.expected, mismatch.current
232                    );
233                    self.read_ts = Self::extract_ts(&mismatch.current)? - 1;
234                    continue;
235                }
236            }
237        }
238    }
239
240    async fn read_short_lived(
241        &mut self,
242    ) -> Result<
243        (
244            Vec<(
245                (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
246                u64,
247                i64,
248            )>,
249            Antichain<u64>,
250        ),
251        MaelstromError,
252    > {
253        loop {
254            // We're reading as of read_ts, but we can split the read between the
255            // snapshot and listen at any ts in `[since_ts, read_ts]`. Intentionally
256            // pick one that uses a combination of both to get coverage.
257            let as_of = Antichain::from_elem(self.read_ts);
258            let since_ts = Self::extract_ts(self.since.since())?;
259            assert!(self.read_ts >= since_ts, "{} vs {}", self.read_ts, since_ts);
260            let snap_ts = since_ts + (self.read_ts - since_ts) / 2;
261            let snap_as_of = Antichain::from_elem(snap_ts);
262
263            // Intentionally create this from scratch so we get a brand new copy of
264            // state and exercise some more code paths.
265            let mut read = self
266                .client
267                .open_leased_reader(
268                    self.shard_id,
269                    Arc::new(MaelstromKeySchema),
270                    Arc::new(MaelstromValSchema),
271                    Diagnostics::from_purpose("maelstrom short-lived"),
272                    true,
273                )
274                .await
275                .expect("codecs should match");
276
277            if !PartialOrder::less_equal(read.since(), &snap_as_of) {
278                // The desired snapshot as-of is before the actual critical since;
279                // refresh our state and try again.
280                info!(
281                    "read handle since {:?} is not before as_of {}, fetching a new read_ts and trying again",
282                    read.since().elements(),
283                    snap_ts,
284                );
285                self.advance_since().await?;
286
287                let recent_upper = self.write.fetch_recent_upper().await;
288                self.read_ts = Self::extract_ts(recent_upper)? - 1;
289                continue;
290            }
291
292            let updates_res = read.snapshot_and_fetch(snap_as_of.clone()).await;
293            let mut updates = match updates_res {
294                Ok(x) => x,
295                Err(since) => {
296                    // Because we artificially share the same CriticalReaderId
297                    // between nodes, it doesn't quite act like a capability.
298                    // Prod doesn't have this issue, because a new one will
299                    // fence out old ones, but it's done here to stress edge
300                    // cases.
301                    //
302                    // If we did succeed in this read, we'd anyway just find out
303                    // from compare_and_append that our read_ts was out of date,
304                    // so proceed by fetching a new one and trying again. We
305                    // also re-register the since to get an updated since value
306                    // for the snap_ts calculation above.
307                    info!(
308                        "snapshot cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
309                        snap_ts,
310                        since.0.as_option()
311                    );
312                    self.advance_since().await?;
313
314                    let recent_upper = self.write.fetch_recent_upper().await;
315                    self.read_ts = Self::extract_ts(recent_upper)? - 1;
316                    continue;
317                }
318            };
319
320            let listen_res = read.listen(snap_as_of).await;
321            let listen = match listen_res {
322                Ok(x) => x,
323                Err(since) => {
324                    // Because we artificially share the same CriticalReaderId
325                    // between nodes, it doesn't quite act like a capability.
326                    // Prod doesn't have this issue, because a new one will
327                    // fence out old ones, but it's done here to stress edge
328                    // cases.
329                    //
330                    // If we did succeed in this read, we'd anyway just find out
331                    // from compare_and_append that our read_ts was out of date,
332                    // so proceed by fetching a new one and trying again. We
333                    // also re-register the since to get an updated since value
334                    // for the snap_ts calculation above.
335                    info!(
336                        "listen cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
337                        snap_ts,
338                        since.0.as_option(),
339                    );
340                    self.advance_since().await?;
341                    let recent_upper = self.write.fetch_recent_upper().await;
342                    self.read_ts = Self::extract_ts(recent_upper)? - 1;
343                    assert!(
344                        PartialOrder::less_than(self.since.since(), recent_upper),
345                        "invariant: since {:?} should be held behind the recent upper {:?}",
346                        &**self.since.since(),
347                        &**recent_upper
348                    );
349                    continue;
350                }
351            };
352
353            trace!(
354                "read updates from snapshot as_of {}: {:?}",
355                snap_ts, updates
356            );
357            let listen_updates = Self::listen_through(listen, &as_of).await?;
358            trace!(
359                "read updates from listener as_of {} through {}: {:?}",
360                snap_ts, self.read_ts, listen_updates
361            );
362            updates.extend(listen_updates);
363
364            // Compute the contents of the collection as of `as_of`.
365            for (_, t, _) in updates.iter_mut() {
366                t.advance_by(as_of.borrow());
367            }
368            consolidate_updates(&mut updates);
369            return Ok((updates, as_of));
370        }
371    }
372
373    async fn read(&mut self) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
374        let (updates, as_of) = self.read_short_lived().await?;
375
376        let long_lived = self.read_long_lived(&as_of).await;
377        assert_eq!(&updates, &long_lived);
378
379        Self::extract_state_map(self.read_ts, updates)
380    }
381
382    async fn listen_through(
383        mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
384        frontier: &Antichain<u64>,
385    ) -> Result<
386        Vec<(
387            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
388            u64,
389            i64,
390        )>,
391        ExternalError,
392    > {
393        let mut ret = Vec::new();
394        loop {
395            for event in listen.fetch_next().await {
396                match event {
397                    ListenEvent::Progress(x) => {
398                        // NB: Unlike the snapshot as_of, a listener frontier is
399                        // not inclusive, so we have to wait until it's > our
400                        // as_of to be sure we have everything.
401                        if PartialOrder::less_than(frontier, &x) {
402                            return Ok(ret);
403                        }
404                    }
405                    ListenEvent::Updates(x) => {
406                        // We want the collection at as_of, so skip anything
407                        // past that.
408                        ret.extend(x.into_iter().filter(|(_, ts, _)| !frontier.less_than(ts)));
409                    }
410                }
411            }
412        }
413    }
414
415    async fn read_long_lived(
416        &mut self,
417        as_of: &Antichain<u64>,
418    ) -> Vec<(
419        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
420        u64,
421        i64,
422    )> {
423        while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
424            for event in self.long_lived_listen.fetch_next().await {
425                match event {
426                    ListenEvent::Updates(mut updates) => {
427                        self.long_lived_updates.append(&mut updates)
428                    }
429                    ListenEvent::Progress(_) => {} // No-op.
430                }
431            }
432        }
433        for (_, t, _) in self.long_lived_updates.iter_mut() {
434            t.advance_by(as_of.borrow());
435        }
436        consolidate_updates(&mut self.long_lived_updates);
437
438        // If as_of is less_than the frontier, we may have ended up with updates
439        // that we didn't want yet. We can't remove them from
440        // `self.long_lived_updates` because the long lived listener will only
441        // emit them once and we'll want them later. If performance was
442        // important, we could sort them to the end and return a subset, but
443        // it's not, so do the easy thing and copy the Vec.
444        self.long_lived_updates
445            .iter()
446            .filter(|(_, t, _)| !as_of.less_than(t))
447            .cloned()
448            .collect()
449    }
450
451    fn extract_state_map(
452        read_ts: u64,
453        updates: Vec<(
454            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
455            u64,
456            i64,
457        )>,
458    ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
459        let mut ret = BTreeMap::new();
460        for ((k, v), _, d) in updates {
461            if d != 1 {
462                return Err(MaelstromError {
463                    code: ErrorCode::Crash,
464                    text: format!("invalid read at time {}", read_ts),
465                });
466            }
467            let k = k.map_err(|err| MaelstromError {
468                code: ErrorCode::Crash,
469                text: format!("invalid key {}", err),
470            })?;
471            let v = v.map_err(|err| MaelstromError {
472                code: ErrorCode::Crash,
473                text: format!("invalid val {}", err),
474            })?;
475            if ret.contains_key(&k) {
476                return Err(MaelstromError {
477                    code: ErrorCode::Crash,
478                    text: format!("unexpected duplicate key {:?}", k),
479                });
480            }
481            ret.insert(k, v);
482        }
483        Ok(ret)
484    }
485
486    fn eval_txn(
487        state: &BTreeMap<MaelstromKey, MaelstromVal>,
488        req_ops: &[ReqTxnOp],
489    ) -> (Vec<(MaelstromKey, MaelstromVal, i64)>, Vec<ResTxnOp>) {
490        let mut res_ops = Vec::new();
491        let mut updates = Vec::new();
492        let mut txn_state = BTreeMap::new();
493
494        for req_op in req_ops.iter() {
495            match req_op {
496                ReqTxnOp::Read { key } => {
497                    let current = txn_state
498                        .get(&MaelstromKey(*key))
499                        .or_else(|| state.get(&MaelstromKey(*key)));
500                    let val = current.cloned().unwrap_or_default().0;
501                    res_ops.push(ResTxnOp::Read { key: *key, val })
502                }
503                ReqTxnOp::Append { key, val } => {
504                    let current = txn_state
505                        .get(&MaelstromKey(*key))
506                        .or_else(|| state.get(&MaelstromKey(*key)));
507                    let mut vals = match current {
508                        Some(val) => {
509                            // Retract the value we're about to overwrite.
510                            updates.push((MaelstromKey(*key), val.clone(), -1));
511                            val.clone()
512                        }
513                        None => MaelstromVal::default(),
514                    };
515                    vals.0.push(*val);
516                    txn_state.insert(MaelstromKey(*key), vals.clone());
517                    updates.push((MaelstromKey(*key), vals, 1));
518                    res_ops.push(ResTxnOp::Append {
519                        key: key.clone(),
520                        val: *val,
521                    })
522                }
523            }
524        }
525
526        debug!(
527            "eval_txn\n  req={:?}\n  res={:?}\n  updates={:?}\n  state={:?}\n  txn_state={:?}",
528            req_ops, res_ops, updates, state, txn_state
529        );
530        (updates, res_ops)
531    }
532
533    async fn advance_since(&mut self) -> Result<(), MaelstromError> {
534        // To keep things interesting, advance the since.
535        const SINCE_LAG: u64 = 10;
536        let new_since = Antichain::from_elem(self.read_ts.saturating_sub(SINCE_LAG));
537
538        let mut expected_token = self.cads_token;
539        loop {
540            let res = self
541                .since
542                .maybe_compare_and_downgrade_since(&expected_token, (&self.cads_token, &new_since))
543                .await;
544            match res {
545                Some(Ok(latest_since)) => {
546                    // Success! Another process might have advanced past our read_ts, so
547                    // forward read_ts to since_ts.
548                    let since_ts = Self::extract_ts(&latest_since)?;
549                    if since_ts > self.read_ts {
550                        info!(
551                            "since was last updated by {}, forwarding our read_ts from {} to {}",
552                            expected_token, self.read_ts, since_ts
553                        );
554                        self.read_ts = since_ts;
555                    }
556
557                    return Ok(());
558                }
559                Some(Err(actual_token)) => {
560                    debug!(
561                        "actual downgrade_since token {} didn't match expected {}, retrying",
562                        actual_token, expected_token,
563                    );
564                    expected_token = actual_token;
565                }
566                None => {
567                    panic!("should not no-op `maybe_compare_and_downgrade_since` during testing");
568                }
569            }
570        }
571    }
572
573    fn extract_ts<T: TotalOrder + Copy>(frontier: &Antichain<T>) -> Result<T, MaelstromError> {
574        frontier.as_option().copied().ok_or_else(|| MaelstromError {
575            code: ErrorCode::Crash,
576            text: "shard unexpectedly closed".into(),
577        })
578    }
579}
580
581/// An adaptor to implement [Service] using [Transactor]
582#[derive(Debug)]
583pub struct TransactorService(pub Arc<Mutex<Transactor>>);
584
585#[async_trait]
586impl Service for TransactorService {
587    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
588        // Use the Maelstrom services to initialize a new random ShardId (so we
589        // can repeatedly run tests against the same Blob and Consensus without
590        // conflicting) and communicate it between processes.
591        let shard_id = handle.maybe_init_shard_id().await?;
592
593        // Make sure the seed is recomputed each time through the retry
594        // closure, so we don't retry the same deterministic timeouts.
595        let seed: u64 = SystemTime::now()
596            .duration_since(UNIX_EPOCH)
597            .unwrap_or_default()
598            .subsec_nanos()
599            .into();
600        // It doesn't particularly matter what we set should_happen to, so we do
601        // this to have a convenient single tunable param.
602        let should_happen = 1.0 - args.unreliability;
603        // For consensus, set should_timeout to `args.unreliability` so that once we split
604        // ExternalErrors into determinate vs indeterminate, then
605        // `args.unreliability` will also be the fraction of txns that it's
606        // not save for Maelstrom to retry (b/c indeterminate error in
607        // Consensus CaS).
608        let should_timeout = args.unreliability;
609        // It doesn't particularly matter what we set should_happen and
610        // should_timeout to for blobs, so use the same handle for both.
611        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);
612
613        let mut config =
614            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
615        {
616            // We only use the Postgres tuned queries when connected to vanilla
617            // Postgres, so we always want to enable them for testing.
618            let mut updates = ConfigUpdates::default();
619            updates.add(&mz_persist::postgres::USE_POSTGRES_TUNED_QUERIES, true);
620            config.apply_from(&updates);
621        }
622
623        let metrics = Arc::new(Metrics::new(&config, &MetricsRegistry::new()));
624
625        // Construct requested Blob.
626        let blob = match &args.blob_uri {
627            Some(blob_uri) => {
628                let cfg = BlobConfig::try_from(
629                    blob_uri,
630                    Box::new(config.clone()),
631                    metrics.s3_blob.clone(),
632                    Arc::clone(&config.configs),
633                )
634                .await
635                .expect("blob_uri should be valid");
636                loop {
637                    match cfg.clone().open().await {
638                        Ok(x) => break x,
639                        Err(err) => {
640                            info!("failed to open blob, trying again: {}", err);
641                        }
642                    }
643                }
644            }
645            None => MaelstromBlob::new(handle.clone()),
646        };
647        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
648        // Normal production persist usage (even including a real SQL txn impl)
649        // isn't particularly benefitted by a cache, so we don't have one baked
650        // into persist. In contrast, our Maelstrom transaction model
651        // intentionally exercises both a new snapshot and new listener on each
652        // txn. As a result, without a cache, things would be terribly slow,
653        // unreliable would cause more retries than are interesting, and the
654        // Lamport diagrams that Maelstrom generates would be noisy.
655        let blob = CachingBlob::new(blob);
656        // to simplify some downstream logic (+ a bit more stress testing),
657        // always downgrade the since of critical handles when asked
658        config.critical_downgrade_interval = Duration::from_secs(0);
659        // set a live diff scan limit such that we'll explore both the fast and slow paths
660        config.set_state_versions_recent_live_diffs_limit(5);
661        let consensus = match &args.consensus_uri {
662            Some(consensus_uri) => {
663                let cfg = ConsensusConfig::try_from(
664                    consensus_uri,
665                    Box::new(config.clone()),
666                    metrics.postgres_consensus.clone(),
667                    Arc::clone(&config.configs),
668                )
669                .expect("consensus_uri should be valid");
670                loop {
671                    match cfg.clone().open().await {
672                        Ok(x) => break x,
673                        Err(err) => {
674                            info!("failed to open consensus, trying again: {}", err);
675                        }
676                    }
677                }
678            }
679            None => MaelstromConsensus::new(handle.clone()),
680        };
681        let consensus: Arc<dyn Consensus> =
682            Arc::new(UnreliableConsensus::new(consensus, unreliable));
683
684        // Wire up the TransactorService.
685        let isolated_runtime = Arc::new(IsolatedRuntime::new_for_tests());
686        let pubsub_sender = PubSubClientConnection::noop().sender;
687        let shared_states = Arc::new(StateCache::new(
688            &config,
689            Arc::clone(&metrics),
690            Arc::clone(&pubsub_sender),
691        ));
692        let client = PersistClient::new(
693            config,
694            blob,
695            consensus,
696            metrics,
697            isolated_runtime,
698            shared_states,
699            pubsub_sender,
700        )?;
701        let transactor = Transactor::new(&client, handle.node_id(), shard_id).await?;
702        let service = TransactorService(Arc::new(Mutex::new(transactor)));
703        Ok(service)
704    }
705
706    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
707        match req {
708            Body::ReqTxn { msg_id, txn } => {
709                let in_reply_to = msg_id;
710                match self.0.lock().await.transact(&txn).await {
711                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
712                        msg_id,
713                        in_reply_to,
714                        txn,
715                    }),
716                    Err(MaelstromError { code, text }) => {
717                        handle.send_res(src, |msg_id| Body::Error {
718                            msg_id: Some(msg_id),
719                            in_reply_to,
720                            code,
721                            text,
722                        })
723                    }
724                }
725            }
726            req => unimplemented!("unsupported req: {:?}", req),
727        }
728    }
729}
730
731mod codec_impls {
732    use arrow::array::{BinaryArray, BinaryBuilder, UInt64Array, UInt64Builder};
733    use arrow::datatypes::ToByteSlice;
734    use bytes::Bytes;
735    use mz_persist_types::Codec;
736    use mz_persist_types::codec_impls::{
737        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
738    };
739    use mz_persist_types::columnar::Schema;
740    use mz_persist_types::stats::NoneStats;
741
742    use crate::maelstrom::txn_list_append_single::{MaelstromKey, MaelstromVal};
743
744    impl Codec for MaelstromKey {
745        type Storage = ();
746        type Schema = MaelstromKeySchema;
747
748        fn codec_name() -> String {
749            "MaelstromKey".into()
750        }
751
752        fn encode<B>(&self, buf: &mut B)
753        where
754            B: bytes::BufMut,
755        {
756            let bytes = serde_json::to_vec(&self.0).expect("failed to encode key");
757            buf.put(bytes.as_slice());
758        }
759
760        fn decode<'a>(buf: &'a [u8], _schema: &MaelstromKeySchema) -> Result<Self, String> {
761            Ok(MaelstromKey(
762                serde_json::from_slice(buf).map_err(|err| err.to_string())?,
763            ))
764        }
765
766        fn encode_schema(_schema: &Self::Schema) -> Bytes {
767            Bytes::new()
768        }
769
770        fn decode_schema(buf: &Bytes) -> Self::Schema {
771            assert_eq!(*buf, Bytes::new());
772            MaelstromKeySchema
773        }
774    }
775
776    impl SimpleColumnarData for MaelstromKey {
777        type ArrowBuilder = UInt64Builder;
778        type ArrowColumn = UInt64Array;
779
780        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
781            builder.values_slice().to_byte_slice().len()
782        }
783
784        fn push(&self, builder: &mut Self::ArrowBuilder) {
785            builder.append_value(self.0);
786        }
787        fn push_null(builder: &mut Self::ArrowBuilder) {
788            builder.append_null();
789        }
790
791        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
792            *self = MaelstromKey(column.value(idx));
793        }
794    }
795
796    #[derive(Debug, PartialEq)]
797    pub struct MaelstromKeySchema;
798
799    impl Schema<MaelstromKey> for MaelstromKeySchema {
800        type ArrowColumn = UInt64Array;
801        type Statistics = NoneStats;
802
803        type Decoder = SimpleColumnarDecoder<MaelstromKey>;
804        type Encoder = SimpleColumnarEncoder<MaelstromKey>;
805
806        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
807            Ok(SimpleColumnarEncoder::default())
808        }
809
810        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
811            Ok(SimpleColumnarDecoder::new(col))
812        }
813    }
814
815    impl Codec for MaelstromVal {
816        type Storage = ();
817        type Schema = MaelstromValSchema;
818
819        fn codec_name() -> String {
820            "MaelstromVal".into()
821        }
822
823        fn encode<B>(&self, buf: &mut B)
824        where
825            B: bytes::BufMut,
826        {
827            let bytes = serde_json::to_vec(&self.0).expect("failed to encode val");
828            buf.put(bytes.as_slice());
829        }
830
831        fn decode<'a>(buf: &'a [u8], _schema: &MaelstromValSchema) -> Result<Self, String> {
832            Ok(MaelstromVal(
833                serde_json::from_slice(buf).map_err(|err| err.to_string())?,
834            ))
835        }
836
837        fn encode_schema(_schema: &Self::Schema) -> Bytes {
838            Bytes::new()
839        }
840
841        fn decode_schema(buf: &Bytes) -> Self::Schema {
842            assert_eq!(*buf, Bytes::new());
843            MaelstromValSchema
844        }
845    }
846
847    impl SimpleColumnarData for MaelstromVal {
848        type ArrowBuilder = BinaryBuilder;
849        type ArrowColumn = BinaryArray;
850
851        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
852            builder.values_slice().to_byte_slice().len()
853        }
854
855        fn push(&self, builder: &mut Self::ArrowBuilder) {
856            builder.append_value(&self.encode_to_vec());
857        }
858        fn push_null(builder: &mut Self::ArrowBuilder) {
859            builder.append_null()
860        }
861
862        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
863            *self = MaelstromVal::decode(column.value(idx), &MaelstromValSchema)
864                .expect("should be valid MaelstromVal");
865        }
866    }
867
868    #[derive(Debug, PartialEq)]
869    pub struct MaelstromValSchema;
870
871    impl Schema<MaelstromVal> for MaelstromValSchema {
872        type ArrowColumn = BinaryArray;
873        type Statistics = NoneStats;
874
875        type Decoder = SimpleColumnarDecoder<MaelstromVal>;
876        type Encoder = SimpleColumnarEncoder<MaelstromVal>;
877
878        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
879            Ok(SimpleColumnarEncoder::default())
880        }
881
882        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
883            Ok(SimpleColumnarDecoder::new(col))
884        }
885    }
886}