Crate mz_txn_wal


Atomic multi-shard persist writes.

This crate presents an abstraction on top of persist shards, allowing efficient atomic multi-shard writes. This is accomplished through an additional txns shard that coordinates writes to a (potentially large) number of data shards. Data shards may be added to and removed from the set at any time.

WARNING! While a data shard is registered to the txn set, writing to it directly (i.e. using a WriteHandle instead of the TxnsHandle) will lead to incorrectness, undefined behavior, and (potentially sticky) panics.

Benefits of these txns:

  • Key idea: A transactional write costs in proportion to the total size of data written, and the number of data shards involved (plus one for the txns shard).
  • Key idea: As time progresses, the upper of every data shard is logically (but not physically) advanced en masse with a single write to the txns shard. (Data writes may also be bundled into this, if desired.)
  • Transactions are write-only, but read-then-write transactions can be built on top by using read and write timestamps selected to have no possible writes in between (e.g. write_ts/commit_ts = read_ts + 1).
  • Transactions of any size are supported in bounded memory. This is done through the usual persist mechanism of spilling to s3. These spilled batches are efficiently re-timestamped when a commit must be retried at a higher timestamp.
  • The data shards may be read independently of each other.
  • The persist “maintenance” work assigned on behalf of the committed txn is (usually, see below) assigned to the txn committer.
  • It is possible to implement any of snapshot, serializable, or strict-serializable isolation on top of this interface via write and read timestamp selections (see #Isolation below for details).
  • It is possible to serialize and communicate an uncommitted Txn between processes and also to merge uncommitted Txns, if necessary (e.g. consolidating all monitoring collections, statement logging, etc into the periodic timestamp advancement). This is not initially implemented, but could be.
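The re-timestamping mentioned above can be sketched as follows. This is a hedged, self-contained model (the real crate rewrites timestamps on persist batch metadata, not on in-memory vectors; `retimestamp` is a hypothetical helper, not a crate API):

```rust
// When a commit at some ts loses the race and must retry at a higher
// new_ts, the already-spilled batch data is kept; only the timestamps
// on its updates are rewritten.
fn retimestamp(updates: Vec<(String, u64, i64)>, new_ts: u64) -> Vec<(String, u64, i64)> {
    updates
        .into_iter()
        .map(|(k, _old_ts, d)| (k, new_ts, d))
        .collect()
}

fn main() {
    // A batch written speculatively at ts 3...
    let batch = vec![("key".to_string(), 3u64, 1i64)];
    // ...is re-timestamped to ts 5 when the commit retries.
    assert_eq!(retimestamp(batch, 5), vec![("key".to_string(), 5, 1)]);
}
```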

Restrictions:
  • Data shards must all use the same codecs for K, V, T, D. However, each data shard may have independent K and V schemas. The txns shard inherits the T codec from the data shards (and uses its own K, V, D ones).
  • All txn writes are linearized through the txns shard, so there is some limit to horizontal and geographical scale out.
  • Performance has been tuned for throughput and un-contended latency. Latency on contended workloads will likely be quite bad. At a high level, if N txns are run concurrently, 1 will commit and N-1 will have to (usually cheaply) retry. (However, note that it is also possible to combine and commit multiple txns at the same timestamp, as mentioned above, which gives us some amount of knobs for doing something different here.)
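The contention behavior above can be modeled with a compare-and-exchange on a single counter. This is a toy sketch (hypothetical `TxnsShardUpper` type; the real crate races on the txns shard upper via persist's compare_and_append, not an AtomicU64):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Models the single txns shard upper that every commit must advance.
struct TxnsShardUpper(AtomicU64);

impl TxnsShardUpper {
    /// Try to commit at `commit_ts`, advancing the upper to
    /// `commit_ts + 1`. On failure, returns the current upper so the
    /// caller can retry at (or above) it.
    fn commit_at(&self, commit_ts: u64) -> Result<(), u64> {
        self.0
            .compare_exchange(commit_ts, commit_ts + 1, Ordering::SeqCst, Ordering::SeqCst)
            .map(|_| ())
    }
}

fn main() {
    let upper = TxnsShardUpper(AtomicU64::new(3));
    // Two txns race to commit at ts 3: the first wins...
    assert_eq!(upper.commit_at(3), Ok(()));
    // ...the loser learns the new upper and retries there. The retry is
    // usually cheap because spilled batches only need re-timestamping.
    let new_ts = upper.commit_at(3).unwrap_err();
    assert_eq!(upper.commit_at(new_ts), Ok(()));
}
```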

§Intuition and Jargon

  • The txns shard is the source of truth for what has (and has not) committed to a set of data shards.
  • Each data shard must be registered at some register_ts before being used in transactions. Registration is for bookkeeping only; there is no particular meaning to the timestamp other than it being a lower bound on when txns using this data shard can commit. Registration only needs to be run once-ever per data shard, but it is idempotent, so it can also be run at-least-once.
  • A txn is broken into three phases:
    • (Elided: A pre-txn phase where MZ might perform reads for read-then-write txns or might buffer writes.)

    • commit: The txn is committed by writing lightweight pointers to (potentially large) batches of data as updates in txns_shard with a timestamp of commit_ts. Feel free to think of this as a WAL. This makes the txn durable (thus “definite”) and also advances the logical upper of every data shard registered at a timestamp before commit_ts, including those not involved in the txn. However, at this point, it is not yet possible to read at the commit ts.

    • apply: We could serve reads of data shards from the information in the txns shard, but instead we choose to serve them from the physical data shard itself so that we may reuse existing persist infrastructure (e.g. multi-worker persist-source). This means we must take the batch pointers written to the txns shard and, in commit_ts order, “denormalize” them into each data shard with compare_and_append. We call this process applying the txn. Feel free to think of this as applying the WAL.

      (Note that this means each data shard’s physical upper reflects the last committed txn touching that shard, and so the logical upper may be greater than this. See TxnsCache for more details.)

    • tidy: After a committed txn has been applied, the updates for that txn are retracted from the txns shard. (To handle races, both application and retraction are written to be idempotent.) This prevents the txns shard from growing unboundedly and also means that, at any given time, the txns shard contains the set of txns that need to be applied (as well as the set of registered data shards).
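The commit/apply/tidy lifecycle can be sketched with a toy model. This is a hedged illustration (hypothetical `TxnsShard` type with in-memory maps; the real crate stores batch pointers as differential updates in a persist shard and applies them with compare_and_append):

```rust
use std::collections::BTreeMap;

// Models the txns shard as a consolidated set of batch pointers:
// (data_shard, batch_ptr, commit_ts) -> diff.
#[derive(Default)]
struct TxnsShard {
    entries: BTreeMap<(u64, String, u64), i64>,
}

impl TxnsShard {
    // commit: write a lightweight pointer to the (potentially large)
    // batch at commit_ts. The txn is now durable.
    fn commit(&mut self, data_shard: u64, batch: &str, ts: u64) {
        *self.entries.entry((data_shard, batch.into(), ts)).or_insert(0) += 1;
    }
    // apply: denormalize the batch into the data shard (elided here),
    // then tidy: retract the pointer. Both are idempotent in the real
    // crate so that races are harmless.
    fn apply_and_tidy(&mut self, data_shard: u64, batch: &str, ts: u64) {
        *self.entries.entry((data_shard, batch.into(), ts)).or_insert(0) -= 1;
        self.entries.retain(|_, diff| *diff != 0);
    }
    // At any given time, the entries present are exactly the committed
    // txns that still need to be applied.
    fn unapplied(&self) -> usize {
        self.entries.len()
    }
}

fn main() {
    let mut txns = TxnsShard::default();
    txns.commit(0, "batch0", 3);
    txns.commit(1, "batch1", 3);
    assert_eq!(txns.unapplied(), 2);
    txns.apply_and_tidy(0, "batch0", 3);
    txns.apply_and_tidy(1, "batch1", 3);
    // Tidying keeps the txns shard from growing unboundedly.
    assert_eq!(txns.unapplied(), 0);
}
```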

§Usage
// Open a txn shard, initializing it if necessary.
let txns_id = ShardId::new();
let mut txns = TxnsHandle::<String, (), u64, i64>::open(
    0u64, client.clone(), metrics, txns_id, StringSchema.into(), UnitSchema.into()
).await;

// Register data shards to the txn set.
let (d0, d1) = (ShardId::new(), ShardId::new());
// (This example assumes WriteHandles `d0_write`/`d1_write` for these
// shards were opened elsewhere.)
txns.register(1u64, [d0_write]).await.expect("not previously initialized");
txns.register(2u64, [d1_write]).await.expect("not previously initialized");

// Commit a txn. This is durable if/when the `commit_at` succeeds, but reads
// at the commit ts will _block_ until after the txn is applied. Users are
// free to pass up the commit ack (e.g. to pgwire) to get a bit of latency
// back. NB: It is expected that the txn committer will run the apply step,
// but in the event of a crash, neither correctness nor liveness depend on
// it.
let mut txn = txns.begin();
txn.write(&d0, "0".into(), (), 1);
txn.write(&d1, "1".into(), (), -1);
let tidy = txn.commit_at(&mut txns, 3).await.expect("ts 3 available")
    // Make it available to reads by applying it.
    .apply(&mut txns).await;

// Commit a contended txn at a higher timestamp. Note that the upper of `d1`
// is also advanced by this. At the same time clean up after our last commit
// (the tidy).
let mut txn = txns.begin();
txn.write(&d0, "2".into(), (), 1);
txn.commit_at(&mut txns, 3).await.expect_err("ts 3 not available");
let _tidy = txn.commit_at(&mut txns, 4).await.expect("ts 4 available")
    .apply(&mut txns).await;

// Read data shard(s) at some `read_ts`.
let mut subscribe = DataSubscribe::new("example", client, txns_id, d1, 4, Antichain::new(), true);
while subscribe.progress() <= 4 {
    subscribe.step();
}
let updates = subscribe.output();

§Isolation
This section is about “read-then-write” txns where all reads are performed before any writes (read-only and write-only are trivial specializations of this). All reads are performed at some read_ts and then all writes are performed at write_ts (aka the commit_ts).

  • To implement snapshot isolation using the above, select any read_ts < write_ts. The write_ts can advance as necessary when retrying on conflicts.
  • To implement serializable isolation using the above, select write_ts = read_ts + 1. If the write_ts must be pushed as a result of a conflict, then the read_ts must be similarly advanced. Note that if you happen to have a system for efficiently computing changes to data as inputs change (hmmm), it may be better to reason about (read_ts, new_read_ts] than to recompute the reads from scratch.
  • To implement strict serializable (serializable + linearizable) isolation, do the same as serializable, but with the additional constraints on write_ts required by linearizability (handwave).
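The timestamp-selection rules above can be sketched as a small function. This is a hedged illustration (the `Isolation` enum and `retry_timestamps` helper are hypothetical, not crate APIs; strict serializable is omitted since its extra linearizability constraint on write_ts is handwaved above):

```rust
#[derive(Clone, Copy)]
enum Isolation {
    Snapshot,
    Serializable,
}

// Given the current read_ts and the lowest write_ts that the txns shard
// will currently accept, pick (read_ts, write_ts) for a (re)try.
fn retry_timestamps(iso: Isolation, read_ts: u64, min_write_ts: u64) -> (u64, u64) {
    match iso {
        // Snapshot: any read_ts < write_ts works, so keep the original
        // read_ts and push only the write_ts.
        Isolation::Snapshot => (read_ts, min_write_ts.max(read_ts + 1)),
        // Serializable: pin write_ts = read_ts + 1, so pushing the
        // write_ts similarly advances the read_ts.
        Isolation::Serializable => {
            let write_ts = min_write_ts.max(read_ts + 1);
            (write_ts - 1, write_ts)
        }
    }
}

fn main() {
    // A conflict pushed the usable write_ts from 4 up to 7.
    assert_eq!(retry_timestamps(Isolation::Snapshot, 3, 7), (3, 7));
    assert_eq!(retry_timestamps(Isolation::Serializable, 3, 7), (6, 7));
}
```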

§Implementation
For details of the implementation of writes, see TxnsHandle.

For details of the implementation of reads, see TxnsCache.

§Modules

  • metrics — Prometheus monitoring metrics.
  • operator — Timely operators for the crate.
  • proto 🔒
  • txn_cache — A cache of the txn shard contents.
  • txn_read — Interfaces for reading txn shards as well as data shards.
  • txn_write — Interfaces for writing txn shards as well as data shards.
  • txns — An interface for atomic multi-shard writes.


§Functions
  • all_dyncfgs
    Adds the full set of all txn-wal Configs.
  • apply_caa 🔒
    Ensures that a committed batch has been applied into a physical data shard, making it available for reads.
  • cads 🔒
  • empty_caa 🔒
    Ensures that the upper of the shard is past init_ts by writing an empty batch, retrying as necessary.
  • small_caa 🔒
    Helper for common logging for compare_and_append-ing a small amount of data.