mz_txn_wal/operator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Timely operators for the crate
11
12use std::any::Any;
13use std::fmt::Debug;
14use std::future::Future;
15use std::rc::Rc;
16use std::sync::mpsc::TryRecvError;
17use std::sync::{Arc, mpsc};
18use std::time::Duration;
19
20use differential_dataflow::Hashable;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use mz_dyncfg::{Config, ConfigSet};
24use mz_ore::cast::CastFrom;
25use mz_persist_client::cfg::RetryParameters;
26use mz_persist_client::operators::shard_source::{
27 ErrorHandler, FilterResult, SnapshotMode, shard_source,
28};
29use mz_persist_client::{Diagnostics, PersistClient, ShardId};
30use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
31use mz_persist_types::txn::TxnsCodec;
32use mz_persist_types::{Codec, Codec64, StepForward};
33use mz_timely_util::activator::ArcActivator;
34use mz_timely_util::builder_async::{PressOnDropButton, button};
35use timely::dataflow::channels::pact::Pipeline;
36#[cfg(test)]
37use timely::dataflow::operators::Input;
38use timely::dataflow::operators::capture::Event;
39use timely::dataflow::operators::generic::OutputBuilder;
40use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
41use timely::dataflow::operators::vec::{Broadcast, Map};
42use timely::dataflow::operators::{Capture, Leave, Probe};
43use timely::dataflow::{ProbeHandle, Scope, StreamVec};
44use timely::order::TotalOrder;
45use timely::progress::{Antichain, Timestamp};
46use timely::worker::Worker;
47use timely::{PartialOrder, WorkerConfig};
48use tracing::debug;
49
50use crate::TxnsCodecDefault;
51use crate::txn_cache::TxnsCache;
52use crate::txn_read::{DataRemapEntry, TxnsRead};
53
54/// An operator for translating physical data shard frontiers into logical ones.
55///
56/// A data shard in the txns set logically advances its upper each time a txn is
57/// committed, but the upper is not physically advanced unless that data shard
58/// was involved in the txn. This means that a shard_source (or any read)
59/// pointed at a data shard would appear to stall at the time of the most recent
60/// write. We fix this for shard_source by flowing its output through a new
61/// `txns_progress` dataflow operator, which ensures that the
62/// frontier/capability is advanced as the txns shard progresses, as long as the
63/// shard_source is up to date with the latest committed write to that data
64/// shard.
65///
66/// Example:
67///
68/// - A data shard has most recently been written to at 3.
69/// - The txns shard's upper is at 6.
70/// - We render a dataflow containing a shard_source with an as_of of 5.
71/// - A txn NOT involving the data shard is committed at 7.
72/// - A txn involving the data shard is committed at 9.
73///
74/// How it works:
75///
76/// - The shard_source operator is rendered. Its single output is hooked up as a
77/// _disconnected_ input to txns_progress. The txns_progress single output is
78/// a stream of the same type, which is used by downstream operators. This
79/// txns_progress operator is targeted at one data_shard; rendering a
80/// shard_source for a second data shard requires a second txns_progress
81/// operator.
82/// - The shard_source operator emits data through 3 and advances the frontier.
83/// - The txns_progress operator passes through these writes and frontier
84/// advancements unchanged. (Recall that it's always correct to read a data
85/// shard "normally", it just might stall.) Because the txns_progress operator
86/// knows there are no writes in `[3,5]`, it then downgrades its own
87/// capability past 5 (to 6). Because the input is disconnected, this means
88/// the overall frontier of the output is downgraded to 6.
89/// - The txns_progress operator learns about the write at 7 (the upper is now
90/// 8). Because it knows that the data shard was not involved in this, it's
91/// free to downgrade its capability to 8.
92/// - The txns_progress operator learns about the write at 9 (the upper is now
93/// 10). It knows that the data shard _WAS_ involved in this, so it forwards
94/// on data from its input until the input has progressed to 10, at which
95/// point it can itself downgrade to 10.
96pub fn txns_progress<'scope, K, V, T, D, P, C, F>(
97 passthrough: StreamVec<'scope, T, P>,
98 name: &str,
99 ctx: &TxnsContext,
100 client_fn: impl Fn() -> F,
101 txns_id: ShardId,
102 data_id: ShardId,
103 as_of: T,
104 until: Antichain<T>,
105 data_key_schema: Arc<K::Schema>,
106 data_val_schema: Arc<V::Schema>,
107) -> (StreamVec<'scope, T, P>, Vec<PressOnDropButton>)
108where
109 K: Debug + Codec + Send + Sync,
110 V: Debug + Codec + Send + Sync,
111 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
112 D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
113 P: Debug + Clone + 'static,
114 C: TxnsCodec + 'static,
115 F: Future<Output = PersistClient> + Send + 'static,
116{
117 let unique_id = (name, passthrough.scope().addr()).hashed();
118 let (remap, source_button) = txns_progress_source_global::<K, V, T, D, P, C>(
119 passthrough.scope(),
120 name,
121 ctx.clone(),
122 client_fn(),
123 txns_id,
124 data_id,
125 as_of,
126 data_key_schema,
127 data_val_schema,
128 unique_id,
129 );
130 // Each of the `txns_frontiers` workers wants the full copy of the remap
131 // information.
132 let remap = remap.broadcast();
133 let (passthrough, frontiers_button) = txns_progress_frontiers::<K, V, T, D, P, C>(
134 remap,
135 passthrough,
136 name,
137 data_id,
138 until,
139 unique_id,
140 );
141 (passthrough, vec![source_button, frontiers_button])
142}
143
/// Event sent from the subscribe Tokio task to the sync `txns_progress_source`
/// operator. The task owns the persist resources and the `data_subscribe`
/// receiver. The operator owns the output capability and drives the frontier.
///
/// Events travel over an unbounded Tokio mpsc channel. After each send the
/// task activates the operator, which drains every pending event on its next
/// schedule.
enum SourceEvent<T> {
    /// A `DataRemapEntry` read from the data shard subscription. The operator
    /// forwards it downstream only if its `physical_upper` differs from the
    /// last one seen. In either case it downgrades its capability to the
    /// entry's `logical_upper`.
    Remap(DataRemapEntry<T>),
    /// The subscription closed cleanly. The operator drops its capability and
    /// treats a later channel disconnect as expected rather than a task panic.
    Finished,
}
154
/// Renders the operator that subscribes to the txns shard's view of `data_id`
/// and emits [`DataRemapEntry`]s. Only a single, deterministically chosen
/// worker does this work.
///
/// A Tokio task does the persist work: it awaits `client`, initializes the
/// shared [`TxnsRead`] via `ctx`, opens a writer on the data shard, and calls
/// `data_subscribe`. The task forwards entries to the timely operator as
/// [`SourceEvent`]s over an unbounded channel and activates the operator after
/// each send. The operator owns the output capability and downgrades it to
/// each entry's `logical_upper`. All other workers drop their capability
/// immediately and only take part in the shutdown handshake.
///
/// Returns the remap stream and a button that presses shutdown when dropped.
///
/// TODO: I'd much prefer the communication protocol between the two operators
/// to be exactly remap as defined in the [reclocking design doc]. However, we
/// can't quite recover exactly the information necessary to construct that at
/// the moment. Seems worth doing, but in the meantime, intentionally make this
/// look fairly different (`Stream` of `DataRemapEntry` instead of
/// `Collection<FromTime>`) to hopefully minimize confusion. As a performance
/// optimization, we only re-emit this when the _physical_ upper has changed,
/// which means that the frontier of the `Stream<DataRemapEntry<T>>` indicates
/// updates to the logical_upper of the most recent `DataRemapEntry` (i.e. the
/// one with the largest physical_upper).
///
/// [reclocking design doc]:
/// https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210714_reclocking.md
fn txns_progress_source_global<'scope, K, V, T, D, P, C>(
    scope: Scope<'scope, T>,
    name: &str,
    ctx: TxnsContext,
    client: impl Future<Output = PersistClient> + Send + 'static,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>,
    unique_id: u64,
) -> (StreamVec<'scope, T, DataRemapEntry<T>>, PressOnDropButton)
where
    K: Debug + Codec + Send + Sync,
    V: Debug + Codec + Send + Sync,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
    P: Debug + Clone + 'static,
    C: TxnsCodec + 'static,
{
    let worker_idx = scope.index();
    // Each worker derives the same index from `name`, assuming `name` is
    // identical on every worker. Exactly one worker then does the subscribing.
    let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
    let name = format!("txns_progress_source({})", name);
    let mut builder = OperatorBuilderRc::new(name.clone(), scope.clone());
    let info = builder.operator_info();
    // Logging/task name: the operator name plus `unique_id` and the first nine
    // characters of the data shard id.
    let name = format!("{} [{}] {:.9}", name, unique_id, data_id.to_string());
    let (remap_output, remap_stream) = builder.new_output::<Vec<DataRemapEntry<T>>>();
    let mut remap_output = OutputBuilder::from(remap_output);

    // Shutdown button. `local_pressed`/`all_pressed` below distinguish this
    // worker's press from every worker having pressed.
    let (mut shutdown_handle, shutdown_button) = button(scope.clone(), Rc::clone(&info.address));

    builder.build_reschedule(move |capabilities| {
        // The output capability's time tracks the `logical_upper` we've advanced
        // to. `None` indicates that we've dropped the capability to shut down.
        let [cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        let mut capability = Some(cap);

        // The most recently observed physical upper. We emit a `DataRemapEntry`
        // only when the physical upper changes.
        let mut physical_upper = T::minimum();

        // Per-worker state. Only the chosen worker subscribes to the data shard
        // (via a Tokio task that owns the blocking persist I/O) and produces
        // output. Non-chosen workers drop their capability immediately and only
        // participate in the shutdown handshake below. `Some` holds the receiver
        // of `SourceEvent`s, the activation ack, and the task handle, kept alive
        // so the task is aborted when the operator is dropped.
        let mut chosen_state = if worker_idx == chosen_worker {
            let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<SourceEvent<T>>();
            // `activator` lets the Tokio task wake this operator. Each wakeup
            // must be acknowledged via `activation_ack` before the task can
            // trigger another one.
            let (activator, activation_ack) = ArcActivator::new(scope, &info);

            let task_name = name.clone();
            let task = mz_ore::task::spawn(|| name.clone(), async move {
                let client = client.await;
                let txns_read = ctx.get_or_init::<T, C>(&client, txns_id).await;

                // NOTE(review): presumably brings the txns read past `as_of`
                // before subscribing. The result is deliberately discarded.
                let _ = txns_read.update_gt(as_of.clone()).await;
                // The writer is handed to `data_subscribe`. Per the diagnostics
                // purpose string, it is used to read the data shard's physical
                // upper.
                let data_write = client
                    .open_writer::<K, V, T, D>(
                        data_id,
                        Arc::clone(&data_key_schema),
                        Arc::clone(&data_val_schema),
                        Diagnostics::from_purpose("data read physical upper"),
                    )
                    .await
                    .expect("schema shouldn't change");
                let mut rx = txns_read
                    .data_subscribe(data_id, as_of.clone(), data_write)
                    .await;
                debug!("{} starting as_of={:?}", task_name, as_of);

                while let Some(remap) = rx.recv().await {
                    if event_tx.send(SourceEvent::Remap(remap)).is_err() {
                        // The operator is gone. Stop.
                        return;
                    }
                    activator.activate();
                }
                // The subscription closed. Signal the operator so it drops its
                // output capability.
                let _ = event_tx.send(SourceEvent::Finished);
                activator.activate();
            })
            .abort_on_drop();

            Some((event_rx, activation_ack, task))
        } else {
            // Non-chosen workers contribute nothing to the output frontier.
            capability = None;
            None
        };

        // Whether we've observed `SourceEvent::Finished`, so a subsequent
        // channel disconnect is expected rather than a task panic.
        let mut finished = false;

        // Schedule logic. The returned bool asks timely to reschedule this
        // operator (`true`) or to wait for the next activation (`false`).
        move |_frontiers| {
            // On a local shutdown press, hold the capability and stay scheduled
            // until all workers have pressed, then release. Dropping the
            // capability on the local press alone would let the downstream
            // frontier advance during cross-worker teardown skew, past times
            // whose input this worker has already discarded.
            if shutdown_handle.local_pressed() {
                return if shutdown_handle.all_pressed() {
                    capability = None;
                    // Drop the receiver, ack, and task handle, aborting the task.
                    chosen_state = None;
                    false
                } else {
                    true
                };
            }

            let Some((event_rx, activation_ack, _task)) = chosen_state.as_mut() else {
                // Non-chosen worker: nothing to do. Stay alive (the button
                // channel reschedules us) for the shutdown handshake above.
                return false;
            };
            // Acknowledge the activation so the Tokio task can activate us again.
            activation_ack.ack();

            let mut output = remap_output.activate();
            // Drain every event currently queued by the task.
            loop {
                match event_rx.try_recv() {
                    Ok(SourceEvent::Remap(remap)) => {
                        let Some(cap) = capability.as_mut() else {
                            // Already shut down, so drop any straggling events.
                            continue;
                        };
                        // Entries must arrive in non-decreasing physical
                        // order, and the logical upper must be ahead of the
                        // last physical upper emitted.
                        assert!(physical_upper <= remap.physical_upper);
                        assert!(physical_upper < remap.logical_upper);

                        let logical_upper = remap.logical_upper.clone();
                        // Emit at the pre-downgrade capability, then downgrade.
                        if remap.physical_upper != physical_upper {
                            physical_upper = remap.physical_upper.clone();
                            debug!("{} emitting {:?}", name, remap);
                            output.session(&*cap).give(remap);
                        } else {
                            debug!("{} not emitting {:?}", name, remap);
                        }
                        // The output frontier carries the latest logical upper
                        // even when the entry itself was not emitted.
                        cap.downgrade(&logical_upper);
                    }
                    Ok(SourceEvent::Finished) => {
                        // Subscription closed cleanly. Drop the capability.
                        finished = true;
                        capability = None;
                    }
                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                        // A task panic aborts the process via the enhanced panic
                        // handler, so this assert is only a safety net for
                        // environments that do not abort. On the panic path the
                        // task never calls `activate()`, so it fires only if the
                        // operator is rescheduled for another reason.
                        assert!(finished, "txns_progress_source task unexpectedly gone");
                        break;
                    }
                }
            }

            false
        }
    });

    (remap_stream, shutdown_button.press_on_drop())
}
335
/// Renders the operator that copies `passthrough` through unchanged while
/// driving its output capability from two sources: the passthrough frontier
/// and the (broadcast) `remap` stream. This lets the output frontier advance
/// to a `logical_upper` even when the data shard's physical upper has stalled.
///
/// Returns the passthrough stream and a button that presses shutdown when
/// dropped.
///
/// The `K`, `V`, `D`, and `C` type parameters are not referenced by the body.
/// NOTE(review): presumably they are kept for call-site symmetry with
/// `txns_progress_source_global`; confirm before removing them.
///
/// The block ordering inside the schedule closure is load-bearing: pending
/// passthrough input is emitted at the pre-activation capability BEFORE any
/// capability downgrade, which keeps the differential invariant `send_time <=
/// record_time` and avoids dropping in-flight rows when the passthrough
/// frontier crosses `until` in the same activation (SQL-299). Do not reorder.
fn txns_progress_frontiers<'scope, K, V, T, D, P, C>(
    remap: StreamVec<'scope, T, DataRemapEntry<T>>,
    passthrough: StreamVec<'scope, T, P>,
    name: &str,
    data_id: ShardId,
    until: Antichain<T>,
    unique_id: u64,
) -> (StreamVec<'scope, T, P>, PressOnDropButton)
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Clone + 'static + Monoid + Codec64 + Send + Sync,
    P: Debug + Clone + 'static,
    C: TxnsCodec,
{
    let scope = passthrough.scope();
    let name = format!("txns_progress_frontiers({})", name);
    let mut builder = OperatorBuilderRc::new(name.clone(), scope.clone());
    let info = builder.operator_info();
    // Logging name: the operator name plus `unique_id`, this worker's
    // index/peer count, and the first nine characters of the data shard id.
    let name = format!(
        "{} [{}] {}/{} {:.9}",
        name,
        unique_id,
        scope.index(),
        scope.peers(),
        data_id.to_string(),
    );
    let (passthrough_output, passthrough_stream) = builder.new_output::<Vec<P>>();
    let mut passthrough_output = OutputBuilder::from(passthrough_output);
    // Both inputs are disconnected from the output: capability advancement is
    // driven manually based on the remap stream and the passthrough frontier.
    // NB: the output is created BEFORE the inputs on purpose. `new_output`
    // connects to whatever inputs already exist (here, none); the `[]`
    // connection arg below records the input-to-output summary but does not by
    // itself disconnect the output. Creating an input before the output would
    // silently connect them and break the manual capability management.
    // Input creation order fixes the indices used below: `frontiers[0]` is
    // the remap input and `frontiers[1]` is the passthrough input.
    let mut remap_input = builder.new_input_connection(remap, Pipeline, []);
    let mut passthrough_input = builder.new_input_connection(passthrough, Pipeline, []);

    let (mut shutdown_handle, shutdown_button) = button(scope, info.address);

    builder.build_reschedule(move |capabilities| {
        // The output capability's time tracks how far we've progressed in
        // copying along the passthrough input. `None` indicates that we've
        // dropped the capability to shut down.
        let [cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        let mut capability = Some(cap);
        // The most recently observed remap state. Retained even after the remap
        // input closes so we can still advance the output capability to the
        // last known `logical_upper` while the passthrough input is draining.
        // This deliberately diverges from the async impl, which dropped the
        // entry on close and stalled (PER-4).
        let mut remap = DataRemapEntry {
            physical_upper: T::minimum(),
            logical_upper: T::minimum(),
        };
        // Whether the remap input has reached the empty antichain.
        let mut remap_closed = false;

        // Schedule logic. The returned bool asks timely to reschedule this
        // operator (`true`) or to wait for the next activation (`false`).
        move |frontiers| {
            // If our worker pressed the button we stop producing data and
            // frontier updates downstream, but mirror `builder_async`: hold the
            // capability and stop draining the inputs until ALL workers have
            // pressed. Dropping the capability on the local press alone would
            // let the downstream frontier advance during cross-worker teardown
            // skew, past times whose data this worker has discarded, while
            // other workers' operator instances still feed downstream.
            if shutdown_handle.local_pressed() {
                return if shutdown_handle.all_pressed() {
                    // All workers pressed: drop the capability and drain the
                    // inputs so teardown does not stall the dataflow.
                    capability = None;
                    remap_input.for_each(|_input_cap, _data| {});
                    passthrough_input.for_each(|_input_cap, _data| {});
                    false
                } else {
                    // Wedge: keep the capability, leave the inputs undrained
                    // (their pending messages hold the frontier), and ask to be
                    // rescheduled until the remaining workers press.
                    true
                };
            }

            // Fold new DataRemapEntries, keeping the one with the largest
            // logical_upper. The ordering of incoming entries is not assumed.
            remap_input.for_each(|_input_cap, data| {
                for x in data.drain(..) {
                    debug!("{} got remap {:?}", name, x);
                    if remap.logical_upper < x.logical_upper {
                        assert!(
                            remap.physical_upper <= x.physical_upper,
                            "previous remap physical upper {:?} is ahead of new remap physical upper {:?}",
                            remap.physical_upper,
                            x.physical_upper,
                        );
                        // TODO: If the physical upper has advanced, that's a very
                        // strong hint that the data shard is about to be written to.
                        // Because the data shard's upper advances sparsely (on write,
                        // but not on passage of time) which invalidates the "every 1s"
                        // assumption of the default tuning, we've had to de-tune the
                        // listen sleeps on the paired persist_source. Maybe we use "one
                        // state" to wake it up in case pubsub doesn't and remove the
                        // listen polling entirely? (NB: This would have to happen in
                        // each worker so that it's guaranteed to happen in each
                        // process.)
                        remap = x;
                    }
                }
            });

            // Apply the remap input's frontier as a `logical_upper` bump. We do
            // not discard `remap` on the empty antichain: the last observed
            // entry remains valid and lets the capability still advance past
            // `physical_upper` while the passthrough input drains.
            if let Some(logical_upper) = frontiers[0].frontier().as_option() {
                if remap.logical_upper < *logical_upper {
                    remap.logical_upper = logical_upper.clone();
                }
            } else {
                remap_closed = true;
            }

            debug!("{} remap {:?} remap_closed={}", name, remap, remap_closed);

            // Pass through any data the passthrough input has pending, at the
            // current (pre-downgrade) capability, BEFORE any downgrade below.
            // `cap.time()` here equals the pre-activation frontier, which is
            // `<=` every pending record's time, so the differential invariant
            // `send_time <= record_time` holds. Doing this before the
            // `until`-driven drop is the SQL-299 fix. NB: nothing to do for
            // `until` because the shard_source (before) and mfp_and_decode
            // (after) filter.
            if let Some(cap) = capability.as_ref() {
                let mut output = passthrough_output.activate();
                passthrough_input.for_each(|_input_cap, data| {
                    debug!("{} emitting data {:?}", name, data);
                    output.session(cap).give_container(data);
                });
            } else {
                // Still drain to avoid stalling the dataflow.
                passthrough_input.for_each(|_input_cap, _data| {});
            }

            // Only consult the passthrough frontier when not waiting on remap to
            // push `physical_upper` past the capability. While `physical_upper
            // <= cap.time()` and the remap input is open, the next expected
            // event is a remap update that jumps `cap` to `logical_upper`, not a
            // passthrough advance. Consulting the passthrough frontier then can
            // drop the capability prematurely (e.g. `SELECT AS OF MAX`, where no
            // remap update ever arrives and the passthrough side reports the
            // empty antichain). Once remap is closed, the passthrough frontier
            // is the only remaining driver.
            let waiting_for_remap = match capability.as_ref() {
                Some(cap) => !remap_closed && remap.physical_upper.less_equal(cap.time()),
                None => false,
            };
            if !waiting_for_remap {
                // Apply the passthrough input's frontier.
                //
                // If `until.less_equal(pass_frontier)`, it means that all
                // subsequent batches will contain only times greater or equal
                // to `until`, which means they can be dropped in their entirety.
                //
                // Ideally this check would live in `txns_progress_source`, but
                // that turns out to be much more invasive (requires replacing
                // lots of `T`s with `Antichain<T>`s). Given that we've been
                // thinking about reworking the operators, do the easy but more
                // wasteful thing for now.
                let pass_frontier = frontiers[1].frontier();
                if PartialOrder::less_equal(&until.borrow(), &pass_frontier) {
                    debug!(
                        "{} progress {:?} has passed until {:?}",
                        name,
                        pass_frontier,
                        until.elements(),
                    );
                    capability = None;
                } else if let Some(new_progress) = pass_frontier.as_option() {
                    // Recall that any reads of the data shard are always
                    // correct, so given that we've passed through any data from
                    // the input, that means we're free to pass through frontier
                    // updates too.
                    if let Some(cap) = capability.as_mut() {
                        if cap.time() < new_progress {
                            debug!("{} downgrading cap to {:?}", name, new_progress);
                            cap.downgrade(new_progress);
                        }
                    }
                } else {
                    // Reached the empty frontier; shut down.
                    capability = None;
                }
            }

            // If we've copied passthrough data to at least `physical_upper`, we
            // can artificially advance the output to `logical_upper`. By the
            // emptiness of `[physical_upper, logical_upper)`, no record still in
            // flight lies below `logical_upper`, so this never strands data.
            if let Some(cap) = capability.as_mut() {
                assert!(remap.physical_upper <= remap.logical_upper);
                let phys_reached = remap.physical_upper.less_equal(cap.time());
                let logical_ahead = cap.time() < &remap.logical_upper;
                if phys_reached && logical_ahead {
                    cap.downgrade(&remap.logical_upper);
                }
            }

            false
        }
    });

    (passthrough_stream, shutdown_button.press_on_drop())
}
555
556/// The process global [`TxnsRead`] that any operator can communicate with.
557#[derive(Default, Debug, Clone)]
558pub struct TxnsContext {
559 read: Arc<tokio::sync::OnceCell<Box<dyn Any + Send + Sync>>>,
560}
561
562impl TxnsContext {
563 async fn get_or_init<T, C>(&self, client: &PersistClient, txns_id: ShardId) -> TxnsRead<T>
564 where
565 T: Timestamp + Lattice + Codec64 + TotalOrder + StepForward + Sync,
566 C: TxnsCodec + 'static,
567 {
568 let read = self
569 .read
570 .get_or_init(|| {
571 let client = client.clone();
572 async move {
573 let read: Box<dyn Any + Send + Sync> =
574 Box::new(TxnsRead::<T>::start::<C>(client, txns_id).await);
575 read
576 }
577 })
578 .await
579 .downcast_ref::<TxnsRead<T>>()
580 .expect("timestamp types should match");
581 // We initially only have one txns shard in the system.
582 assert_eq!(&txns_id, read.txns_id());
583 read.clone()
584 }
585}
586
587// Existing configs use the prefix "persist_txns_" for historical reasons. New
588// configs should use the prefix "txn_wal_".
589
590pub(crate) const DATA_SHARD_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
591 "persist_txns_data_shard_retryer_initial_backoff",
592 Duration::from_millis(1024),
593 "The initial backoff when polling for new batches from a txns data shard persist_source.",
594);
595
596pub(crate) const DATA_SHARD_RETRYER_MULTIPLIER: Config<u32> = Config::new(
597 "persist_txns_data_shard_retryer_multiplier",
598 2,
599 "The backoff multiplier when polling for new batches from a txns data shard persist_source.",
600);
601
602pub(crate) const DATA_SHARD_RETRYER_CLAMP: Config<Duration> = Config::new(
603 "persist_txns_data_shard_retryer_clamp",
604 Duration::from_secs(16),
605 "The backoff clamp duration when polling for new batches from a txns data shard persist_source.",
606);
607
608/// Retry configuration for txn-wal data shard override of
609/// `next_listen_batch`.
610pub fn txns_data_shard_retry_params(cfg: &ConfigSet) -> RetryParameters {
611 RetryParameters {
612 fixed_sleep: Duration::ZERO,
613 initial_backoff: DATA_SHARD_RETRYER_INITIAL_BACKOFF.get(cfg),
614 multiplier: DATA_SHARD_RETRYER_MULTIPLIER.get(cfg),
615 clamp: DATA_SHARD_RETRYER_CLAMP.get(cfg),
616 }
617}
618
/// A helper for subscribing to a data shard using the timely operators.
///
/// This could instead be a wrapper around a [Subscribe], but it's only used in
/// tests and maelstrom, so do it by wrapping the timely operators to get
/// additional coverage. For the same reason, hardcode the K, V, T, D types.
///
/// The dataflow runs on a single-threaded timely [Worker] that only makes
/// progress when the caller steps it.
///
/// [Subscribe]: mz_persist_client::read::Subscribe
pub struct DataSubscribe {
    /// The `as_of` the subscription was created with.
    pub(crate) as_of: u64,
    /// The timely worker hosting the dataflow; stepped by the caller.
    pub(crate) worker: Worker,
    /// Probe on the decoded `shard_source` output, before `txns_progress`.
    data: ProbeHandle<u64>,
    /// Probe on the `txns_progress` output; `progress()` reads this frontier.
    txns: ProbeHandle<u64>,
    /// Receiver for events captured from the `txns_progress` output.
    capture: mpsc::Receiver<Event<u64, Vec<(String, u64, i64)>>>,
    /// Records drained from `capture` so far.
    output: Vec<(String, u64, i64)>,

    /// Shutdown buttons for the source and `txns_progress` operators, held
    /// only to keep them alive until this struct drops. NB: struct fields drop
    /// in declaration order, so these drop after `worker`.
    _tokens: Vec<PressOnDropButton>,
}
636
637impl std::fmt::Debug for DataSubscribe {
638 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639 let DataSubscribe {
640 as_of,
641 worker: _,
642 data,
643 txns,
644 capture: _,
645 output,
646 _tokens: _,
647 } = self;
648 f.debug_struct("DataSubscribe")
649 .field("as_of", as_of)
650 .field("data", data)
651 .field("txns", txns)
652 .field("output", output)
653 .finish_non_exhaustive()
654 }
655}
656
657impl DataSubscribe {
658 /// Creates a new [DataSubscribe].
659 pub fn new(
660 name: &str,
661 client: PersistClient,
662 txns_id: ShardId,
663 data_id: ShardId,
664 as_of: u64,
665 until: Antichain<u64>,
666 ) -> Self {
667 let mut worker = Worker::new(
668 WorkerConfig::default(),
669 timely::communication::Allocator::Thread(
670 timely::communication::allocator::Thread::default(),
671 ),
672 Some(std::time::Instant::now()),
673 );
674 let (data, txns, capture, tokens) = worker.dataflow::<u64, _, _>(|outer| {
675 let (data_stream, shard_source_token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
676 let client = client.clone();
677 let (data_stream, token) = shard_source::<String, (), u64, i64, _, _, _>(
678 outer,
679 scope,
680 name,
681 move || std::future::ready(client.clone()),
682 data_id,
683 Some(Antichain::from_elem(as_of)),
684 SnapshotMode::Include,
685 until.clone(),
686 false.then_some(|_, _, _| unreachable!()),
687 Arc::new(StringSchema),
688 Arc::new(UnitSchema),
689 FilterResult::keep_all,
690 false.then_some(|| unreachable!()),
691 async {},
692 ErrorHandler::Halt("data_subscribe"),
693 );
694 (data_stream.leave(outer), token)
695 });
696 let (data, txns) = (ProbeHandle::new(), ProbeHandle::new());
697 let data_stream = data_stream.flat_map(|part| {
698 let part = part.parse();
699 part.part.map(|((k, ()), t, d)| (k, t, d))
700 });
701 let data_stream = data_stream.probe_with(&data);
702 let (data_stream, mut txns_progress_token) =
703 txns_progress::<String, (), u64, i64, _, TxnsCodecDefault, _>(
704 data_stream,
705 name,
706 &TxnsContext::default(),
707 || std::future::ready(client.clone()),
708 txns_id,
709 data_id,
710 as_of,
711 until,
712 Arc::new(StringSchema),
713 Arc::new(UnitSchema),
714 );
715 let data_stream = data_stream.probe_with(&txns);
716 let mut tokens = shard_source_token;
717 tokens.append(&mut txns_progress_token);
718 (data, txns, data_stream.capture(), tokens)
719 });
720 Self {
721 as_of,
722 worker,
723 data,
724 txns,
725 capture,
726 output: Vec::new(),
727 _tokens: tokens,
728 }
729 }
730
731 /// Returns the exclusive progress of the dataflow.
732 pub fn progress(&self) -> u64 {
733 self.txns
734 .with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX))
735 }
736
737 /// Steps the dataflow, capturing output.
738 pub fn step(&mut self) {
739 self.worker.step();
740 self.capture_output()
741 }
742
743 pub(crate) fn capture_output(&mut self) {
744 loop {
745 let event = match self.capture.try_recv() {
746 Ok(x) => x,
747 Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
748 };
749 match event {
750 Event::Progress(_) => {}
751 Event::Messages(_, mut msgs) => self.output.append(&mut msgs),
752 }
753 }
754 }
755
756 /// Steps the dataflow past the given time, capturing output.
757 #[cfg(test)]
758 pub async fn step_past(&mut self, ts: u64) {
759 while self.txns.less_equal(&ts) {
760 tracing::trace!(
761 "progress at {:?}",
762 self.txns.with_frontier(|x| x.to_owned()).elements()
763 );
764 self.step();
765 tokio::task::yield_now().await;
766 }
767 }
768
769 /// Returns captured output.
770 pub fn output(&self) -> &Vec<(String, u64, i64)> {
771 &self.output
772 }
773}
774
/// A handle to a [DataSubscribe] running in a task.
///
/// The [DataSubscribe] lives on a blocking task. This handle sends it step
/// requests and accumulates the output and progress it reports back.
#[derive(Debug)]
pub struct DataSubscribeTask {
    /// Carries step requests. A `None` timestamp requests one step, a
    /// `Some(ts)` requests stepping until we progress beyond `ts`. Each request
    /// also carries a oneshot sender for the reply: the output produced since
    /// the previous request, plus the current progress.
    tx: std::sync::mpsc::Sender<(
        Option<u64>,
        tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
    )>,
    /// The blocking task. Once `tx` is dropped, it resolves to all output
    /// produced over its lifetime.
    task: mz_ore::task::JoinHandle<Vec<(String, u64, i64)>>,
    /// Output received from the task so far.
    output: Vec<(String, u64, i64)>,
    /// Most recent progress reported by the task; asserted non-decreasing.
    progress: u64,
}
788
789impl DataSubscribeTask {
790 /// Creates a new [DataSubscribeTask].
791 pub async fn new(
792 client: PersistClient,
793 txns_id: ShardId,
794 data_id: ShardId,
795 as_of: u64,
796 ) -> Self {
797 let cache = TxnsCache::open(&client, txns_id, Some(data_id)).await;
798 let (tx, rx) = std::sync::mpsc::channel();
799 let task = mz_ore::task::spawn_blocking(
800 || "data_subscribe task",
801 move || Self::task(client, cache, data_id, as_of, rx),
802 );
803 DataSubscribeTask {
804 tx,
805 task,
806 output: Vec::new(),
807 progress: 0,
808 }
809 }
810
811 #[cfg(test)]
812 async fn step(&mut self) {
813 self.send(None).await;
814 }
815
816 /// Steps the dataflow past the given time, capturing output.
817 pub async fn step_past(&mut self, ts: u64) -> u64 {
818 self.send(Some(ts)).await;
819 self.progress
820 }
821
822 /// Returns captured output.
823 pub fn output(&self) -> &Vec<(String, u64, i64)> {
824 &self.output
825 }
826
827 async fn send(&mut self, ts: Option<u64>) {
828 let (tx, rx) = tokio::sync::oneshot::channel();
829 self.tx.send((ts, tx)).expect("task should be running");
830 let (mut new_output, new_progress) = rx.await.expect("task should be running");
831 self.output.append(&mut new_output);
832 assert!(self.progress <= new_progress);
833 self.progress = new_progress;
834 }
835
836 /// Signals for the task to exit, and then waits for this to happen.
837 ///
838 /// _All_ output from the lifetime of the task (not just what was previously
839 /// captured) is returned.
840 pub async fn finish(self) -> Vec<(String, u64, i64)> {
841 // Closing the channel signals the task to exit.
842 drop(self.tx);
843 self.task.await
844 }
845
846 fn task(
847 client: PersistClient,
848 cache: TxnsCache<u64>,
849 data_id: ShardId,
850 as_of: u64,
851 rx: std::sync::mpsc::Receiver<(
852 Option<u64>,
853 tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
854 )>,
855 ) -> Vec<(String, u64, i64)> {
856 let mut subscribe = DataSubscribe::new(
857 "DataSubscribeTask",
858 client.clone(),
859 cache.txns_id(),
860 data_id,
861 as_of,
862 Antichain::new(),
863 );
864 let mut output = Vec::new();
865 loop {
866 let (ts, tx) = match rx.try_recv() {
867 Ok(x) => x,
868 Err(TryRecvError::Empty) => {
869 // No requests, continue stepping so nothing deadlocks.
870 subscribe.step();
871 continue;
872 }
873 Err(TryRecvError::Disconnected) => {
874 // All done! Return our output.
875 return output;
876 }
877 };
878 // Always step at least once.
879 subscribe.step();
880 // If we got a ts, make sure to step past it.
881 if let Some(ts) = ts {
882 while subscribe.progress() <= ts {
883 subscribe.step();
884 }
885 }
886 let new_output = std::mem::take(&mut subscribe.output);
887 output.extend(new_output.iter().cloned());
888 let _ = tx.send((new_output, subscribe.progress()));
889 }
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use itertools::{Either, Itertools};
896
897 use crate::tests::writer;
898 use crate::txns::TxnsHandle;
899
900 use super::*;
901
    /// One scripted action applied to the operator's two inputs.
    ///
    /// A schedule is a `&[Action]` that `run_schedule` applies in order.
    #[derive(Debug, Clone)]
    enum Action {
        /// Send a `DataRemapEntry` on the remap input.
        ///
        /// The entry carries no time of its own. It is sent at the remap
        /// input's current time, i.e. the most recent `RemapFrontier` time
        /// (or the input's initial time).
        Remap {
            /// Copied into `DataRemapEntry::physical_upper`.
            physical_upper: u64,
            /// Copied into `DataRemapEntry::logical_upper`.
            logical_upper: u64,
        },
        /// Advance the remap input frontier to `ts` (empty antichain if `None`).
        ///
        /// `None` closes the remap input; no remap action may follow it.
        RemapFrontier(Option<u64>),
        /// Send passthrough data records (as `(payload, time)`), then leave them buffered.
        /// Record times must not decrease across the schedule.
        Pass { records: Vec<(i64, u64)> },
        /// Advance the passthrough input frontier to `ts` (empty antichain if `None`).
        ///
        /// `None` closes the passthrough input; no passthrough action may follow it.
        PassFrontier(Option<u64>),
        /// Step the worker once.
        Step,
    }
919
920 /// Runs `schedule` against the operator built by `build`, returning the
921 /// captured output events and the final exclusive output frontier. Each
922 /// event is shaped as `(payload, time, count)`, where `count` is synthesized
923 /// as `1` so the output looks like a differential collection.
924 fn run_schedule(
925 build: impl for<'a> Fn(
926 StreamVec<'a, u64, DataRemapEntry<u64>>,
927 StreamVec<'a, u64, i64>,
928 Antichain<u64>,
929 ) -> (StreamVec<'a, u64, i64>, PressOnDropButton),
930 until: Antichain<u64>,
931 schedule: &[Action],
932 ) -> (Vec<(i64, u64, i64)>, u64) {
933 let mut worker = Worker::new(
934 WorkerConfig::default(),
935 timely::communication::Allocator::Thread(
936 timely::communication::allocator::Thread::default(),
937 ),
938 Some(std::time::Instant::now()),
939 );
940
941 // The button must outlive the run: dropping it presses the shutdown
942 // handle, which makes the operator drop its capability on the next
943 // activation. Hold it until after the drain loop completes.
944 let (remap_handle, pass_handle, probe, capture, _button) =
945 worker.dataflow::<u64, _, _>(|scope| {
946 let (remap_handle, remap_stream) = scope.new_input::<Vec<DataRemapEntry<u64>>>();
947 let (pass_handle, pass_stream) = scope.new_input::<Vec<i64>>();
948 let (out, button) = build(remap_stream, pass_stream, until.clone());
949 let probe = ProbeHandle::new();
950 let out = out.probe_with(&probe);
951 (remap_handle, pass_handle, probe, out.capture(), button)
952 });
953
954 // timely input handles can only `advance_to` forward in time. Track the
955 // last time used on each input so we can fail loudly with a useful
956 // message instead of panicking deep inside timely on a decreasing time.
957 let mut last_remap_ts = 0u64;
958 let mut last_pass_ts = 0u64;
959 // Held in `Option`s so a `*Frontier(None)` action can `take` and drop the
960 // handle, which closes the input to the empty antichain. Advancing to
961 // `u64::MAX` is NOT equivalent: it leaves the input's frontier at
962 // `Some(u64::MAX)`, which the operator (correctly) treats as a finite
963 // `logical_upper`/passthrough advance rather than a closed input.
964 let mut remap_handle = Some(remap_handle);
965 let mut pass_handle = Some(pass_handle);
966 for action in schedule {
967 match action.clone() {
968 // `Remap` is a `send` at the handle's current time, so it carries
969 // no explicit time and needs no monotonicity assert.
970 Action::Remap {
971 physical_upper,
972 logical_upper,
973 } => remap_handle
974 .as_mut()
975 .expect("remap input still open")
976 .send(DataRemapEntry {
977 physical_upper,
978 logical_upper,
979 }),
980 Action::RemapFrontier(Some(ts)) => {
981 assert!(
982 ts >= last_remap_ts,
983 "Action::RemapFrontier time {ts} < previous remap time {last_remap_ts}; per-input times must be non-decreasing"
984 );
985 last_remap_ts = ts;
986 remap_handle
987 .as_mut()
988 .expect("remap input still open")
989 .advance_to(ts);
990 }
991 // Drop the handle to close the input to the empty antichain.
992 Action::RemapFrontier(None) => {
993 last_remap_ts = u64::MAX;
994 drop(remap_handle.take());
995 }
996 Action::Pass { records } => {
997 let handle = pass_handle.as_mut().expect("passthrough input still open");
998 for (payload, time) in records {
999 assert!(
1000 time >= last_pass_ts,
1001 "Action::Pass time {time} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing"
1002 );
1003 last_pass_ts = time;
1004 // `advance_to` is what makes each record's time visible to
1005 // the operator; the subsequent `send` emits the payload at
1006 // that time. Both impls consume the identical schedule, so
1007 // the exact send mechanics need only be self-consistent.
1008 handle.advance_to(time);
1009 handle.send(payload);
1010 }
1011 }
1012 Action::PassFrontier(Some(ts)) => {
1013 assert!(
1014 ts >= last_pass_ts,
1015 "Action::PassFrontier time {ts} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing"
1016 );
1017 last_pass_ts = ts;
1018 pass_handle
1019 .as_mut()
1020 .expect("passthrough input still open")
1021 .advance_to(ts);
1022 }
1023 // Drop the handle to close the input to the empty antichain.
1024 Action::PassFrontier(None) => {
1025 last_pass_ts = u64::MAX;
1026 drop(pass_handle.take());
1027 }
1028 Action::Step => {
1029 worker.step();
1030 }
1031 }
1032 }
1033 // Drain: flush inputs and step until the output probe frontier stops
1034 // advancing. A hard cap PANICS so a buggy operator that never settles
1035 // fails loudly instead of silently returning partial results.
1036 if let Some(handle) = remap_handle.as_mut() {
1037 handle.flush();
1038 }
1039 if let Some(handle) = pass_handle.as_mut() {
1040 handle.flush();
1041 }
1042 let mut last = probe.with_frontier(|f| f.to_owned());
1043 let mut stable = 0;
1044 for step in 0.. {
1045 assert!(
1046 step < 4096,
1047 "run_schedule did not quiesce within 4096 steps"
1048 );
1049 worker.step();
1050 let now = probe.with_frontier(|f| f.to_owned());
1051 if now == last {
1052 stable += 1;
1053 // Require a few consecutive no-change steps so in-flight messages flush.
1054 if stable >= 8 {
1055 break;
1056 }
1057 } else {
1058 stable = 0;
1059 last = now;
1060 }
1061 }
1062
1063 let frontier = probe.with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX));
1064 let mut output = Vec::new();
1065 while let Ok(event) = capture.try_recv() {
1066 if let Event::Messages(time, msgs) = event {
1067 for payload in msgs {
1068 output.push((payload, time, 1));
1069 }
1070 }
1071 }
1072 (output, frontier)
1073 }
1074
1075 impl<K, V, T, D, C> TxnsHandle<K, V, T, D, C>
1076 where
1077 K: Debug + Codec,
1078 V: Debug + Codec,
1079 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
1080 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
1081 C: TxnsCodec,
1082 {
1083 async fn subscribe_task(
1084 &self,
1085 client: &PersistClient,
1086 data_id: ShardId,
1087 as_of: u64,
1088 ) -> DataSubscribeTask {
1089 DataSubscribeTask::new(client.clone(), self.txns_id(), data_id, as_of).await
1090 }
1091 }
1092
1093 #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1094 #[cfg_attr(miri, ignore)] // too slow
1095 async fn data_subscribe() {
1096 async fn step(subs: &mut Vec<DataSubscribeTask>) {
1097 for sub in subs.iter_mut() {
1098 sub.step().await;
1099 }
1100 }
1101
1102 let client = PersistClient::new_for_tests().await;
1103 let mut txns = TxnsHandle::expect_open(client.clone()).await;
1104 let log = txns.new_log();
1105 let d0 = ShardId::new();
1106
1107 // Start a subscription before the shard gets registered.
1108 let mut subs = Vec::new();
1109 subs.push(txns.subscribe_task(&client, d0, 5).await);
1110 step(&mut subs).await;
1111
1112 // Now register the shard. Also start a new subscription and step the
1113 // previous one (plus repeat this for every later step).
1114 txns.register(1, [writer(&client, d0).await]).await.unwrap();
1115 subs.push(txns.subscribe_task(&client, d0, 5).await);
1116 step(&mut subs).await;
1117
1118 // Now write something unrelated.
1119 let d1 = txns.expect_register(2).await;
1120 txns.expect_commit_at(3, d1, &["nope"], &log).await;
1121 subs.push(txns.subscribe_task(&client, d0, 5).await);
1122 step(&mut subs).await;
1123
1124 // Now write to our shard before.
1125 txns.expect_commit_at(4, d0, &["4"], &log).await;
1126 subs.push(txns.subscribe_task(&client, d0, 5).await);
1127 step(&mut subs).await;
1128
1129 // Now write to our shard at the as_of.
1130 txns.expect_commit_at(5, d0, &["5"], &log).await;
1131 subs.push(txns.subscribe_task(&client, d0, 5).await);
1132 step(&mut subs).await;
1133
1134 // Now write to our shard past the as_of.
1135 txns.expect_commit_at(6, d0, &["6"], &log).await;
1136 subs.push(txns.subscribe_task(&client, d0, 5).await);
1137 step(&mut subs).await;
1138
1139 // Now write something unrelated again.
1140 txns.expect_commit_at(7, d1, &["nope"], &log).await;
1141 subs.push(txns.subscribe_task(&client, d0, 5).await);
1142 step(&mut subs).await;
1143
1144 // Verify that the dataflows can progress to the expected point and that
1145 // we read the right thing no matter when the dataflow started.
1146 for mut sub in subs {
1147 let progress = sub.step_past(7).await;
1148 assert_eq!(progress, 8);
1149 log.assert_eq(d0, 5, 8, sub.finish().await);
1150 }
1151 }
1152
1153 #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1154 #[cfg_attr(miri, ignore)] // too slow
1155 async fn subscribe_shard_finalize() {
1156 let client = PersistClient::new_for_tests().await;
1157 let mut txns = TxnsHandle::expect_open(client.clone()).await;
1158 let log = txns.new_log();
1159 let d0 = txns.expect_register(1).await;
1160
1161 // Start the operator as_of the register ts.
1162 let mut sub = txns.read_cache().expect_subscribe(&client, d0, 1);
1163 sub.step_past(1).await;
1164
1165 // Write to it via txns.
1166 txns.expect_commit_at(2, d0, &["foo"], &log).await;
1167 sub.step_past(2).await;
1168
1169 // Unregister it.
1170 txns.forget(3, [d0]).await.unwrap();
1171 sub.step_past(3).await;
1172
1173 // TODO: Hard mode, see if we can get the rest of this test to work even
1174 // _without_ the txns shard advancing.
1175 txns.begin().commit_at(&mut txns, 7).await.unwrap();
1176
1177 // The operator should continue to emit data written directly even
1178 // though it's no longer in the txns set.
1179 let mut d0_write = writer(&client, d0).await;
1180 let key = "bar".to_owned();
1181 crate::small_caa(|| "test", &mut d0_write, &[((&key, &()), &5, 1)], 4, 6)
1182 .await
1183 .unwrap();
1184 log.record((d0, key, 5, 1));
1185 sub.step_past(4).await;
1186
1187 // Now finalize the shard to writes.
1188 let () = d0_write
1189 .compare_and_append_batch(&mut [], Antichain::from_elem(6), Antichain::new(), true)
1190 .await
1191 .unwrap()
1192 .unwrap();
1193 while sub.txns.less_than(&u64::MAX) {
1194 sub.step();
1195 tokio::task::yield_now().await;
1196 }
1197
1198 // Make sure we read the correct things.
1199 log.assert_eq(d0, 1, u64::MAX, sub.output().clone());
1200
1201 // Also make sure that we can read the right things if we start up after
1202 // the forget but before the direct write and ditto after the direct
1203 // write.
1204 log.assert_subscribe(d0, 4, u64::MAX).await;
1205 log.assert_subscribe(d0, 6, u64::MAX).await;
1206 }
1207
1208 #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1209 #[cfg_attr(miri, ignore)] // too slow
1210 async fn subscribe_shard_register_forget() {
1211 let client = PersistClient::new_for_tests().await;
1212 let mut txns = TxnsHandle::expect_open(client.clone()).await;
1213 let d0 = ShardId::new();
1214
1215 // Start a subscription on the data shard.
1216 let mut sub = txns.read_cache().expect_subscribe(&client, d0, 0);
1217 assert_eq!(sub.progress(), 0);
1218
1219 // Register the shard at 10.
1220 txns.register(10, [writer(&client, d0).await])
1221 .await
1222 .unwrap();
1223 sub.step_past(10).await;
1224 assert!(
1225 sub.progress() > 10,
1226 "operator should advance past 10 when shard is registered"
1227 );
1228
1229 // Forget the shard at 20.
1230 txns.forget(20, [d0]).await.unwrap();
1231 sub.step_past(20).await;
1232 assert!(
1233 sub.progress() > 20,
1234 "operator should advance past 20 when shard is forgotten"
1235 );
1236 }
1237
1238 #[mz_ore::test(tokio::test)]
1239 #[cfg_attr(miri, ignore)] // too slow
1240 async fn as_of_until() {
1241 let client = PersistClient::new_for_tests().await;
1242 let mut txns = TxnsHandle::expect_open(client.clone()).await;
1243 let log = txns.new_log();
1244
1245 let d0 = txns.expect_register(1).await;
1246 txns.expect_commit_at(2, d0, &["2"], &log).await;
1247 txns.expect_commit_at(3, d0, &["3"], &log).await;
1248 txns.expect_commit_at(4, d0, &["4"], &log).await;
1249 txns.expect_commit_at(5, d0, &["5"], &log).await;
1250 txns.expect_commit_at(6, d0, &["6"], &log).await;
1251 txns.expect_commit_at(7, d0, &["7"], &log).await;
1252
1253 let until = 5;
1254 let mut sub = DataSubscribe::new(
1255 "as_of_until",
1256 client,
1257 txns.txns_id(),
1258 d0,
1259 3,
1260 Antichain::from_elem(until),
1261 );
1262 // Manually step the dataflow, instead of going through the
1263 // `DataSubscribe` helper because we're interested in all captured
1264 // events.
1265 while sub.txns.less_equal(&5) {
1266 sub.worker.step();
1267 tokio::task::yield_now().await;
1268 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1269 }
1270 let (actual_progresses, actual_events): (Vec<_>, Vec<_>) =
1271 sub.capture.into_iter().partition_map(|event| match event {
1272 Event::Progress(progress) => Either::Left(progress),
1273 Event::Messages(ts, data) => Either::Right((ts, data)),
1274 });
1275 // Aggregate the captured records, ignoring the stream-level
1276 // timestamp on each batch. The operator emits each container at
1277 // whatever capability it currently holds (which is determined by
1278 // its scheduling cadence and the upstream frontiers it has
1279 // observed), so the per-batch `ts` is not deterministic and not
1280 // part of the operator's contract. Per-record `(key, time, diff)`
1281 // tuples are what callers see, and the differential invariant
1282 // (stream `ts <= record time`) is checked separately below.
1283 let mut actual_records: Vec<(String, u64, i64)> = actual_events
1284 .iter()
1285 .flat_map(|(_ts, data)| data.iter().cloned())
1286 .collect();
1287 actual_records.sort();
1288 let expected_records: Vec<(String, u64, i64)> = vec![
1289 ("2".to_owned(), 3, 1),
1290 ("3".to_owned(), 3, 1),
1291 ("4".to_owned(), 4, 1),
1292 ];
1293 assert_eq!(actual_records, expected_records);
1294
1295 // Verify the differential invariant: each batch's stream
1296 // timestamp `ts` must be `<= record_time` for every record it
1297 // carries. The operator's contract requires this so that
1298 // downstream differential operators can integrate the records
1299 // at their declared times.
1300 for (ts, data) in &actual_events {
1301 for (_key, record_ts, _diff) in data {
1302 assert!(
1303 ts <= record_ts,
1304 "differential invariant violated: stream ts {ts} > record time {record_ts}",
1305 );
1306 }
1307 }
1308
1309 // The number and contents of progress messages is not guaranteed and
1310 // depends on the downgrade behavior. The only thing we can assert is
1311 // the max progress timestamp, if there is one, is less than the until.
1312 if let Some(max_progress_ts) = actual_progresses
1313 .into_iter()
1314 .flatten()
1315 .map(|(ts, _diff)| ts)
1316 .max()
1317 {
1318 assert!(max_progress_ts < until, "{max_progress_ts} < {until}");
1319 }
1320 }
1321
1322 /// Builds the sync operator for the harness.
1323 fn build_sync<'a>(
1324 remap: StreamVec<'a, u64, DataRemapEntry<u64>>,
1325 pass: StreamVec<'a, u64, i64>,
1326 until: Antichain<u64>,
1327 ) -> (StreamVec<'a, u64, i64>, PressOnDropButton) {
1328 txns_progress_frontiers::<String, (), u64, i64, i64, TxnsCodecDefault>(
1329 remap,
1330 pass,
1331 "test",
1332 ShardId::new(),
1333 until,
1334 0,
1335 )
1336 }
1337
1338 /// Generates a random schedule for the no-data-loss fuzz test. Interleaves
1339 /// remap entries/frontiers with passthrough data/frontiers. Payloads are
1340 /// unique and increasing so a single dropped or duplicated record is
1341 /// detectable; per-input times are non-decreasing (the harness requires
1342 /// this). The schedule never closes the passthrough input, and the test
1343 /// uses `until = ∅`, so the operator never has a legitimate reason to shut
1344 /// down and must pass through every record it is given.
1345 ///
1346 /// Schedules are intentionally NOT constrained to respect the remap
1347 /// "[physical_upper, logical_upper) is empty" contract. The no-data-loss
1348 /// property must hold under arbitrary interleavings, so feeding
1349 /// contract-violating schedules only strengthens the test.
1350 fn gen_schedule(seed: u64) -> Vec<Action> {
1351 // Simple xorshift RNG for determinism without extra deps.
1352 let mut state = seed.wrapping_add(0x9E3779B97F4A7C15).max(1);
1353 let mut next = || {
1354 state ^= state << 13;
1355 state ^= state >> 7;
1356 state ^= state << 17;
1357 state
1358 };
1359
1360 let mut schedule = Vec::new();
1361 let mut physical = 0u64;
1362 let mut logical = 0u64;
1363 let mut pass_frontier = 0u64;
1364 let mut payload = 0i64;
1365 let mut remap_closed = false;
1366 let steps = 8 + (next() % 16);
1367 for _ in 0..steps {
1368 match next() % 5 {
1369 0 if !remap_closed => {
1370 physical += next() % 3;
1371 logical = logical.max(physical) + (next() % 4);
1372 schedule.push(Action::Remap {
1373 physical_upper: physical,
1374 logical_upper: logical,
1375 });
1376 }
1377 1 if !remap_closed => {
1378 if next() % 8 == 0 {
1379 remap_closed = true;
1380 schedule.push(Action::RemapFrontier(None));
1381 } else {
1382 logical += next() % 3;
1383 schedule.push(Action::RemapFrontier(Some(logical)));
1384 }
1385 }
1386 2 => {
1387 let t = pass_frontier + (next() % 3);
1388 pass_frontier = t;
1389 payload += 1;
1390 schedule.push(Action::Pass {
1391 records: vec![(payload, t)],
1392 });
1393 }
1394 3 => {
1395 pass_frontier += next() % 3;
1396 schedule.push(Action::PassFrontier(Some(pass_frontier)));
1397 }
1398 _ => schedule.push(Action::Step),
1399 }
1400 schedule.push(Action::Step);
1401 }
1402 schedule
1403 }
1404
1405 /// Fuzz: under any random interleaving, the deasynced operator must emit
1406 /// every passthrough record it is given (no loss, no duplication) and must
1407 /// not prematurely shut down. With `until = ∅` and no passthrough close, the
1408 /// operator never legitimately drops its capability, so the output frontier
1409 /// must stay finite.
1410 #[mz_ore::test]
1411 #[cfg_attr(miri, ignore)] // too slow
1412 fn frontiers_fuzz_no_data_loss() {
1413 for seed in 0..500u64 {
1414 let schedule = gen_schedule(seed);
1415 let mut sent: Vec<i64> = schedule
1416 .iter()
1417 .flat_map(|a| match a {
1418 Action::Pass { records } => records.iter().map(|(p, _)| *p).collect(),
1419 _ => Vec::new(),
1420 })
1421 .collect();
1422 let (out, frontier) = run_schedule(build_sync, Antichain::new(), &schedule);
1423 let mut emitted: Vec<i64> = out.iter().map(|(p, _, _)| *p).collect();
1424 sent.sort();
1425 emitted.sort();
1426 assert_eq!(
1427 emitted, sent,
1428 "seed {seed}: operator lost or duplicated data\nschedule={schedule:?}\nout={out:?}"
1429 );
1430 assert_ne!(
1431 frontier,
1432 u64::MAX,
1433 "seed {seed}: operator prematurely shut down (empty output frontier)\nschedule={schedule:?}"
1434 );
1435 }
1436 }
1437
1438 #[mz_ore::test]
1439 #[cfg_attr(miri, ignore)] // too slow
1440 fn frontiers_sql_299_up_to_no_tail_loss() {
1441 // until = 0. A remap entry with physical_upper = 5 keeps the operator
1442 // out of the `waiting_for_remap` state (5 > cap.time() = 0), so the
1443 // until check actually fires. Buffer a record at time 0 (payload 4) and
1444 // leave it pending. In the single activation, the operator sees both the
1445 // buffered record and the passthrough frontier at 0, which already
1446 // satisfies `until <= pass_frontier` and drops the capability. The
1447 // record must be emitted before that drop, not discarded. Buffering at
1448 // time 0 (the cap's time) is what makes the record and the
1449 // until-crossing land in the same activation — with the ordered
1450 // `new_input` handle, advancing the passthrough frontier past the record
1451 // would deliver the record in an earlier activation and mask the bug.
1452 let schedule = vec![
1453 Action::Remap {
1454 physical_upper: 5,
1455 logical_upper: 5,
1456 },
1457 Action::RemapFrontier(Some(5)),
1458 Action::Pass {
1459 records: vec![(4, 0)],
1460 },
1461 Action::PassFrontier(None),
1462 Action::Step,
1463 ];
1464 let (output, _frontier) = run_schedule(build_sync, Antichain::from_elem(0), &schedule);
1465 let payloads: Vec<i64> = output.iter().map(|(p, _, _)| *p).collect();
1466 assert!(
1467 payloads.contains(&4),
1468 "buffered record at time 0 must be emitted before until-driven shutdown, got {output:?}"
1469 );
1470 }
1471
1472 #[mz_ore::test]
1473 #[cfg_attr(miri, ignore)] // too slow
1474 fn frontiers_per4_advance_after_remap_close() {
1475 // Emit a remap entry whose logical_upper (10) exceeds its physical_upper
1476 // (5). Close the remap input while the passthrough frontier is still
1477 // below physical_upper (so the capability has NOT yet advanced to
1478 // logical_upper), then advance the passthrough frontier up to
1479 // physical_upper (5). The capability must still advance to logical_upper
1480 // (10) using the remap entry retained across the close, not stall at the
1481 // passthrough frontier (5). The async impl dropped the entry on close and
1482 // stalled here (PER-4).
1483 let schedule = vec![
1484 Action::Remap {
1485 physical_upper: 5,
1486 logical_upper: 10,
1487 },
1488 Action::RemapFrontier(Some(10)),
1489 Action::Step,
1490 // Close remap before the passthrough frontier reaches physical_upper.
1491 Action::RemapFrontier(None),
1492 Action::Step,
1493 // Only now does the passthrough frontier reach physical_upper.
1494 Action::PassFrontier(Some(5)),
1495 Action::Step,
1496 ];
1497 let (_output, frontier) = run_schedule(build_sync, Antichain::new(), &schedule);
1498 assert_eq!(
1499 frontier, 10,
1500 "capability must advance to logical_upper after remap close, got {frontier}"
1501 );
1502 }
1503
1504 #[mz_ore::test]
1505 #[cfg_attr(miri, ignore)] // too slow
1506 fn frontiers_select_as_of_max_blocks() {
1507 // Mimic `SELECT AS OF MAX`: a remap entry exists with physical_upper == 0
1508 // (so physical_upper <= cap.time() and the operator waits for remap), no
1509 // further remap update arrives, and the passthrough frontier reaches the
1510 // empty antichain. The operator must NOT drop its capability (must keep
1511 // blocking), so the output frontier stays finite (0), not u64::MAX.
1512 let schedule = vec![
1513 Action::Remap {
1514 physical_upper: 0,
1515 logical_upper: 0,
1516 },
1517 Action::RemapFrontier(Some(0)),
1518 Action::PassFrontier(None),
1519 Action::Step,
1520 ];
1521 let (_output, frontier) = run_schedule(build_sync, Antichain::new(), &schedule);
1522 assert_eq!(
1523 frontier, 0,
1524 "operator must block (retain capability) while waiting for remap, got {frontier}"
1525 );
1526 }
1527}