persistcli/maelstrom/txn_list_append_multi.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the Maelstrom txn-list-append workload using the
//! multi-shard txn abstraction.

use std::collections::btree_map::Entry;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use differential_dataflow::consolidation::consolidate_updates;
use mz_dyncfg::ConfigUpdates;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{NOW_ZERO, SYSTEM_TIME};
use mz_persist::cfg::{BlobConfig, ConsensusConfig};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
use mz_persist_client::async_runtime::IsolatedRuntime;
use mz_persist_client::cache::StateCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::metrics::Metrics as PersistMetrics;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
use mz_timestamp_oracle::TimestampOracle;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::operator::DataSubscribeTask;
use mz_txn_wal::txns::{Tidy, TxnsHandle};
use timely::progress::Timestamp;
use tokio::sync::Mutex;
use tracing::{debug, info};

use crate::maelstrom::Args;
use crate::maelstrom::api::{Body, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
use crate::maelstrom::node::{Handle, Service};
use crate::maelstrom::services::{
    CachingBlob, MaelstromBlob, MaelstromConsensus, MemTimestampOracle,
};

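/// Implements Maelstrom txn-list-append transactions on top of the txn-wal
/// multi-shard txn abstraction, mapping each Maelstrom key to its own data
/// shard.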
#[derive(Debug)]
pub struct Transactor {
    txns_id: ShardId,
    oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    client: PersistClient,
    txns: TxnsHandle<String, (), u64, i64>,
    tidy: Tidy,
    data_reads: BTreeMap<ShardId, (u64, ReadHandle<String, (), u64, i64>)>,
    peeks: BTreeMap<ShardId, DataSubscribeTask>,
}

impl Transactor {
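    /// Opens the txns shard at a fresh write timestamp from the oracle, then
    /// applies that write to the oracle so the timestamp becomes readable.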
    pub async fn new(
        client: PersistClient,
        txns_id: ShardId,
        oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    ) -> Result<Self, MaelstromError> {
        let init_ts = u64::from(oracle.write_ts().await.timestamp);
        let txns = TxnsHandle::open(
            init_ts,
            client.clone(),
            mz_txn_wal::all_dyncfgs(client.dyncfgs().clone()),
            Arc::new(TxnMetrics::new(&MetricsRegistry::new())),
            txns_id,
        )
        .await;
        oracle.apply_write(init_ts.into()).await;
        Ok(Transactor {
            txns_id,
            oracle,
            txns,
            tidy: Tidy::default(),
            client,
            data_reads: BTreeMap::default(),
            peeks: BTreeMap::default(),
        })
    }

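    /// Executes one Maelstrom transaction: ensures the data shard for every
    /// key is registered, reads at the oracle's read timestamp, and (if there
    /// are any appends) commits them in a single txn, retrying at a higher
    /// timestamp on conflict.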
    pub async fn transact(
        &mut self,
        req_ops: &[ReqTxnOp],
    ) -> Result<Vec<ResTxnOp>, MaelstromError> {
        let mut read_ids = Vec::new();
        let mut writes = BTreeMap::<ShardId, Vec<(String, i64)>>::new();
        for op in req_ops {
            match op {
                ReqTxnOp::Read { key } => {
                    read_ids.push(self.key_shard(*key));
                }
                ReqTxnOp::Append { key, val } => writes
                    .entry(self.key_shard(*key))
                    .or_default()
                    .push((val.to_string(), 1)),
            }
        }

        // First create and register any data shards as necessary.
        for data_id in writes.keys().chain(read_ids.iter()) {
            let _init_ts = self.ensure_registered(data_id).await;
        }

        // Run the core read+write, retry-at-a-higher-ts-on-conflict loop.
        let mut read_ts = u64::from(self.oracle.read_ts().await);
        info!("read ts {}", read_ts);
        self.peeks.clear();
        self.read_at(read_ts, read_ids.iter()).await;
        if writes.is_empty() {
            debug!("req committed at read_ts={}", read_ts);
        } else {
            let mut txn = self.txns.begin();
            for (data_id, writes) in writes {
                for (data, diff) in writes {
                    txn.write(&data_id, data, (), diff).await;
                }
            }
            let mut write_ts = u64::from(self.oracle.write_ts().await.timestamp);
            loop {
                // To be linearizable, we need to ensure that reads are done at
                // the timestamp previous to the write_ts. However, we're not
                // guaranteed that this is readable (someone could have consumed
                // the write_ts and then crashed), so we first have to do an
                // empty write at read_ts.
                let new_read_ts = write_ts.checked_sub(1).expect("write_ts should be > 0");
                info!("read ts {} write ts {}", new_read_ts, write_ts);
                if new_read_ts != read_ts {
                    self.unblock_and_read_at(new_read_ts, read_ids.iter()).await;
                    read_ts = new_read_ts;
                }

                txn.tidy(std::mem::take(&mut self.tidy));
                match txn.commit_at(&mut self.txns, write_ts).await {
                    Ok(maintenance) => {
                        self.oracle.apply_write(write_ts.into()).await;
                        // Aggressively allow the txns shard to compact. To
                        // exercise more edge cases, do it before we apply the
                        // newly committed txn.
                        self.txns.compact_to(write_ts).await;

                        debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts);
                        let tidy = maintenance.apply(&mut self.txns).await;
                        self.tidy.merge(tidy);
                        break;
                    }
                    Err(current) => {
                        write_ts = current;
                        // Have to redo our reads, but that's taken care of at
                        // the top of the loop.
                        continue;
                    }
                }
            }
        }

        // Normally, txns would have to be all reads followed by all writes. To
        // support any txn-list-append txns, this map is filled in with writes
        // from _this_ txn as we walk through the request, allowing us to append
        // them to reads.
        let mut this_txn_writes = BTreeMap::<_, Vec<_>>::new();

        let res = req_ops
            .iter()
            .map(|op| match op {
                ReqTxnOp::Read { key } => {
                    let key_shard = self.key_shard(*key);
                    let mut data = self
                        .peeks
                        .get(&key_shard)
                        .expect("key should have been read")
                        .output()
                        .iter()
                        // The DataSubscribe only guarantees that this output contains
                        // everything <= read_ts, but it might contain things after it,
                        // too. Filter them out.
                        .filter(|(_, t, _)| *t <= read_ts)
                        .map(|(k, t, d)| {
                            let k = k.parse().expect("valid u64");
                            (k, *t, *d)
                        })
                        .collect::<Vec<_>>();
                    let mut seen = BTreeSet::new();
                    let mut val = Vec::new();
                    consolidate_updates(&mut data);
                    // Sort things in commit (ts) order, then by key, then with
                    // insertions before retractions (so we can assert that
                    // retractions mean removal from the `seen` set).
                    data.sort_by_key(|(k, t, d)| (*t, *k, std::cmp::Reverse(*d)));
                    debug!(
                        "{} {:.9} read after sort {:?}",
                        key,
                        key_shard.to_string(),
                        data
                    );
                    for (x, _, d) in data {
                        if d == 1 {
                            assert!(seen.insert(x));
                            val.push(x);
                        } else if d == -1 {
                            assert!(seen.remove(&x));
                            val.retain(|y| *y != x);
                        } else {
                            panic!("unexpected diff: {}", d);
                        }
                    }
                    if let Some(this_writes) = this_txn_writes.get(key) {
                        val.extend(this_writes.iter().copied());
                    }
                    ResTxnOp::Read { key: *key, val }
                }
                ReqTxnOp::Append { key, val } => {
                    this_txn_writes.entry(key).or_default().push(val);
                    ResTxnOp::Append {
                        key: *key,
                        val: *val,
                    }
                }
            })
            .collect();
        Ok(res)
    }

    // Returns the minimum timestamp at which this can be read.
    async fn ensure_registered(&mut self, data_id: &ShardId) -> Result<u64, ExternalError> {
        // Already registered.
        if let Some((init_ts, _)) = self.data_reads.get(data_id) {
            return Ok(*init_ts);
        }

        // Not registered yet.
        let data_read = self
            .client
            .open_leased_reader(
                *data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("txn data"),
                true,
            )
            .await
            .expect("data schema shouldn't change");

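        // Register the data shard at a fresh oracle write timestamp, retrying
        // with a newer timestamp if someone else has already advanced the txns
        // shard past it.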
        let mut init_ts = u64::from(self.oracle.write_ts().await.timestamp);
        loop {
            let data_write = self
                .client
                .open_writer(
                    *data_id,
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                    Diagnostics::from_purpose("txn data"),
                )
                .await
                .expect("data schema shouldn't change");
            let res = self.txns.register(init_ts, [data_write]).await;
            match res {
                Ok(_) => {
                    self.oracle.apply_write(init_ts.into()).await;
                    self.data_reads.insert(*data_id, (init_ts, data_read));
                    return Ok(init_ts);
                }
                Err(new_init_ts) => {
                    debug!(
                        "register {:.9} at {} mismatch current={}",
                        data_id, init_ts, new_init_ts
                    );
                    init_ts = u64::from(self.oracle.write_ts().await.timestamp);
                    continue;
                }
            }
        }
    }

    async fn read_at(&mut self, read_ts: u64, data_ids: impl Iterator<Item = &ShardId>) {
        // Ensure these reads don't block.
        let tidy = self.txns.apply_le(&read_ts).await;
        self.tidy.merge(tidy);

        // SUBTLE! Maelstrom txn-list-append requires that we be able to
        // reconstruct the order in which we appended list items. To avoid
        // needing to change the staged writes if our read_ts advances, we
        // instead do something overly clever and use the update timestamps. To
        // recover them, instead of grabbing a snapshot at the read_ts, we have
        // to start a subscription at time 0 and walk it forward until we pass
        // read_ts.
        for data_id in data_ids {
            let peek = match self.peeks.entry(*data_id) {
                Entry::Occupied(x) => x.into_mut(),
                Entry::Vacant(x) => {
                    let peek =
                        DataSubscribeTask::new(self.client.clone(), self.txns_id, *data_id, 0)
                            .await;
                    x.insert(peek)
                }
            };
            peek.step_past(read_ts).await;
        }
    }

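    // Commits an empty txn at `read_ts` so that the timestamp is guaranteed to
    // be readable (in case whoever minted it crashed before writing), then
    // reads at it.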
    async fn unblock_and_read_at(
        &mut self,
        read_ts: u64,
        data_ids: impl Iterator<Item = &ShardId>,
    ) {
        debug!("unblock_and_read_at {}", read_ts);
        let mut txn = self.txns.begin();
        match txn.commit_at(&mut self.txns, read_ts).await {
            Ok(apply) => {
                self.tidy.merge(apply.apply(&mut self.txns).await);
            }
            // Already unblocked.
            Err(_) => {}
        }
        self.read_at(read_ts, data_ids).await
    }

    // Constructs a ShardId that is stable per key (so each maelstrom process
    // gets the same one) and per txns_id (so runs of maelstrom don't interfere
    // with each other).
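    //
    // For example (hypothetical key value), repeated calls with the same key
    // against the same txns_id always produce the same shard:
    //
    //     let s1 = transactor.key_shard(7);
    //     let s2 = transactor.key_shard(7);
    //     assert_eq!(s1, s2);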
    fn key_shard(&self, key: u64) -> ShardId {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        self.txns_id.hash(&mut h);
        let mut buf = [0u8; 16];
        buf[0..8].copy_from_slice(&h.finish().to_le_bytes());
        let shard_id = format!("s{}", uuid::Uuid::from_bytes(buf));
        shard_id.parse().expect("valid shard id")
    }
}

/// An adaptor to implement [Service] using [Transactor].
#[derive(Debug)]
pub struct TransactorService(pub Arc<Mutex<Transactor>>);

#[async_trait]
impl Service for TransactorService {
    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
        // Use the Maelstrom services to initialize a new random ShardId (so we
        // can repeatedly run tests against the same Blob and Consensus without
        // conflicting) and communicate it between processes.
        let shard_id = handle.maybe_init_shard_id().await?;

        // Make sure the seed is recomputed each time through the retry
        // closure, so we don't retry the same deterministic timeouts.
        let seed: u64 = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos()
            .into();
        // It doesn't particularly matter what we set should_happen to, so we do
        // this to have a convenient single tunable param.
        let should_happen = 1.0 - args.unreliability;
        // For consensus, set should_timeout to `args.unreliability` so that
        // once we split ExternalErrors into determinate vs indeterminate,
        // `args.unreliability` will also be the fraction of txns that it's
        // not safe for Maelstrom to retry (b/c of an indeterminate error in
        // Consensus CaS).
        let should_timeout = args.unreliability;
        // It doesn't particularly matter what we set should_happen and
        // should_timeout to for blobs, so use the same handle for both.
        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);

        let mut config =
            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
        {
            // We only use the Postgres tuned queries when connected to vanilla
            // Postgres, so we always want to enable them for testing.
            let mut updates = ConfigUpdates::default();
            updates.add(&mz_persist::postgres::USE_POSTGRES_TUNED_QUERIES, true);
            config.apply_from(&updates);
        }

        let metrics_registry = MetricsRegistry::new();
        let metrics = Arc::new(PersistMetrics::new(&config, &metrics_registry));

        // Construct requested Blob.
        let blob = match &args.blob_uri {
            Some(blob_uri) => {
                let cfg = BlobConfig::try_from(
                    blob_uri,
                    Box::new(config.clone()),
                    metrics.s3_blob.clone(),
                    Arc::clone(&config.configs),
                )
                .await
                .expect("blob_uri should be valid");
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open blob, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromBlob::new(handle.clone()),
        };
        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
        // Normal production persist usage (even including a real SQL txn impl)
        // isn't particularly benefitted by a cache, so we don't have one baked
        // into persist. In contrast, our Maelstrom transaction model
        // intentionally exercises both a new snapshot and new listener on each
        // txn. As a result, without a cache, things would be terribly slow,
        // unreliable would cause more retries than are interesting, and the
        // Lamport diagrams that Maelstrom generates would be noisy.
        let blob = CachingBlob::new(blob);
        // To simplify some downstream logic (+ a bit more stress testing),
        // always downgrade the since of critical handles when asked.
        config.critical_downgrade_interval = Duration::from_secs(0);
        // Set a live diff scan limit such that we'll explore both the fast and
        // slow paths.
        config.set_state_versions_recent_live_diffs_limit(5);
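        // Construct requested Consensus.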
        let consensus = match &args.consensus_uri {
            Some(consensus_uri) => {
                let cfg = ConsensusConfig::try_from(
                    consensus_uri,
                    Box::new(config.clone()),
                    metrics.postgres_consensus.clone(),
                    Arc::clone(&config.configs),
                )
                .expect("consensus_uri should be valid");
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open consensus, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromConsensus::new(handle.clone()),
        };
        let consensus: Arc<dyn Consensus> =
            Arc::new(UnreliableConsensus::new(consensus, unreliable));

        // Wire up the TransactorService.
        let isolated_runtime = Arc::new(IsolatedRuntime::new_for_tests());
        let pubsub_sender = PubSubClientConnection::noop().sender;
        let shared_states = Arc::new(StateCache::new(
            &config,
            Arc::clone(&metrics),
            Arc::clone(&pubsub_sender),
        ));
        let client = PersistClient::new(
            config,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        )?;
        // It's an annoying refactor to add an oracle_uri cli flag, so for now,
        // piggy-back on --consensus_uri.
        let oracle_uri = args.consensus_uri.clone();
        let oracle_scheme = oracle_uri.as_ref().map(|x| (x.scheme(), x));
        let oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send> = match oracle_scheme {
            Some(("postgres", uri)) | Some(("postgresql", uri)) => {
                let cfg = PostgresTimestampOracleConfig::new(uri, &metrics_registry);
                Box::new(
                    PostgresTimestampOracle::open(
                        cfg,
                        "maelstrom".to_owned(),
                        mz_repr::Timestamp::minimum(),
                        NOW_ZERO.clone(),
                        false, /* read-only */
                    )
                    .await,
                )
            }
            Some(("mem", _)) => Box::new(MemTimestampOracle::default()),
            Some((scheme, _)) => unimplemented!("unsupported oracle type: {}", scheme),
            None => unimplemented!("TODO: support maelstrom oracle"),
        };
        let transactor = Transactor::new(client, shard_id, oracle).await?;
        let service = TransactorService(Arc::new(Mutex::new(transactor)));
        Ok(service)
    }

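    // Dispatches Maelstrom `txn` requests to the shared Transactor, replying
    // with either the resulting txn or a Maelstrom error body.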
    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
        match req {
            Body::ReqTxn { msg_id, txn } => {
                let in_reply_to = msg_id;
                match self.0.lock().await.transact(&txn).await {
                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
                        msg_id,
                        in_reply_to,
                        txn,
                    }),
                    Err(MaelstromError { code, text }) => {
                        handle.send_res(src, |msg_id| Body::Error {
                            msg_id: Some(msg_id),
                            in_reply_to,
                            code,
                            text,
                        })
                    }
                }
            }
            req => unimplemented!("unsupported req: {:?}", req),
        }
    }
}