persistcli/maelstrom/txn_list_append_multi.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the Maelstrom txn-list-append workload using the
//! multi-shard txn abstraction.
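//!
//! As a rough sketch of the moving pieces (the details live in the code
//! below): each Maelstrom key is hashed to its own persist data shard via
//! `Transactor::key_shard`, reads are served by a `DataSubscribeTask` stepped
//! past the oracle's read timestamp, and appends are committed through a
//! single `TxnsHandle` at a write timestamp handed out by the
//! `TimestampOracle`.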

use std::collections::btree_map::Entry;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use differential_dataflow::consolidation::consolidate_updates;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{NOW_ZERO, SYSTEM_TIME};
use mz_persist::cfg::{BlobConfig, ConsensusConfig};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
use mz_persist_client::async_runtime::IsolatedRuntime;
use mz_persist_client::cache::StateCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::metrics::Metrics as PersistMetrics;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
use mz_timestamp_oracle::TimestampOracle;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::operator::DataSubscribeTask;
use mz_txn_wal::txns::{Tidy, TxnsHandle};
use timely::progress::Timestamp;
use tokio::sync::Mutex;
use tracing::{debug, info};

use crate::maelstrom::Args;
use crate::maelstrom::api::{Body, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
use crate::maelstrom::node::{Handle, Service};
use crate::maelstrom::services::{
    CachingBlob, MaelstromBlob, MaelstromConsensus, MemTimestampOracle,
};

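/// A Maelstrom transactor that runs txn-list-append requests against txn-wal,
/// keeping one data shard per Maelstrom key and taking read and write
/// timestamps from a shared [TimestampOracle].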
#[derive(Debug)]
pub struct Transactor {
    txns_id: ShardId,
    oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    client: PersistClient,
    txns: TxnsHandle<String, (), u64, i64>,
    tidy: Tidy,
    data_reads: BTreeMap<ShardId, (u64, ReadHandle<String, (), u64, i64>)>,
    peeks: BTreeMap<ShardId, DataSubscribeTask>,
}

impl Transactor {
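    /// Opens a [TxnsHandle] on `txns_id` at a fresh write timestamp from
    /// `oracle`, then reports that timestamp back to the oracle as applied.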
    pub async fn new(
        client: PersistClient,
        txns_id: ShardId,
        oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    ) -> Result<Self, MaelstromError> {
        let init_ts = u64::from(oracle.write_ts().await.timestamp);
        let txns = TxnsHandle::open(
            init_ts,
            client.clone(),
            mz_txn_wal::all_dyncfgs(client.dyncfgs().clone()),
            Arc::new(TxnMetrics::new(&MetricsRegistry::new())),
            txns_id,
        )
        .await;
        oracle.apply_write(init_ts.into()).await;
        Ok(Transactor {
            txns_id,
            oracle,
            txns,
            tidy: Tidy::default(),
            client,
            data_reads: BTreeMap::default(),
            peeks: BTreeMap::default(),
        })
    }

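    /// Runs a single Maelstrom txn request: registers any data shards it
    /// touches, reads them at the oracle's read timestamp, and, if the request
    /// contains appends, commits them at an oracle write timestamp, retrying
    /// at the returned timestamp on conflict.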
    pub async fn transact(
        &mut self,
        req_ops: &[ReqTxnOp],
    ) -> Result<Vec<ResTxnOp>, MaelstromError> {
        let mut read_ids = Vec::new();
        let mut writes = BTreeMap::<ShardId, Vec<(String, i64)>>::new();
        for op in req_ops {
            match op {
                ReqTxnOp::Read { key } => {
                    read_ids.push(self.key_shard(*key));
                }
                ReqTxnOp::Append { key, val } => writes
                    .entry(self.key_shard(*key))
                    .or_default()
                    .push((val.to_string(), 1)),
            }
        }

        // First create and register any data shards as necessary.
        for data_id in writes.keys().chain(read_ids.iter()) {
            let _init_ts = self.ensure_registered(data_id).await;
        }

        // Run the core read+write, retry-at-a-higher-ts-on-conflict loop.
        let mut read_ts = u64::from(self.oracle.read_ts().await);
        info!("read ts {}", read_ts);
        self.peeks.clear();
        self.read_at(read_ts, read_ids.iter()).await;
        if writes.is_empty() {
            debug!("req committed at read_ts={}", read_ts);
        } else {
            let mut txn = self.txns.begin();
            for (data_id, writes) in writes {
                for (data, diff) in writes {
                    txn.write(&data_id, data, (), diff).await;
                }
            }
            let mut write_ts = u64::from(self.oracle.write_ts().await.timestamp);
            loop {
                // To be linearizable, we need to ensure that reads are done at
                // the timestamp previous to the write_ts. However, we're not
                // guaranteed that this is readable (someone could have consumed
                // the write_ts and then crashed), so we first have to do an
                // empty write at read_ts.
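                // For example (illustrative numbers only): with write_ts=7 we
                // (re)read at new_read_ts=6; if commit_at then loses the race
                // and returns Err(8), the next iteration re-reads at 7 and
                // retries the commit at 8.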
                let new_read_ts = write_ts.checked_sub(1).expect("write_ts should be > 0");
                info!("read ts {} write ts {}", new_read_ts, write_ts);
                if new_read_ts != read_ts {
                    self.unblock_and_read_at(new_read_ts, read_ids.iter()).await;
                    read_ts = new_read_ts;
                }

                txn.tidy(std::mem::take(&mut self.tidy));
                match txn.commit_at(&mut self.txns, write_ts).await {
                    Ok(maintenance) => {
                        self.oracle.apply_write(write_ts.into()).await;
                        // Aggressively allow the txns shard to compact. To
                        // exercise more edge cases, do it before we apply the
                        // newly committed txn.
                        self.txns.compact_to(write_ts).await;

                        debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts);
                        let tidy = maintenance.apply(&mut self.txns).await;
                        self.tidy.merge(tidy);
                        break;
                    }
                    Err(current) => {
                        write_ts = current;
                        // Have to redo our reads, but that's taken care of at
                        // the top of the loop.
                        continue;
                    }
                }
            }
        }

        // Normally, txns would have to be all reads followed by all writes. To
        // support any txn-list-append txns, this map is filled in with writes
        // from _this_ txn as we walk through the request, allowing us to append
        // them to reads.
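        // For example (hypothetical request): given the ops [append(k, 1),
        // read(k)], the read must observe the 1 appended earlier in the same
        // txn, even though that append committed at write_ts and the shard
        // was only read up through write_ts - 1; this map carries it across.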
        let mut this_txn_writes = BTreeMap::<_, Vec<_>>::new();

        let res = req_ops
            .iter()
            .map(|op| match op {
                ReqTxnOp::Read { key } => {
                    let key_shard = self.key_shard(*key);
                    let mut data = self
                        .peeks
                        .get(&key_shard)
                        .expect("key should have been read")
                        .output()
                        .iter()
                        // The DataSubscribe only guarantees that this output contains
                        // everything <= read_ts, but it might contain things after it,
                        // too. Filter them out.
                        .filter(|(_, t, _)| *t <= read_ts)
                        .map(|(k, t, d)| {
                            let k = k.parse().expect("valid u64");
                            (k, *t, *d)
                        })
                        .collect::<Vec<_>>();
                    let mut seen = BTreeSet::new();
                    let mut val = Vec::new();
                    consolidate_updates(&mut data);
                    // Sort things in commit (ts) order, then by key, then with
                    // insertions before retractions (so we can assert that
                    // retractions mean removal from the `seen` map).
                    data.sort_by_key(|(k, t, d)| (*t, *k, std::cmp::Reverse(*d)));
                    debug!(
                        "{} {:.9} read after sort {:?}",
                        key,
                        key_shard.to_string(),
                        data
                    );
                    for (x, _, d) in data {
                        if d == 1 {
                            assert!(seen.insert(x));
                            val.push(x);
                        } else if d == -1 {
                            assert!(seen.remove(&x));
                            val.retain(|y| *y != x);
                        } else {
                            panic!("unexpected diff: {}", d);
                        }
                    }
                    if let Some(this_writes) = this_txn_writes.get(key) {
                        val.extend(this_writes.iter().copied());
                    }
                    ResTxnOp::Read { key: *key, val }
                }
                ReqTxnOp::Append { key, val } => {
                    this_txn_writes.entry(key).or_default().push(val);
                    ResTxnOp::Append {
                        key: *key,
                        val: *val,
                    }
                }
            })
            .collect();
        Ok(res)
    }

    // Returns the minimum timestamp at which the data shard can be read.
    async fn ensure_registered(&mut self, data_id: &ShardId) -> Result<u64, ExternalError> {
        // Already registered.
        if let Some((init_ts, _)) = self.data_reads.get(data_id) {
            return Ok(*init_ts);
        }

        // Not registered yet: open the data shard's handles and register it
        // with the txns shard at a fresh write timestamp.
        let data_read = self
            .client
            .open_leased_reader(
                *data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("txn data"),
                true,
            )
            .await
            .expect("data schema shouldn't change");

        let mut init_ts = u64::from(self.oracle.write_ts().await.timestamp);
        loop {
            let data_write = self
                .client
                .open_writer(
                    *data_id,
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                    Diagnostics::from_purpose("txn data"),
                )
                .await
                .expect("data schema shouldn't change");
            let res = self.txns.register(init_ts, [data_write]).await;
            match res {
                Ok(_) => {
                    self.oracle.apply_write(init_ts.into()).await;
                    self.data_reads.insert(*data_id, (init_ts, data_read));
                    return Ok(init_ts);
                }
                Err(new_init_ts) => {
                    debug!(
                        "register {:.9} at {} mismatch current={}",
                        data_id, init_ts, new_init_ts
                    );
                    init_ts = u64::from(self.oracle.write_ts().await.timestamp);
                    continue;
                }
            }
        }
    }

    async fn read_at(&mut self, read_ts: u64, data_ids: impl Iterator<Item = &ShardId>) {
        // Ensure these reads don't block.
        let tidy = self.txns.apply_le(&read_ts).await;
        self.tidy.merge(tidy);

        // SUBTLE! Maelstrom txn-list-append requires that we be able to
        // reconstruct the order in which we appended list items. To avoid
        // needing to change the staged writes if our read_ts advances, we
        // instead do something overly clever and use the update timestamps. To
        // recover them, instead of grabbing a snapshot at the read_ts, we have
        // to start a subscription at time 0 and walk it forward until we pass
        // read_ts.
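        // For example (illustrative): if "1" was appended at ts 3 and "2" at
        // ts 5, the subscription's output contains ("1", 3, 1) and ("2", 5, 1),
        // and the sort-by-timestamp in `transact` recovers the append order.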
        for data_id in data_ids {
            let peek = match self.peeks.entry(*data_id) {
                Entry::Occupied(x) => x.into_mut(),
                Entry::Vacant(x) => {
                    let peek =
                        DataSubscribeTask::new(self.client.clone(), self.txns_id, *data_id, 0)
                            .await;
                    x.insert(peek)
                }
            };
            peek.step_past(read_ts).await;
        }
    }

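    /// Commits an empty txn at `read_ts` so that the timestamp is readable
    /// (see the linearizability comment in `transact`), then reads at it.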
    async fn unblock_and_read_at(
        &mut self,
        read_ts: u64,
        data_ids: impl Iterator<Item = &ShardId>,
    ) {
        debug!("unblock_and_read_at {}", read_ts);
        let mut txn = self.txns.begin();
        match txn.commit_at(&mut self.txns, read_ts).await {
            Ok(apply) => {
                self.tidy.merge(apply.apply(&mut self.txns).await);
            }
            // Already unblocked.
            Err(_) => {}
        }
        self.read_at(read_ts, data_ids).await
    }

    // Constructs a ShardId that is stable per key (so each maelstrom process
    // gets the same one) and per txns_id (so runs of maelstrom don't interfere
    // with each other).
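    // Note that the 64-bit hash fills only the first 8 bytes of the 16-byte
    // uuid; the remaining bytes stay zero, which is fine for a test-only id.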
    fn key_shard(&self, key: u64) -> ShardId {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        self.txns_id.hash(&mut h);
        let mut buf = [0u8; 16];
        buf[0..8].copy_from_slice(&h.finish().to_le_bytes());
        let shard_id = format!("s{}", uuid::Uuid::from_bytes(buf));
        shard_id.parse().expect("valid shard id")
    }
}

/// An adaptor to implement [Service] using [Transactor].
#[derive(Debug)]
pub struct TransactorService(pub Arc<Mutex<Transactor>>);

#[async_trait]
impl Service for TransactorService {
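    /// Constructs the persist client (Blob, Consensus, unreliability wrappers,
    /// caching) and the timestamp oracle from the Maelstrom [Args], then wraps
    /// a [Transactor] for use as a [Service].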
    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
        // Use the Maelstrom services to initialize a new random ShardId (so we
        // can repeatedly run tests against the same Blob and Consensus without
        // conflicting) and communicate it between processes.
        let shard_id = handle.maybe_init_shard_id().await?;

        // Make sure the seed is recomputed each time through the retry
        // closure, so we don't retry the same deterministic timeouts.
        let seed: u64 = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos()
            .into();
        // It doesn't particularly matter what we set should_happen to, so we do
        // this to have a convenient single tunable param.
        let should_happen = 1.0 - args.unreliability;
        // For consensus, set should_timeout to `args.unreliability` so that
        // once we split ExternalErrors into determinate vs indeterminate,
        // `args.unreliability` will also be the fraction of txns that it's
        // not safe for Maelstrom to retry (b/c of an indeterminate error in
        // Consensus CaS).
        let should_timeout = args.unreliability;
        // It doesn't particularly matter what we set should_happen and
        // should_timeout to for blobs, so use the same handle for both.
        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);

        let mut config =
            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
        let metrics_registry = MetricsRegistry::new();
        let metrics = Arc::new(PersistMetrics::new(&config, &metrics_registry));

        // Construct requested Blob.
        let blob = match &args.blob_uri {
            Some(blob_uri) => {
                let cfg = BlobConfig::try_from(
                    blob_uri,
                    Box::new(config.clone()),
                    metrics.s3_blob.clone(),
                    Arc::clone(&config.configs),
                )
                .await
                .expect("blob_uri should be valid");
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open blob, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromBlob::new(handle.clone()),
        };
        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
        // Normal production persist usage (even including a real SQL txn impl)
        // isn't particularly benefitted by a cache, so we don't have one baked
        // into persist. In contrast, our Maelstrom transaction model
        // intentionally exercises both a new snapshot and new listener on each
        // txn. As a result, without a cache, things would be terribly slow,
        // unreliable would cause more retries than are interesting, and the
        // Lamport diagrams that Maelstrom generates would be noisy.
        let blob = CachingBlob::new(blob);
        // To simplify some downstream logic (+ a bit more stress testing),
        // always downgrade the since of critical handles when asked.
        config.critical_downgrade_interval = Duration::from_secs(0);
        // Set a live diff scan limit such that we'll explore both the fast and
        // slow paths.
        config.set_state_versions_recent_live_diffs_limit(5);
        let consensus = match &args.consensus_uri {
            Some(consensus_uri) => {
                let cfg = ConsensusConfig::try_from(
                    consensus_uri,
                    Box::new(config.clone()),
                    metrics.postgres_consensus.clone(),
                )
                .expect("consensus_uri should be valid");
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open consensus, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromConsensus::new(handle.clone()),
        };
        let consensus: Arc<dyn Consensus> =
            Arc::new(UnreliableConsensus::new(consensus, unreliable));

        // Wire up the TransactorService.
        let isolated_runtime = Arc::new(IsolatedRuntime::default());
        let pubsub_sender = PubSubClientConnection::noop().sender;
        let shared_states = Arc::new(StateCache::new(
            &config,
            Arc::clone(&metrics),
            Arc::clone(&pubsub_sender),
        ));
        let client = PersistClient::new(
            config,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        )?;
        // It's an annoying refactor to add an oracle_uri cli flag, so for now,
        // piggy-back on --consensus_uri.
        let oracle_uri = args.consensus_uri.clone();
        let oracle_scheme = oracle_uri.as_ref().map(|x| (x.scheme(), x));
        let oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send> = match oracle_scheme {
            Some(("postgres", uri)) | Some(("postgresql", uri)) => {
                let cfg = PostgresTimestampOracleConfig::new(uri, &metrics_registry);
                Box::new(
                    PostgresTimestampOracle::open(
                        cfg,
                        "maelstrom".to_owned(),
                        mz_repr::Timestamp::minimum(),
                        NOW_ZERO.clone(),
                        false, /* read-only */
                    )
                    .await,
                )
            }
            Some(("mem", _)) => Box::new(MemTimestampOracle::default()),
            Some((scheme, _)) => unimplemented!("unsupported oracle type: {}", scheme),
            None => unimplemented!("TODO: support maelstrom oracle"),
        };
        let transactor = Transactor::new(client, shard_id, oracle).await?;
        let service = TransactorService(Arc::new(Mutex::new(transactor)));
        Ok(service)
    }

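    /// Handles a single Maelstrom request, replying with a `ResTxn` body on
    /// success or an `Error` body on failure.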
    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
        match req {
            Body::ReqTxn { msg_id, txn } => {
                let in_reply_to = msg_id;
                match self.0.lock().await.transact(&txn).await {
                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
                        msg_id,
                        in_reply_to,
                        txn,
                    }),
                    Err(MaelstromError { code, text }) => {
                        handle.send_res(src, |msg_id| Body::Error {
                            msg_id: Some(msg_id),
                            in_reply_to,
                            code,
                            text,
                        })
                    }
                }
            }
            req => unimplemented!("unsupported req: {:?}", req),
        }
    }
}