mz_txn_wal/lib.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Atomic multi-shard [persist] writes.
//!
//! [persist]: mz_persist_client
//!
//! This crate presents an abstraction on top of persist shards, allowing
//! efficient atomic multi-shard writes. This is accomplished through an
//! additional _txn_ shard that coordinates writes to a (potentially large)
//! number of _data_ shards. Data shards may be added to and removed from the
//! set at any time.
//!
//! **WARNING!** While a data shard is registered to the txn set, writing to it
//! directly (i.e. using a [WriteHandle] instead of the [TxnsHandle]) will lead
//! to incorrectness, undefined behavior, and (potentially sticky) panics.
//!
//! [WriteHandle]: mz_persist_client::write::WriteHandle
//! [TxnsHandle]: crate::txns::TxnsHandle
//!
//! Benefits of these txns:
//! - _Key idea_: A transactional write costs in proportion to the total size of
//!   data written, and the number of data shards involved (plus one for the
//!   txns shard).
//! - _Key idea_: As time progresses, the upper of every data shard is logically
//!   (but not physically) advanced en masse with a single write to the txns
//!   shard. (Data writes may also be bundled into this, if desired.)
//! - Transactions are write-only, but read-then-write transactions can be built
//!   on top by using read and write timestamps selected to have no possible
//!   writes in between (e.g. `write_ts/commit_ts = read_ts + 1`).
//! - Transactions of any size are supported in bounded memory. This is done
//!   through the usual persist mechanism of spilling to s3. These spilled
//!   batches are efficiently re-timestamped when a commit must be retried at a
//!   higher timestamp.
//! - The data shards may be read independently of each other.
//! - The persist "maintenance" work assigned on behalf of the committed txn is
//!   (usually, see below) assigned to the txn committer.
//! - It is possible to implement any of snapshot, serializable, or
//!   strict-serializable isolation on top of this interface via write and read
//!   timestamp selections (see [#Isolation](#isolation) below for details).
//! - It is possible to serialize and communicate an uncommitted [Txn] between
//!   processes and also to merge uncommitted [Txn]s, if necessary (e.g.
//!   consolidating all monitoring collections, statement logging, etc into the
//!   periodic timestamp advancement). This is not initially implemented, but
//!   could be.
//!
//! [Txn]: crate::txn_write::Txn
//!
//! Restrictions:
//! - Data shards must all use the same codecs for `K, V, T, D`. However, each
//!   data shard may have independent `K` and `V` schemas. The txns shard
//!   inherits the `T` codec from the data shards (and uses its own `K, V, D`
//!   ones).
//! - All txn writes are linearized through the txns shard, so there is some
//!   limit to horizontal and geographical scale out.
//! - Performance has been tuned for _throughput_ and _un-contended latency_.
//!   Latency on contended workloads will likely be quite bad. At a high level,
//!   if N txns are run concurrently, 1 will commit and N-1 will have to
//!   (usually cheaply) retry. (However, note that it is also possible to
//!   combine and commit multiple txns at the same timestamp, as mentioned
//!   above, which gives us some amount of knobs for doing something different
//!   here.)
//!
//! # Intuition and Jargon
//!
//! - The _txns shard_ is the source of truth for what has (and has not)
//!   committed to a set of _data shards_.
//! - Each data shard must be _registered_ at some `register_ts` before being
//!   used in transactions. Registration is for bookkeeping only; there is no
//!   particular meaning to the timestamp other than it being a lower bound on
//!   when txns using this data shard can commit. Registration only needs to be
//!   run once-ever per data shard, but it is idempotent, so can also be run
//!   at-least-once.
//! - A txn is broken into three phases:
//!   - (Elided: A pre-txn phase where MZ might perform reads for
//!     read-then-write txns or might buffer writes.)
//!   - _commit_: The txn is committed by writing lightweight pointers to
//!     (potentially large) batches of data as updates in the txns shard with a
//!     timestamp of `commit_ts`. Feel free to think of this as a WAL. This
//!     makes the txn durable (thus "definite") and also advances the _logical
//!     upper_ of every data shard registered at a timestamp before commit_ts,
//!     including those not involved in the txn. However, at this point, it is
//!     not yet possible to read at the commit ts.
//!   - _apply_: We could serve reads of data shards from the information in the
//!     txns shard, but instead we choose to serve them from the physical data
//!     shard itself so that we may reuse existing persist infrastructure (e.g.
//!     multi-worker persist-source). This means we must take the batch pointers
//!     written to the txns shard and, in commit_ts order, "denormalize" them
//!     into each data shard with `compare_and_append`. We call this process
//!     applying the txn. Feel free to think of this as applying the WAL.
//!
//!     (Note that this means each data shard's _physical upper_ reflects the
//!     last committed txn touching that shard, and so the _logical upper_ may
//!     be greater than this. See [TxnsCache] for more details.)
//!   - _tidy_: After a committed txn has been applied, the updates for that txn
//!     are retracted from the txns shard. (To handle races, both application
//!     and retraction are written to be idempotent.) This prevents the txns
//!     shard from growing unboundedly and also means that, at any given time,
//!     the txns shard contains the set of txns that need to be applied (as well
//!     as the set of registered data shards).
//!
//! [TxnsCache]: crate::txn_cache::TxnsCache
//!
//! # Usage
//!
//! ```
//! # use std::sync::Arc;
//! # use mz_ore::metrics::MetricsRegistry;
//! # use mz_persist_client::critical::Opaque;
//! # use mz_persist_client::{Diagnostics, PersistClient, ShardId};
//! # use mz_txn_wal::metrics::Metrics;
//! # use mz_txn_wal::operator::DataSubscribe;
//! # use mz_txn_wal::txns::TxnsHandle;
//! # use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
//! # use timely::progress::Antichain;
//! #
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! # let client = PersistClient::new_for_tests().await;
//! # let dyncfgs = mz_txn_wal::all_dyncfgs(client.dyncfgs().clone());
//! # let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
//! # mz_ore::test::init_logging();
//! // Open a txn shard, initializing it if necessary.
//! let txns_id = ShardId::new();
//! let mut txns = TxnsHandle::<String, (), u64, i64>::open(
//!     0u64, client.clone(), dyncfgs, metrics, txns_id, Opaque::encode(&0u64)
//! ).await;
//!
//! // Register data shards to the txn set.
//! let (d0, d1) = (ShardId::new(), ShardId::new());
//! # let d0_write = client.open_writer(
//! #     d0, StringSchema.into(), UnitSchema.into(), Diagnostics::for_tests()
//! # ).await.unwrap();
//! # let d1_write = client.open_writer(
//! #     d1, StringSchema.into(), UnitSchema.into(), Diagnostics::for_tests()
//! # ).await.unwrap();
//! txns.register(1u64, [d0_write]).await.expect("not previously initialized");
//! txns.register(2u64, [d1_write]).await.expect("not previously initialized");
//!
//! // Commit a txn. This is durable if/when the `commit_at` succeeds, but reads
//! // at the commit ts will _block_ until after the txn is applied. Users are
//! // free to pass up the commit ack (e.g. to pgwire) to get a bit of latency
//! // back. NB: It is expected that the txn committer will run the apply step,
//! // but in the event of a crash, neither correctness nor liveness depend on
//! // it.
//! let mut txn = txns.begin();
//! txn.write(&d0, "0".into(), (), 1);
//! txn.write(&d1, "1".into(), (), -1);
//! let tidy = txn.commit_at(&mut txns, 3).await.expect("ts 3 available")
//!     // Make it available to reads by applying it.
//!     .apply(&mut txns).await;
//!
//! // Commit a contended txn at a higher timestamp. Note that the upper of `d1`
//! // is also advanced by this. At the same time clean up after our last commit
//! // (the tidy).
//! let mut txn = txns.begin();
//! txn.write(&d0, "2".into(), (), 1);
//! txn.tidy(tidy);
//! txn.commit_at(&mut txns, 3).await.expect_err("ts 3 not available");
//! let _tidy = txn.commit_at(&mut txns, 4).await.expect("ts 4 available")
//!     .apply(&mut txns).await;
//!
//! // Read data shard(s) at some `read_ts`.
//! let mut subscribe = DataSubscribe::new("example", client, txns_id, d1, 4, Antichain::new());
//! while subscribe.progress() <= 4 {
//!     subscribe.step();
//! #     tokio::task::yield_now().await;
//! }
//! let updates = subscribe.output();
//! # })
//! ```
//!
//! # Isolation
//!
//! This section is about "read-then-write" txns where all reads are performed
//! before any writes (read-only and write-only are trivial specializations of
//! this). All reads are performed at some `read_ts` and then all writes are
//! performed at `write_ts` (aka the `commit_ts`).
//!
//! - To implement snapshot isolation using the above, select any `read_ts <
//!   write_ts`. The `write_ts` can advance as necessary when retrying on
//!   conflicts.
//! - To implement serializable isolation using the above, select `write_ts =
//!   read_ts + 1`. If the `write_ts` must be pushed as a result of a conflict,
//!   then the `read_ts` must be similarly advanced. Note that if you happen to
//!   have a system for efficiently computing changes to data as inputs change
//!   (hmmm), it may be better to reason about `(read_ts, new_read_ts]` than to
//!   recompute the reads from scratch. A sketch of this retry loop follows the
//!   list.
//! - To implement strict serializable (serializable + linearizable) isolation,
//!   do the same as serializable, but with the additional constraints on
//!   write_ts required by linearizability (handwave).
//!
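//! As a minimal sketch of the serializable retry loop (not a doc-test):
//! `timestamp_oracle`, `read_at`, and `stage_writes` are hypothetical
//! stand-ins for whatever read path and oracle the embedding system provides;
//! only `begin`/`commit_at`/`apply` are this crate's API.
//!
//! ```ignore
//! loop {
//!     let read_ts = timestamp_oracle.read_ts();
//!     let inputs = read_at(read_ts).await;
//!     let mut txn = txns.begin();
//!     stage_writes(&mut txn, inputs);
//!     match txn.commit_at(&mut txns, read_ts + 1).await {
//!         // Make the txn readable at the commit ts.
//!         Ok(apply) => {
//!             apply.apply(&mut txns).await;
//!             break;
//!         }
//!         // Someone else committed at our commit_ts: our reads may be
//!         // stale, so select a new read_ts and retry.
//!         Err(_new_upper) => continue,
//!     }
//! }
//! ```
//!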
//! # Implementation
//!
//! For details of the implementation of writes, see [TxnsHandle].
//!
//! For details of the implementation of reads, see [TxnsCache].

#![warn(missing_docs, missing_debug_implementations)]

use std::fmt::Debug;
use std::fmt::Write;

use differential_dataflow::Hashable;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_dyncfg::ConfigSet;
use mz_ore::instrument;
use mz_persist_client::ShardId;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::{ShardIdSchema, VecU8Schema};
use mz_persist_types::stats::PartStats;
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec, Codec64, StepForward};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{debug, error};

use crate::proto::ProtoIdBatch;
use crate::txns::DataWriteApply;

pub mod metrics;
pub mod operator;
pub mod txn_cache;
pub mod txn_read;
pub mod txn_write;
pub mod txns;

mod proto {
    use bytes::Bytes;
    use mz_persist_client::batch::ProtoBatch;
    use prost::Message;
    use uuid::Uuid;

    include!(concat!(env!("OUT_DIR"), "/mz_txn_wal.proto.rs"));

    impl ProtoIdBatch {
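        /// Wraps `batch`, tagging it with a random uuid so that two batches
        /// with identical contents serialize to different bytes (see the
        /// `unique_batch_serialization` test).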
        pub(crate) fn new(batch: ProtoBatch) -> ProtoIdBatch {
            ProtoIdBatch {
                batch_id: Bytes::copy_from_slice(Uuid::new_v4().as_bytes()),
                batch: Some(batch),
            }
        }

        /// Recovers the ProtoBatch from an encoded batch.
        ///
        /// This might be an encoded ProtoIdBatch (new path) or a ProtoBatch
        /// (legacy path). Some proto shenanigans are done to sniff out which.
        pub(crate) fn parse(buf: &[u8]) -> ProtoBatch {
            let b = ProtoIdBatch::decode(buf).expect("valid ProtoIdBatch");
            // First try the new format.
            if let Some(batch) = b.batch {
                return batch;
            }
            // Fall back to the legacy format.
            ProtoBatch::decode(buf).expect("valid (legacy) ProtoBatch")
        }
    }
}

/// Adds the full set of all txn-wal `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
    configs
        .add(&crate::operator::DATA_SHARD_RETRYER_CLAMP)
        .add(&crate::operator::DATA_SHARD_RETRYER_INITIAL_BACKOFF)
        .add(&crate::operator::DATA_SHARD_RETRYER_MULTIPLIER)
        .add(&crate::txns::APPLY_ENSURE_SCHEMA_MATCH)
}

/// A reasonable default implementation of [TxnsCodec].
///
/// This uses the "native" Codecs for `ShardId` and `Vec<u8>`, with the latter
/// empty for [TxnsEntry::Register] and non-empty for [TxnsEntry::Append].
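///
/// # Example
///
/// A minimal sketch of the encode/decode roundtrip for a registration entry
/// (the `[0u8; 8]` stands in for a `Codec64`-encoded timestamp):
///
/// ```
/// use mz_persist_client::ShardId;
/// use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
/// use mz_txn_wal::TxnsCodecDefault;
///
/// let entry = TxnsEntry::Register(ShardId::new(), [0u8; 8]);
/// let (key, val) = TxnsCodecDefault::encode(entry);
/// // An empty val (after the ts is truncated off) decodes as a Register.
/// assert!(matches!(
///     TxnsCodecDefault::decode(key, val),
///     TxnsEntry::Register(..)
/// ));
/// ```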
#[derive(Debug)]
pub struct TxnsCodecDefault;

impl TxnsCodec for TxnsCodecDefault {
    type Key = ShardId;
    type Val = Vec<u8>;
    fn schemas() -> (<Self::Key as Codec>::Schema, <Self::Val as Codec>::Schema) {
        (ShardIdSchema, VecU8Schema)
    }
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        match e {
            TxnsEntry::Register(data_id, ts) => (data_id, ts.to_vec()),
            TxnsEntry::Append(data_id, ts, batch) => {
                // Put the ts at the end to let decode truncate it off.
                (data_id, batch.into_iter().chain(ts).collect())
            }
        }
    }
    fn decode(key: Self::Key, mut val: Self::Val) -> TxnsEntry {
        let mut ts = [0u8; 8];
        let ts_idx = val.len().checked_sub(8).expect("ts encoded at end of val");
        ts.copy_from_slice(&val[ts_idx..]);
        val.truncate(ts_idx);
        if val.is_empty() {
            TxnsEntry::Register(key, ts)
        } else {
            TxnsEntry::Append(key, ts, val)
        }
    }
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        let stats = stats
            .key
            .col("")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type: {}", err))
            .ok()?;
        let data_id_str = data_id.to_string();
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}

/// Helper for common logging for compare_and_append-ing a small amount of data.
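///
/// Returns `Ok` on success, or `Err` with the shard's current upper when the
/// given `upper` doesn't match, so the caller can retry from there.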
#[instrument(level = "debug", fields(shard=%txns_or_data_write.shard_id(), ts=?new_upper))]
pub(crate) async fn small_caa<S, F, K, V, T, D>(
    name: F,
    txns_or_data_write: &mut WriteHandle<K, V, T, D>,
    updates: &[((&K, &V), &T, D)],
    upper: T,
    new_upper: T,
) -> Result<(), T>
where
    S: AsRef<str>,
    F: Fn() -> S,
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + Codec64 + Sync,
    D: Debug + Monoid + Ord + Codec64 + Send + Sync,
{
    fn debug_sep<'a, T: Debug + 'a>(sep: &str, xs: impl IntoIterator<Item = &'a T>) -> String {
        xs.into_iter().fold(String::new(), |mut output, x| {
            let _ = write!(output, "{}{:?}", sep, x);
            output
        })
    }
    debug!(
        "CaA {} [{:?},{:?}){}",
        name().as_ref(),
        upper,
        new_upper,
        // This is a "small" CaA so we can inline the data in this debug log.
        debug_sep("\n  ", updates)
    );
    let res = txns_or_data_write
        .compare_and_append(
            updates,
            Antichain::from_elem(upper.clone()),
            Antichain::from_elem(new_upper.clone()),
        )
        .await
        .expect("usage was valid");
    match res {
        Ok(()) => {
            debug!(
                "CaA {} [{:?},{:?}) success",
                name().as_ref(),
                upper,
                new_upper
            );
            Ok(())
        }
        Err(UpperMismatch { current, .. }) => {
            let current = current
                .into_option()
                .expect("txns shard should not be closed");
            debug!(
                "CaA {} [{:?},{:?}) mismatch actual={:?}",
                name().as_ref(),
                upper,
                new_upper,
                current,
            );
            Err(current)
        }
    }
}

/// Ensures that the upper of the shard is past init_ts by writing an empty
/// batch, retrying as necessary.
///
/// This method is idempotent.
pub(crate) async fn empty_caa<S, F, K, V, T, D>(
    name: F,
    txns_or_data_write: &mut WriteHandle<K, V, T, D>,
    init_ts: T,
) where
    S: AsRef<str>,
    F: Fn() -> S,
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Debug + Monoid + Ord + Codec64 + Send + Sync,
{
    let name = name();
    let empty: &[((&K, &V), &T, D)] = &[];
    let Some(mut upper) = txns_or_data_write.shared_upper().into_option() else {
        // Shard is closed, which means the upper must be past init_ts.
        return;
    };
    loop {
        if init_ts < upper {
            return;
        }
        let res = small_caa(
            || name.as_ref(),
            txns_or_data_write,
            empty,
            upper,
            init_ts.step_forward(),
        )
        .await;
        match res {
            Ok(()) => return,
            Err(current) => {
                upper = current;
            }
        }
    }
}

/// Ensures that a committed batch has been applied into a physical data shard,
/// making it available for reads.
///
/// This process is definite work on top of definite input, so the
/// implementation assumes that if the upper of the shard passes commit_ts then
/// the work must have already been done by someone else. (Think how our compute
/// replicas race to compute some MATERIALIZED VIEW, but they're all guaranteed
/// to get the same answer.)
#[instrument(level = "debug", fields(shard=%data_write.shard_id(), ts=?commit_ts))]
async fn apply_caa<K, V, T, D>(
    data_write: &mut DataWriteApply<K, V, T, D>,
    batch_raws: &Vec<&[u8]>,
    commit_ts: T,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let mut batches = batch_raws
        .into_iter()
        .map(|batch| ProtoIdBatch::parse(batch))
        .map(|batch| data_write.batch_from_transmittable_batch(batch))
        .collect::<Vec<_>>();
    let Some(mut upper) = data_write.shared_upper().into_option() else {
        // Shard is closed, which means the upper must be past commit_ts.
        // Mark the batches as consumed, so we don't get warnings in the logs.
        for batch in batches {
            batch.into_hollow_batch();
        }
        return;
    };
    loop {
        if commit_ts < upper {
            debug!(
                "CaA data {:.9} apply t={:?} already done",
                data_write.shard_id().to_string(),
                commit_ts
            );
            // Mark the batches as consumed, so we don't get warnings in the logs.
            for batch in batches {
                batch.into_hollow_batch();
            }
            return;
        }

        // Make sure we're using the same schema to CaA these batches as what
        // they were written with.
        data_write.maybe_replace_with_batch_schema(&batches).await;

        debug!(
            "CaA data {:.9} apply b={:?} t={:?} [{:?},{:?})",
            data_write.shard_id().to_string(),
            batch_raws
                .iter()
                .map(|batch_raw| batch_raw.hashed())
                .collect::<Vec<_>>(),
            commit_ts,
            upper,
            commit_ts.step_forward(),
        );
        let mut batches = batches.iter_mut().collect::<Vec<_>>();
        let res = data_write
            .compare_and_append_batch(
                batches.as_mut_slice(),
                Antichain::from_elem(upper.clone()),
                Antichain::from_elem(commit_ts.step_forward()),
                true,
            )
            .await
            .expect("usage was valid");
        match res {
            Ok(()) => {
                debug!(
                    "CaA data {:.9} apply t={:?} [{:?},{:?}) success",
                    data_write.shard_id().to_string(),
                    commit_ts,
                    upper,
                    commit_ts.step_forward(),
                );
                return;
            }
            Err(UpperMismatch { current, .. }) => {
                let current = current.into_option().expect("data should not be closed");
                debug!(
                    "CaA data {:.9} apply t={:?} [{:?},{:?}) mismatch actual={:?}",
                    data_write.shard_id().to_string(),
                    commit_ts,
                    upper,
                    commit_ts.step_forward(),
                    current,
                );
                upper = current;
                continue;
            }
        }
    }
}

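/// Downgrades the since of the txns shard to `new_since_ts`
/// (compare-and-downgrade-since, hence the name), halting the process if
/// another handle has fenced us out by swapping the opaque token.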
#[instrument(level = "debug", fields(shard=%txns_since.shard_id(), ts=?new_since_ts))]
pub(crate) async fn cads<T, C>(
    txns_since: &mut SinceHandle<C::Key, C::Val, T, i64>,
    new_since_ts: T,
) where
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    C: TxnsCodec,
{
    // Fast-path, don't bother trying to CaDS if we're already past that
    // since.
    if !txns_since.since().less_than(&new_since_ts) {
        return;
    }
    let token = txns_since.opaque().clone();
    let res = txns_since
        .compare_and_downgrade_since(&token, (&token, &Antichain::from_elem(new_since_ts)))
        .await;
    match res {
        Ok(_) => {}
        Err(actual) => {
            mz_ore::halt!("fenced by another process @ {actual:?}. ours = {token:?}")
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;
    use std::sync::Mutex;

    use crossbeam_channel::{Receiver, Sender, TryRecvError};
    use differential_dataflow::consolidation::consolidate_updates;
    use mz_persist_client::read::ReadHandle;
    use mz_persist_client::{Diagnostics, PersistClient, ShardId};
    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
    use prost::Message;

    use crate::operator::DataSubscribe;
    use crate::txn_cache::TxnsCache;
    use crate::txn_write::{Txn, TxnApply};
    use crate::txns::{Tidy, TxnsHandle};

    use super::*;

    impl<K, V, T, D, C> TxnsHandle<K, V, T, D, C>
    where
        K: Debug + Codec + Clone,
        V: Debug + Codec + Clone,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync + Clone,
        C: TxnsCodec,
    {
        /// Returns a new, empty test transaction that can involve the data
        /// shards registered with this handle.
        pub(crate) fn begin_test(&self) -> TestTxn<K, V, T, D> {
            TestTxn::new()
        }
    }

    /// A [`Txn`] wrapper that exposes extra functionality for tests.
    #[derive(Debug)]
    pub struct TestTxn<K, V, T, D> {
        txn: Txn<K, V, T, D>,
        /// A copy of every write to use in tests.
        writes: BTreeMap<ShardId, Vec<(K, V, D)>>,
    }

    impl<K, V, T, D> TestTxn<K, V, T, D>
    where
        K: Debug + Codec + Clone,
        V: Debug + Codec + Clone,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync + Clone,
    {
        pub(crate) fn new() -> Self {
            Self {
                txn: Txn::new(),
                writes: BTreeMap::default(),
            }
        }

        pub(crate) async fn write(&mut self, data_id: &ShardId, key: K, val: V, diff: D) {
            self.writes
                .entry(*data_id)
                .or_default()
                .push((key.clone(), val.clone(), diff.clone()));
            self.txn.write(data_id, key, val, diff).await
        }

        pub(crate) async fn commit_at<C>(
            &mut self,
            handle: &mut TxnsHandle<K, V, T, D, C>,
            commit_ts: T,
        ) -> Result<TxnApply<T>, T>
        where
            C: TxnsCodec,
        {
            self.txn.commit_at(handle, commit_ts).await
        }

        pub(crate) fn merge(&mut self, other: Self) {
            for (data_id, writes) in other.writes {
                self.writes.entry(data_id).or_default().extend(writes);
            }
            self.txn.merge(other.txn)
        }

        pub(crate) fn tidy(&mut self, tidy: Tidy) {
            self.txn.tidy(tidy)
        }

        #[allow(dead_code)]
        fn take_tidy(&mut self) -> Tidy {
            self.txn.take_tidy()
        }
    }

    /// A test helper for collecting committed writes and later comparing them
    /// to reads for correctness.
    #[derive(Debug, Clone)]
    pub struct CommitLog {
        client: PersistClient,
        txns_id: ShardId,
        writes: Arc<Mutex<Vec<(ShardId, String, u64, i64)>>>,
        tx: Sender<(ShardId, String, u64, i64)>,
        rx: Receiver<(ShardId, String, u64, i64)>,
    }

    impl CommitLog {
        pub fn new(client: PersistClient, txns_id: ShardId) -> Self {
            let (tx, rx) = crossbeam_channel::unbounded();
            CommitLog {
                client,
                txns_id,
                writes: Arc::new(Mutex::new(Vec::new())),
                tx,
                rx,
            }
        }

        pub fn record(&self, update: (ShardId, String, u64, i64)) {
            let () = self.tx.send(update).unwrap();
        }

        pub fn record_txn(&self, commit_ts: u64, txn: &TestTxn<String, (), u64, i64>) {
            for (data_id, writes) in txn.writes.iter() {
                for (k, (), d) in writes.iter() {
                    self.record((*data_id, k.clone(), commit_ts, *d));
                }
            }
        }

        #[track_caller]
        pub fn assert_eq(
            &self,
            data_id: ShardId,
            as_of: u64,
            until: u64,
            actual: impl IntoIterator<Item = (String, u64, i64)>,
        ) {
            // First read everything off the channel.
            let mut expected = {
                let mut writes = self.writes.lock().unwrap();
                loop {
                    match self.rx.try_recv() {
                        Ok(x) => writes.push(x),
                        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
                    }
                }
                writes
                    .iter()
                    .flat_map(|(id, key, ts, diff)| {
                        if id != &data_id {
                            return None;
                        }
                        let mut ts = *ts;
                        if ts < as_of {
                            ts = as_of;
                        }
                        if until <= ts {
                            None
                        } else {
                            Some((key.clone(), ts, *diff))
                        }
                    })
                    .collect()
            };
            consolidate_updates(&mut expected);
            let mut actual = actual.into_iter().filter(|(_, t, _)| t < &until).collect();
            consolidate_updates(&mut actual);
            // NB: Extra spaces after actual are so it lines up with expected.
            tracing::debug!(
                "{:.9} as_of={} until={} actual  ={:?}",
                data_id,
                as_of,
                until,
                actual
            );
            tracing::debug!(
                "{:.9} as_of={} until={} expected={:?}",
                data_id,
                as_of,
                until,
                expected
            );
            assert_eq!(actual, expected)
        }

        #[allow(ungated_async_fn_track_caller)]
        #[track_caller]
        pub async fn assert_snapshot(&self, data_id: ShardId, as_of: u64) {
            let mut cache: TxnsCache<u64, TxnsCodecDefault> =
                TxnsCache::open(&self.client, self.txns_id, Some(data_id)).await;
            let _ = cache.update_gt(&as_of).await;
            let snapshot = cache.data_snapshot(data_id, as_of);
            let mut data_read: ReadHandle<String, (), _, _> = self
                .client
                .open_leased_reader(
                    data_id,
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                    Diagnostics::from_purpose("assert snapshot"),
                    true,
                )
                .await
                .expect("reader creation shouldn't panic");
            let snapshot = snapshot
                .snapshot_and_fetch(&mut data_read)
                .await
                .expect("snapshot shouldn't panic");
            data_read.expire().await;
            let snapshot: Vec<_> = snapshot
                .into_iter()
                .map(|((k, ()), t, d)| (k, t, d))
                .collect();

            // Check that a subscribe would produce the same result.
            let subscribe = self.subscribe(data_id, as_of, as_of + 1).await;
            assert_eq!(
                snapshot.iter().collect::<BTreeSet<_>>(),
                subscribe.output().into_iter().collect::<BTreeSet<_>>()
            );

            // Check that the result is correct.
            self.assert_eq(data_id, as_of, as_of + 1, snapshot);
        }

        #[allow(ungated_async_fn_track_caller)]
        #[track_caller]
        pub async fn assert_subscribe(&self, data_id: ShardId, as_of: u64, until: u64) {
            let data_subscribe = self.subscribe(data_id, as_of, until).await;
            self.assert_eq(data_id, as_of, until, data_subscribe.output().clone());
        }

        #[allow(ungated_async_fn_track_caller)]
        #[track_caller]
        pub async fn subscribe(&self, data_id: ShardId, as_of: u64, until: u64) -> DataSubscribe {
            let mut data_subscribe = DataSubscribe::new(
                "test",
                self.client.clone(),
                self.txns_id,
                data_id,
                as_of,
                Antichain::new(),
            );
            data_subscribe.step_past(until - 1).await;
            data_subscribe
        }
    }

    pub(crate) async fn writer(
        client: &PersistClient,
        data_id: ShardId,
    ) -> WriteHandle<String, (), u64, i64> {
        client
            .open_writer(
                data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should not change")
    }

    pub(crate) async fn reader(
        client: &PersistClient,
        data_id: ShardId,
    ) -> ReadHandle<String, (), u64, i64> {
        client
            .open_leased_reader(
                data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codecs should not change")
    }

    pub(crate) async fn write_directly(
        ts: u64,
        data_write: &mut WriteHandle<String, (), u64, i64>,
        keys: &[&str],
        log: &CommitLog,
    ) {
        let data_id = data_write.shard_id();
        let keys = keys.iter().map(|x| (*x).to_owned()).collect::<Vec<_>>();
        let updates = keys.iter().map(|k| ((k, &()), &ts, 1)).collect::<Vec<_>>();
        let mut current = data_write.shared_upper().into_option().unwrap();
        loop {
            let res = crate::small_caa(
                || format!("data {:.9} directly", data_id),
                data_write,
                &updates,
                current,
                ts + 1,
            )
            .await;
            match res {
                Ok(()) => {
                    for ((k, ()), t, d) in updates {
                        log.record((data_id, k.to_owned(), *t, d));
                    }
                    return;
                }
                Err(new_current) => current = new_current,
            }
        }
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn commit_log() {
        let (d0, d1) = (ShardId::new(), ShardId::new());
        let log0 = CommitLog::new(PersistClient::new_for_tests().await, ShardId::new());

        // Send before cloning into another handle.
        log0.record((d0, "0".into(), 0, 1));

        // Send after cloning into another handle. Also push a duplicate (which
        // gets consolidated).
        let log1 = log0.clone();
        log0.record((d0, "2".into(), 2, 1));
        log1.record((d0, "2".into(), 2, 1));

        // Send a retraction.
        log0.record((d0, "3".into(), 3, 1));
        log1.record((d0, "3".into(), 4, -1));

        // Send out of order.
        log0.record((d0, "1".into(), 1, 1));

        // Send to a different shard.
        log1.record((d1, "5".into(), 5, 1));

        // Assert_eq with no advancement or truncation.
        log0.assert_eq(
            d0,
            0,
            6,
            vec![
                ("0".into(), 0, 1),
                ("1".into(), 1, 1),
                ("2".into(), 2, 2),
                ("3".into(), 3, 1),
                ("3".into(), 4, -1),
            ],
        );
        log0.assert_eq(d1, 0, 6, vec![("5".into(), 5, 1)]);

        // Assert_eq with advancement.
        log0.assert_eq(
            d0,
            4,
            6,
            vec![("0".into(), 4, 1), ("1".into(), 4, 1), ("2".into(), 4, 2)],
        );

        // Assert_eq with truncation.
        log0.assert_eq(
            d0,
            0,
            3,
            vec![("0".into(), 0, 1), ("1".into(), 1, 1), ("2".into(), 2, 2)],
        );
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn unique_batch_serialization() {
        let client = PersistClient::new_for_tests().await;
        let mut write = writer(&client, ShardId::new()).await;
        let data = [(("foo".to_owned(), ()), 0, 1)];
        let batch = write
            .batch(&data, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .unwrap();

        // Pretend we somehow got two batches that happen to have the same
        // serialization.
        let b0_raw = batch.into_transmittable_batch();
        let b1_raw = b0_raw.clone();
        assert_eq!(b0_raw.encode_to_vec(), b1_raw.encode_to_vec());

        // They don't match if we wrap them in ProtoIdBatch.
        let b0 = ProtoIdBatch::new(b0_raw.clone());
        let b1 = ProtoIdBatch::new(b1_raw);
        assert!(b0.encode_to_vec() != b1.encode_to_vec());

        // The transmittable batch roundtrips.
        let roundtrip = ProtoIdBatch::parse(&b0.encode_to_vec());
        assert_eq!(roundtrip, b0_raw);

        // We've started running things in all of staging, so we've got to be
        // able to read the previous serialization (ProtoBatch directly) back.
        let roundtrip = ProtoIdBatch::parse(&b0_raw.encode_to_vec());
        assert_eq!(roundtrip, b0_raw);
    }
}