mz_txn_wal/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Atomic multi-shard [persist] writes.
//!
//! [persist]: mz_persist_client
//!
//! This crate presents an abstraction on top of persist shards, allowing
//! efficient atomic multi-shard writes. This is accomplished through an
//! additional _txn_ shard that coordinates writes to a (potentially large)
//! number of _data_ shards. Data shards may be added to and removed from the
//! set at any time.
//!
//! **WARNING!** While a data shard is registered to the txn set, writing to it
//! directly (i.e. using a [WriteHandle] instead of the [TxnsHandle]) will lead
//! to incorrectness, undefined behavior, and (potentially sticky) panics.
//!
//! [WriteHandle]: mz_persist_client::write::WriteHandle
//! [TxnsHandle]: crate::txns::TxnsHandle
//!
//! Benefits of these txns:
//! - _Key idea_: A transactional write costs in proportion to the total size of
//!   data written, and the number of data shards involved (plus one for the
//!   txns shard).
//! - _Key idea_: As time progresses, the upper of every data shard is logically
//!   (but not physically) advanced en masse with a single write to the txns
//!   shard. (Data writes may also be bundled into this, if desired.)
//! - Transactions are write-only, but read-then-write transactions can be built
//!   on top by using read and write timestamps selected to have no possible
//!   writes in between (e.g. `write_ts/commit_ts = read_ts + 1`).
//! - Transactions of any size are supported in bounded memory. This is done
//!   through the usual persist mechanism of spilling to s3. These spilled
//!   batches are efficiently re-timestamped when a commit must be retried at a
//!   higher timestamp.
//! - The data shards may be read independently of each other.
//! - The persist "maintenance" work assigned on behalf of the committed txn is
//!   (usually, see below) assigned to the txn committer.
//! - It is possible to implement any of snapshot, serializable, or
//!   strict-serializable isolation on top of this interface via write and read
//!   timestamp selections (see [#Isolation](#isolation) below for details).
//! - It is possible to serialize and communicate an uncommitted [Txn] between
//!   processes and also to merge uncommitted [Txn]s, if necessary (e.g.
//!   consolidating all monitoring collections, statement logging, etc into the
//!   periodic timestamp advancement). This is not initially implemented, but
//!   could be.
//!
//! [Txn]: crate::txn_write::Txn
//!
//! Restrictions:
//! - Data shards must all use the same codecs for `K, V, T, D`. However, each
//!   data shard may have independent `K` and `V` schemas. The txns shard
//!   inherits the `T` codec from the data shards (and uses its own `K, V, D`
//!   ones).
//! - All txn writes are linearized through the txns shard, so there is some
//!   limit to horizontal and geographical scale out.
//! - Performance has been tuned for _throughput_ and _un-contended latency_.
//!   Latency on contended workloads will likely be quite bad. At a high level,
//!   if N txns are run concurrently, 1 will commit and N-1 will have to
//!   (usually cheaply) retry. (However, note that it is also possible to
//!   combine and commit multiple txns at the same timestamp, as mentioned
//!   above, which gives us some amount of knobs for doing something different
//!   here.)
//!
//! # Intuition and Jargon
//!
//! - The _txns shard_ is the source of truth for what has (and has not)
//!   committed to a set of _data shards_.
//! - Each data shard must be _registered_ at some `register_ts` before being
//!   used in transactions. Registration is for bookkeeping only; there is no
//!   particular meaning to the timestamp other than it being a lower bound on
//!   when txns using this data shard can commit. Registration only needs to be
//!   run once-ever per data shard, but it is idempotent, so can also be run
//!   at-least-once.
//! - A txn is broken into three phases:
//!   - (Elided: A pre-txn phase where MZ might perform reads for
//!     read-then-write txns or might buffer writes.)
//!   - _commit_: The txn is committed by writing lightweight pointers to
//!     (potentially large) batches of data as updates in the txns shard with a
//!     timestamp of `commit_ts`. Feel free to think of this as a WAL. This
//!     makes the txn durable (thus "definite") and also advances the _logical
//!     upper_ of every data shard registered at a timestamp before commit_ts,
//!     including those not involved in the txn. However, at this point, it is
//!     not yet possible to read at the commit ts.
//!   - _apply_: We could serve reads of data shards from the information in the
//!     txns shard, but instead we choose to serve them from the physical data
//!     shard itself so that we may reuse existing persist infrastructure (e.g.
//!     multi-worker persist-source). This means we must take the batch pointers
//!     written to the txns shard and, in commit_ts order, "denormalize" them
//!     into each data shard with `compare_and_append`. We call this process
//!     applying the txn. Feel free to think of this as applying the WAL.
//!
//!     (Note that this means each data shard's _physical upper_ reflects the
//!     last committed txn touching that shard, and so the _logical upper_ may
//!     be greater than this. See [TxnsCache] for more details.)
//!   - _tidy_: After a committed txn has been applied, the updates for that txn
//!     are retracted from the txns shard. (To handle races, both application
//!     and retraction are written to be idempotent.) This prevents the txns
//!     shard from growing unboundedly and also means that, at any given time,
//!     the txns shard contains the set of txns that need to be applied (as well
//!     as the set of registered data shards).
//!
//! [TxnsCache]: crate::txn_cache::TxnsCache
//!
//! # Usage
//!
//! ```
//! # use std::sync::Arc;
//! # use mz_ore::metrics::MetricsRegistry;
//! # use mz_persist_client::{Diagnostics, PersistClient, ShardId};
//! # use mz_txn_wal::metrics::Metrics;
//! # use mz_txn_wal::operator::DataSubscribe;
//! # use mz_txn_wal::txns::TxnsHandle;
//! # use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
//! # use timely::progress::Antichain;
//! #
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let client = PersistClient::new_for_tests().await;
//! # let dyncfgs = mz_txn_wal::all_dyncfgs(client.dyncfgs().clone());
//! # let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
//! # mz_ore::test::init_logging();
//! // Open a txn shard, initializing it if necessary.
//! let txns_id = ShardId::new();
//! let mut txns = TxnsHandle::<String, (), u64, i64>::open(
//!     0u64, client.clone(), dyncfgs, metrics, txns_id
//! ).await;
//!
//! // Register data shards to the txn set.
//! let (d0, d1) = (ShardId::new(), ShardId::new());
//! # let d0_write = client.open_writer(
//! #    d0, StringSchema.into(), UnitSchema.into(), Diagnostics::for_tests()
//! # ).await.unwrap();
//! # let d1_write = client.open_writer(
//! #    d1, StringSchema.into(), UnitSchema.into(), Diagnostics::for_tests()
//! # ).await.unwrap();
//! txns.register(1u64, [d0_write]).await.expect("not previously initialized");
//! txns.register(2u64, [d1_write]).await.expect("not previously initialized");
//!
//! // Commit a txn. This is durable if/when the `commit_at` succeeds, but reads
//! // at the commit ts will _block_ until after the txn is applied. Users are
//! // free to pass up the commit ack (e.g. to pgwire) to get a bit of latency
//! // back. NB: It is expected that the txn committer will run the apply step,
//! // but in the event of a crash, neither correctness nor liveness depend on
//! // it.
//! let mut txn = txns.begin();
//! txn.write(&d0, "0".into(), (), 1);
//! txn.write(&d1, "1".into(), (), -1);
//! let tidy = txn.commit_at(&mut txns, 3).await.expect("ts 3 available")
//!     // Make it available to reads by applying it.
//!     .apply(&mut txns).await;
//!
//! // Commit a contended txn at a higher timestamp. Note that the upper of `d1`
//! // is also advanced by this. At the same time clean up after our last commit
//! // (the tidy).
//! let mut txn = txns.begin();
//! txn.write(&d0, "2".into(), (), 1);
//! txn.tidy(tidy);
//! txn.commit_at(&mut txns, 3).await.expect_err("ts 3 not available");
//! let _tidy = txn.commit_at(&mut txns, 4).await.expect("ts 4 available")
//!     .apply(&mut txns).await;
//!
//! // Read data shard(s) at some `read_ts`.
//! let mut subscribe = DataSubscribe::new("example", client, txns_id, d1, 4, Antichain::new(), true);
//! while subscribe.progress() <= 4 {
//!     subscribe.step();
//! #   tokio::task::yield_now().await;
//! }
//! let updates = subscribe.output();
//! # })
//! ```
//!
//! # Isolation
//!
//! This section is about "read-then-write" txns where all reads are performed
//! before any writes (read-only and write-only are trivial specializations of
//! this). All reads are performed at some `read_ts` and then all writes are
//! performed at `write_ts` (aka the `commit_ts`).
//!
//! - To implement snapshot isolation using the above, select any `read_ts <
//!   write_ts`. The `write_ts` can advance as necessary when retrying on
//!   conflicts.
//! - To implement serializable isolation using the above, select `write_ts =
//!   read_ts + 1`. If the `write_ts` must be pushed as a result of a conflict,
//!   then the `read_ts` must be similarly advanced. Note that if you happen to
//!   have a system for efficiently computing changes to data as inputs change
//!   (hmmm), it may be better to reason about `(read_ts, new_read_ts]` than to
//!   recompute the reads from scratch. (See the retry sketch below.)
//! - To implement strict serializable (serializable + linearizable) isolation,
//!   do the same as serializable, but with the additional constraints on
//!   write_ts required by linearizability (handwave).
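//!
//! For the serializable case, a commit retry loop might look like the
//! following sketch (not run). It reuses the handles from the Usage example
//! above and assumes that the `Err` returned by a failed `commit_at` carries
//! the minimum timestamp at which a retry could succeed:
//!
//! ```ignore
//! let mut read_ts = 5u64;
//! let mut txn = txns.begin();
//! // ... reads happen at read_ts; writes are buffered in the txn ...
//! txn.write(&d0, "k".into(), (), 1);
//! let _tidy = loop {
//!     match txn.commit_at(&mut txns, read_ts + 1).await {
//!         // Committed at read_ts + 1; apply it to make it readable.
//!         Ok(apply) => break apply.apply(&mut txns).await,
//!         // Conflict: advance read_ts, re-validate (or re-run) the reads over
//!         // `(read_ts, new_read_ts]`, and retry the commit.
//!         Err(min_commit_ts) => read_ts = min_commit_ts - 1,
//!     }
//! };
//! ```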
//!
//! # Implementation
//!
//! For details of the implementation of writes, see [TxnsHandle].
//!
//! For details of the implementation of reads, see [TxnsCache].

#![warn(missing_docs, missing_debug_implementations)]

use std::fmt::Debug;
use std::fmt::Write;

use differential_dataflow::Hashable;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_dyncfg::ConfigSet;
use mz_ore::instrument;
use mz_persist_client::ShardId;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::{ShardIdSchema, VecU8Schema};
use mz_persist_types::stats::PartStats;
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{debug, error};

use crate::proto::ProtoIdBatch;
use crate::txns::DataWriteApply;

pub mod metrics;
pub mod operator;
pub mod txn_cache;
pub mod txn_read;
pub mod txn_write;
pub mod txns;

mod proto {
    use bytes::Bytes;
    use mz_persist_client::batch::ProtoBatch;
    use prost::Message;
    use uuid::Uuid;

    include!(concat!(env!("OUT_DIR"), "/mz_txn_wal.proto.rs"));

    impl ProtoIdBatch {
        pub(crate) fn new(batch: ProtoBatch) -> ProtoIdBatch {
            ProtoIdBatch {
                batch_id: Bytes::copy_from_slice(Uuid::new_v4().as_bytes()),
                batch: Some(batch),
            }
        }

        /// Recovers the ProtoBatch from an encoded batch.
        ///
        /// This might be an encoded ProtoIdBatch (new path) or a ProtoBatch
        /// (legacy path). Some proto shenanigans are done to sniff out which.
        pub(crate) fn parse(buf: &[u8]) -> ProtoBatch {
            let b = ProtoIdBatch::decode(buf).expect("valid ProtoIdBatch");
            // First try the new format.
            if let Some(batch) = b.batch {
                return batch;
            }
            // Fall back to the legacy format.
            ProtoBatch::decode(buf).expect("valid (legacy) ProtoBatch")
        }
    }
}

/// Adds the full set of all txn-wal `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
    configs
        .add(&crate::operator::DATA_SHARD_RETRYER_CLAMP)
        .add(&crate::operator::DATA_SHARD_RETRYER_INITIAL_BACKOFF)
        .add(&crate::operator::DATA_SHARD_RETRYER_MULTIPLIER)
        .add(&crate::txns::APPLY_ENSURE_SCHEMA_MATCH)
}

/// A reasonable default implementation of [TxnsCodec].
///
/// This uses the "native" Codecs for `ShardId` and `Vec<u8>`, with the latter
/// empty for [TxnsEntry::Register] and non-empty for [TxnsEntry::Append].
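///
/// A minimal sketch of the resulting layout (not run), assuming the timestamp
/// field of [TxnsEntry] is its 8-byte [Codec64] encoding as in `encode` and
/// `decode` below:
///
/// ```ignore
/// use mz_persist_client::ShardId;
/// use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
///
/// let data_id = ShardId::new();
/// // Register: the value is exactly the 8 encoded timestamp bytes.
/// let (_key, val) = TxnsCodecDefault::encode(TxnsEntry::Register(data_id, [0u8; 8]));
/// assert_eq!(val.len(), 8);
/// // Append: the value is the batch bytes with the timestamp appended at the
/// // end, so `decode` can truncate it back off.
/// let (key, val) = TxnsCodecDefault::encode(TxnsEntry::Append(data_id, [0u8; 8], vec![42]));
/// assert_eq!(val.len(), 9);
/// match TxnsCodecDefault::decode(key, val) {
///     TxnsEntry::Append(d, ts, batch) => {
///         assert_eq!((d, ts, batch), (data_id, [0u8; 8], vec![42]));
///     }
///     _ => unreachable!("appends decode as appends"),
/// }
/// ```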
#[derive(Debug)]
pub struct TxnsCodecDefault;

impl TxnsCodec for TxnsCodecDefault {
    type Key = ShardId;
    type Val = Vec<u8>;
    fn schemas() -> (<Self::Key as Codec>::Schema, <Self::Val as Codec>::Schema) {
        (ShardIdSchema, VecU8Schema)
    }
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        match e {
            TxnsEntry::Register(data_id, ts) => (data_id, ts.to_vec()),
            TxnsEntry::Append(data_id, ts, batch) => {
                // Put the ts at the end to let decode truncate it off.
                (data_id, batch.into_iter().chain(ts).collect())
            }
        }
    }
    fn decode(key: Self::Key, mut val: Self::Val) -> TxnsEntry {
        let mut ts = [0u8; 8];
        let ts_idx = val.len().checked_sub(8).expect("ts encoded at end of val");
        ts.copy_from_slice(&val[ts_idx..]);
        val.truncate(ts_idx);
        if val.is_empty() {
            TxnsEntry::Register(key, ts)
        } else {
            TxnsEntry::Append(key, ts, val)
        }
    }
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        let stats = stats
            .key
            .col("")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type: {}", err))
            .ok()?;
        let data_id_str = data_id.to_string();
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}

/// Helper for common logging for compare_and_append-ing a small amount of data.
#[instrument(level = "debug", fields(shard=%txns_or_data_write.shard_id(), ts=?new_upper))]
pub(crate) async fn small_caa<S, F, K, V, T, D>(
    name: F,
    txns_or_data_write: &mut WriteHandle<K, V, T, D>,
    updates: &[((&K, &V), &T, D)],
    upper: T,
    new_upper: T,
) -> Result<(), T>
where
    S: AsRef<str>,
    F: Fn() -> S,
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + Codec64 + Sync,
    D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
{
    fn debug_sep<'a, T: Debug + 'a>(sep: &str, xs: impl IntoIterator<Item = &'a T>) -> String {
        xs.into_iter().fold(String::new(), |mut output, x| {
            let _ = write!(output, "{}{:?}", sep, x);
            output
        })
    }
    debug!(
        "CaA {} [{:?},{:?}){}",
        name().as_ref(),
        upper,
        new_upper,
        // This is a "small" CaA so we can inline the data in this debug log.
        debug_sep("\n  ", updates)
    );
    let res = txns_or_data_write
        .compare_and_append(
            updates,
            Antichain::from_elem(upper.clone()),
            Antichain::from_elem(new_upper.clone()),
        )
        .await
        .expect("usage was valid");
    match res {
        Ok(()) => {
            debug!(
                "CaA {} [{:?},{:?}) success",
                name().as_ref(),
                upper,
                new_upper
            );
            Ok(())
        }
        Err(UpperMismatch { current, .. }) => {
            let current = current
                .into_option()
                .expect("txns shard should not be closed");
            debug!(
                "CaA {} [{:?},{:?}) mismatch actual={:?}",
                name().as_ref(),
                upper,
                new_upper,
                current,
            );
            Err(current)
        }
    }
}

/// Ensures that the upper of the shard is past init_ts by writing an empty
/// batch, retrying as necessary.
///
/// This method is idempotent.
pub(crate) async fn empty_caa<S, F, K, V, T, D>(
    name: F,
    txns_or_data_write: &mut WriteHandle<K, V, T, D>,
    init_ts: T,
) where
    S: AsRef<str>,
    F: Fn() -> S,
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
{
    let name = name();
    let empty: &[((&K, &V), &T, D)] = &[];
    let Some(mut upper) = txns_or_data_write.shared_upper().into_option() else {
        // Shard is closed, which means the upper must be past init_ts.
        return;
    };
    loop {
        if init_ts < upper {
            return;
        }
        let res = small_caa(
            || name.as_ref(),
            txns_or_data_write,
            empty,
            upper,
            init_ts.step_forward(),
        )
        .await;
        match res {
            Ok(()) => return,
            Err(current) => {
                upper = current;
            }
        }
    }
}

/// Ensures that a committed batch has been applied into a physical data shard,
/// making it available for reads.
///
/// This process is definite work on top of definite input, so the
/// implementation assumes that if the upper of the shard passes commit_ts then
/// the work must have already been done by someone else. (Think how our compute
/// replicas race to compute some MATERIALIZED VIEW, but they're all guaranteed
/// to get the same answer.)
#[instrument(level = "debug", fields(shard=%data_write.shard_id(), ts=?commit_ts))]
async fn apply_caa<K, V, T, D>(
    data_write: &mut DataWriteApply<K, V, T, D>,
    batch_raws: &Vec<&[u8]>,
    commit_ts: T,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let mut batches = batch_raws
        .into_iter()
        .map(|batch| ProtoIdBatch::parse(batch))
        .map(|batch| data_write.batch_from_transmittable_batch(batch))
        .collect::<Vec<_>>();
    let Some(mut upper) = data_write.shared_upper().into_option() else {
        // Shard is closed, which means the upper must be past commit_ts.
        // Mark the batches as consumed, so we don't get warnings in the logs.
        for batch in batches {
            batch.into_hollow_batch();
        }
        return;
    };
    loop {
        if commit_ts < upper {
            debug!(
                "CaA data {:.9} apply t={:?} already done",
                data_write.shard_id().to_string(),
                commit_ts
            );
            // Mark the batches as consumed, so we don't get warnings in the logs.
            for batch in batches {
                batch.into_hollow_batch();
            }
            return;
        }

        // Make sure we're using the same schema to CaA these batches as what
        // they were written with.
        data_write.maybe_replace_with_batch_schema(&batches).await;

        debug!(
            "CaA data {:.9} apply b={:?} t={:?} [{:?},{:?})",
            data_write.shard_id().to_string(),
            batch_raws
                .iter()
                .map(|batch_raw| batch_raw.hashed())
                .collect::<Vec<_>>(),
            commit_ts,
            upper,
            commit_ts.step_forward(),
        );
        let mut batches = batches.iter_mut().collect::<Vec<_>>();
        let res = data_write
            .compare_and_append_batch(
                batches.as_mut_slice(),
                Antichain::from_elem(upper.clone()),
                Antichain::from_elem(commit_ts.step_forward()),
                true,
            )
            .await
            .expect("usage was valid");
        match res {
            Ok(()) => {
                debug!(
                    "CaA data {:.9} apply t={:?} [{:?},{:?}) success",
                    data_write.shard_id().to_string(),
                    commit_ts,
                    upper,
                    commit_ts.step_forward(),
                );
                return;
            }
            Err(UpperMismatch { current, .. }) => {
                let current = current.into_option().expect("data should not be closed");
                debug!(
                    "CaA data {:.9} apply t={:?} [{:?},{:?}) mismatch actual={:?}",
                    data_write.shard_id().to_string(),
                    commit_ts,
                    upper,
                    commit_ts.step_forward(),
                    current,
                );
                upper = current;
                continue;
            }
        }
    }
}

#[instrument(level = "debug", fields(shard=%txns_since.shard_id(), ts=?new_since_ts))]
pub(crate) async fn cads<T, O, C>(
    txns_since: &mut SinceHandle<C::Key, C::Val, T, i64, O>,
    new_since_ts: T,
) where
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    O: Opaque + Debug + Codec64,
    C: TxnsCodec,
{
    // Fast-path, don't bother trying to CaDS if we're already past that
    // since.
    if !txns_since.since().less_than(&new_since_ts) {
        return;
    }
    let token = txns_since.opaque().clone();
    let res = txns_since
        .compare_and_downgrade_since(&token, (&token, &Antichain::from_elem(new_since_ts)))
        .await;
    match res {
        Ok(_) => {}
        Err(actual) => {
            mz_ore::halt!("fenced by another process @ {actual:?}. ours = {token:?}")
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;
    use std::sync::Mutex;

    use crossbeam_channel::{Receiver, Sender, TryRecvError};
    use differential_dataflow::consolidation::consolidate_updates;
    use mz_persist_client::read::ReadHandle;
    use mz_persist_client::{Diagnostics, PersistClient, ShardId};
    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
    use prost::Message;

    use crate::operator::DataSubscribe;
    use crate::txn_cache::TxnsCache;
    use crate::txn_write::{Txn, TxnApply};
    use crate::txns::{Tidy, TxnsHandle};

    use super::*;

    impl<K, V, T, D, O, C> TxnsHandle<K, V, T, D, O, C>
    where
        K: Debug + Codec + Clone,
        V: Debug + Codec + Clone,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Semigroup + Ord + Codec64 + Send + Sync + Clone,
        O: Opaque + Debug + Codec64,
        C: TxnsCodec,
    {
        /// Returns a new, empty test transaction that can involve the data shards
        /// registered with this handle.
        pub(crate) fn begin_test(&self) -> TestTxn<K, V, T, D> {
            TestTxn::new()
        }
    }

    /// A [`Txn`] wrapper that exposes extra functionality for tests.
    #[derive(Debug)]
    pub struct TestTxn<K, V, T, D> {
        txn: Txn<K, V, T, D>,
        /// A copy of every write to use in tests.
        writes: BTreeMap<ShardId, Vec<(K, V, D)>>,
    }

    impl<K, V, T, D> TestTxn<K, V, T, D>
    where
        K: Debug + Codec + Clone,
        V: Debug + Codec + Clone,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Semigroup + Ord + Codec64 + Send + Sync + Clone,
    {
        pub(crate) fn new() -> Self {
            Self {
                txn: Txn::new(),
                writes: BTreeMap::default(),
            }
        }

        pub(crate) async fn write(&mut self, data_id: &ShardId, key: K, val: V, diff: D) {
            self.writes
                .entry(*data_id)
                .or_default()
                .push((key.clone(), val.clone(), diff.clone()));
            self.txn.write(data_id, key, val, diff).await
        }

        pub(crate) async fn commit_at<O, C>(
            &mut self,
            handle: &mut TxnsHandle<K, V, T, D, O, C>,
            commit_ts: T,
        ) -> Result<TxnApply<T>, T>
        where
            O: Opaque + Debug + Codec64,
            C: TxnsCodec,
        {
            self.txn.commit_at(handle, commit_ts).await
        }

        pub(crate) fn merge(&mut self, other: Self) {
            for (data_id, writes) in other.writes {
                self.writes.entry(data_id).or_default().extend(writes);
            }
            self.txn.merge(other.txn)
        }

        pub(crate) fn tidy(&mut self, tidy: Tidy) {
            self.txn.tidy(tidy)
        }

        #[allow(dead_code)]
        fn take_tidy(&mut self) -> Tidy {
            self.txn.take_tidy()
        }
    }

    /// A test helper for collecting committed writes and later comparing them
    /// to reads for correctness.
    #[derive(Debug, Clone)]
    pub struct CommitLog {
        client: PersistClient,
        txns_id: ShardId,
        writes: Arc<Mutex<Vec<(ShardId, String, u64, i64)>>>,
        tx: Sender<(ShardId, String, u64, i64)>,
        rx: Receiver<(ShardId, String, u64, i64)>,
    }

    impl CommitLog {
        pub fn new(client: PersistClient, txns_id: ShardId) -> Self {
            let (tx, rx) = crossbeam_channel::unbounded();
            CommitLog {
                client,
                txns_id,
                writes: Arc::new(Mutex::new(Vec::new())),
                tx,
                rx,
            }
        }

        pub fn record(&self, update: (ShardId, String, u64, i64)) {
            let () = self.tx.send(update).unwrap();
        }

        pub fn record_txn(&self, commit_ts: u64, txn: &TestTxn<String, (), u64, i64>) {
            for (data_id, writes) in txn.writes.iter() {
                for (k, (), d) in writes.iter() {
                    self.record((*data_id, k.clone(), commit_ts, *d));
                }
            }
        }

        #[track_caller]
        pub fn assert_eq(
            &self,
            data_id: ShardId,
            as_of: u64,
            until: u64,
            actual: impl IntoIterator<Item = (String, u64, i64)>,
        ) {
            // First read everything off the channel.
            let mut expected = {
                let mut writes = self.writes.lock().unwrap();
                loop {
                    match self.rx.try_recv() {
                        Ok(x) => writes.push(x),
                        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
                    }
                }
                writes
                    .iter()
                    .flat_map(|(id, key, ts, diff)| {
                        if id != &data_id {
                            return None;
                        }
                        let mut ts = *ts;
                        if ts < as_of {
                            ts = as_of;
                        }
                        if until <= ts {
                            None
                        } else {
                            Some((key.clone(), ts, *diff))
                        }
                    })
                    .collect()
            };
            consolidate_updates(&mut expected);
            let mut actual = actual.into_iter().filter(|(_, t, _)| t < &until).collect();
            consolidate_updates(&mut actual);
            // NB: Extra spaces after actual are so it lines up with expected.
            tracing::debug!(
                "{:.9} as_of={} until={} actual  ={:?}",
                data_id,
                as_of,
                until,
                actual
            );
            tracing::debug!(
                "{:.9} as_of={} until={} expected={:?}",
                data_id,
                as_of,
                until,
                expected
            );
            assert_eq!(actual, expected)
        }

        #[allow(ungated_async_fn_track_caller)]
        #[track_caller]
        pub async fn assert_snapshot(&self, data_id: ShardId, as_of: u64) {
            let mut cache: TxnsCache<u64, TxnsCodecDefault> =
                TxnsCache::open(&self.client, self.txns_id, Some(data_id)).await;
            let _ = cache.update_gt(&as_of).await;
            let snapshot = cache.data_snapshot(data_id, as_of);
            let mut data_read = self
                .client
                .open_leased_reader(
                    data_id,
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                    Diagnostics::from_purpose("assert snapshot"),
                    true,
                )
                .await
                .expect("reader creation shouldn't panic");
            let snapshot = snapshot
                .snapshot_and_fetch(&mut data_read)
                .await
                .expect("snapshot shouldn't panic");
            data_read.expire().await;
            let snapshot: Vec<_> = snapshot
                .into_iter()
                .map(|((k, v), t, d)| {
                    let (k, ()) = (k.unwrap(), v.unwrap());
                    (k, t, d)
                })
                .collect();

            // Check that a subscribe would produce the same result.
            let subscribe = self.subscribe(data_id, as_of, as_of + 1).await;
            assert_eq!(
                snapshot.iter().collect::<BTreeSet<_>>(),
                subscribe.output().into_iter().collect::<BTreeSet<_>>()
            );

            // Check that the result is correct.
            self.assert_eq(data_id, as_of, as_of + 1, snapshot);
        }

        #[allow(ungated_async_fn_track_caller)]
        #[track_caller]
        pub async fn assert_subscribe(&self, data_id: ShardId, as_of: u64, until: u64) {
            let data_subscribe = self.subscribe(data_id, as_of, until).await;
            self.assert_eq(data_id, as_of, until, data_subscribe.output().clone());
        }

        #[allow(ungated_async_fn_track_caller)]
        #[track_caller]
        pub async fn subscribe(&self, data_id: ShardId, as_of: u64, until: u64) -> DataSubscribe {
            let mut data_subscribe = DataSubscribe::new(
                "test",
                self.client.clone(),
                self.txns_id,
                data_id,
                as_of,
                Antichain::new(),
                true,
            );
            data_subscribe.step_past(until - 1).await;
            data_subscribe
        }
    }

    pub(crate) async fn writer(
        client: &PersistClient,
        data_id: ShardId,
    ) -> WriteHandle<String, (), u64, i64> {
        client
            .open_writer(
                data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should not change")
    }

    pub(crate) async fn reader(
        client: &PersistClient,
        data_id: ShardId,
    ) -> ReadHandle<String, (), u64, i64> {
        client
            .open_leased_reader(
                data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codecs should not change")
    }

    pub(crate) async fn write_directly(
        ts: u64,
        data_write: &mut WriteHandle<String, (), u64, i64>,
        keys: &[&str],
        log: &CommitLog,
    ) {
        let data_id = data_write.shard_id();
        let keys = keys.iter().map(|x| (*x).to_owned()).collect::<Vec<_>>();
        let updates = keys.iter().map(|k| ((k, &()), &ts, 1)).collect::<Vec<_>>();
        let mut current = data_write.shared_upper().into_option().unwrap();
        loop {
            let res = crate::small_caa(
                || format!("data {:.9} directly", data_id),
                data_write,
                &updates,
                current,
                ts + 1,
            )
            .await;
            match res {
                Ok(()) => {
                    for ((k, ()), t, d) in updates {
                        log.record((data_id, k.to_owned(), *t, d));
                    }
                    return;
                }
                Err(new_current) => current = new_current,
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn commit_log() {
        let (d0, d1) = (ShardId::new(), ShardId::new());
        let log0 = CommitLog::new(PersistClient::new_for_tests().await, ShardId::new());

        // Send before cloning into another handle.
        log0.record((d0, "0".into(), 0, 1));

        // Send after cloning into another handle. Also push duplicate (which
        // gets consolidated).
        let log1 = log0.clone();
        log0.record((d0, "2".into(), 2, 1));
        log1.record((d0, "2".into(), 2, 1));

        // Send retraction.
        log0.record((d0, "3".into(), 3, 1));
        log1.record((d0, "3".into(), 4, -1));

        // Send out of order.
        log0.record((d0, "1".into(), 1, 1));

        // Send to a different shard.
        log1.record((d1, "5".into(), 5, 1));

        // Assert_eq with no advancement or truncation.
        log0.assert_eq(
            d0,
            0,
            6,
            vec![
                ("0".into(), 0, 1),
                ("1".into(), 1, 1),
                ("2".into(), 2, 2),
                ("3".into(), 3, 1),
                ("3".into(), 4, -1),
            ],
        );
        log0.assert_eq(d1, 0, 6, vec![("5".into(), 5, 1)]);

        // Assert_eq with advancement.
        log0.assert_eq(
            d0,
            4,
            6,
            vec![("0".into(), 4, 1), ("1".into(), 4, 1), ("2".into(), 4, 2)],
        );

        // Assert_eq with truncation.
        log0.assert_eq(
            d0,
            0,
            3,
            vec![("0".into(), 0, 1), ("1".into(), 1, 1), ("2".into(), 2, 2)],
        );
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn unique_batch_serialization() {
        let client = PersistClient::new_for_tests().await;
        let mut write = writer(&client, ShardId::new()).await;
        let data = [(("foo".to_owned(), ()), 0, 1)];
        let batch = write
            .batch(&data, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .unwrap();

        // Pretend we somehow got two batches that happen to have the same
        // serialization.
        let b0_raw = batch.into_transmittable_batch();
        let b1_raw = b0_raw.clone();
        assert_eq!(b0_raw.encode_to_vec(), b1_raw.encode_to_vec());

        // They don't if we wrap them in ProtoIdBatch.
        let b0 = ProtoIdBatch::new(b0_raw.clone());
        let b1 = ProtoIdBatch::new(b1_raw);
        assert!(b0.encode_to_vec() != b1.encode_to_vec());

        // The transmittable batch roundtrips.
        let roundtrip = ProtoIdBatch::parse(&b0.encode_to_vec());
        assert_eq!(roundtrip, b0_raw);

        // We've started running things in all of staging, so we've got to be
        // able to read the previous serialization (ProtoBatch directly) back.
        let roundtrip = ProtoIdBatch::parse(&b0_raw.encode_to_vec());
        assert_eq!(roundtrip, b0_raw);
    }
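
    // A minimal additional test sketching the idempotence documented on
    // `empty_caa` above. It assumes only what that function's code implies: a
    // fresh shard's upper starts at the minimum timestamp, the first call
    // advances it past `init_ts` with an empty batch, and a second call with
    // the same `init_ts` is a no-op.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // uses a persist client; too slow under miri
    async fn empty_caa_idempotent() {
        let client = PersistClient::new_for_tests().await;
        let mut write = writer(&client, ShardId::new()).await;
        // First call writes an empty batch to advance the upper past init_ts=5.
        empty_caa(|| "test", &mut write, 5).await;
        let upper = write.shared_upper().into_option().expect("shard not closed");
        assert!(upper > 5);
        // Second call sees the upper is already past init_ts and returns early,
        // leaving the upper unchanged.
        empty_caa(|| "test", &mut write, 5).await;
        assert_eq!(
            write.shared_upper().into_option().expect("shard not closed"),
            upper
        );
    }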
955    }
956}