Skip to main content

mz_txn_wal/
operator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Timely operators for the crate
11
12use std::any::Any;
13use std::fmt::Debug;
14use std::future::Future;
15use std::rc::Rc;
16use std::sync::mpsc::TryRecvError;
17use std::sync::{Arc, mpsc};
18use std::time::Duration;
19
20use differential_dataflow::Hashable;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use mz_dyncfg::{Config, ConfigSet};
24use mz_ore::cast::CastFrom;
25use mz_persist_client::cfg::RetryParameters;
26use mz_persist_client::operators::shard_source::{
27    ErrorHandler, FilterResult, SnapshotMode, shard_source,
28};
29use mz_persist_client::{Diagnostics, PersistClient, ShardId};
30use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
31use mz_persist_types::txn::TxnsCodec;
32use mz_persist_types::{Codec, Codec64, StepForward};
33use mz_timely_util::activator::ArcActivator;
34use mz_timely_util::builder_async::{PressOnDropButton, button};
35use timely::dataflow::channels::pact::Pipeline;
36#[cfg(test)]
37use timely::dataflow::operators::Input;
38use timely::dataflow::operators::capture::Event;
39use timely::dataflow::operators::generic::OutputBuilder;
40use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
41use timely::dataflow::operators::vec::{Broadcast, Map};
42use timely::dataflow::operators::{Capture, Leave, Probe};
43use timely::dataflow::{ProbeHandle, Scope, StreamVec};
44use timely::order::TotalOrder;
45use timely::progress::{Antichain, Timestamp};
46use timely::worker::Worker;
47use timely::{PartialOrder, WorkerConfig};
48use tracing::debug;
49
50use crate::TxnsCodecDefault;
51use crate::txn_cache::TxnsCache;
52use crate::txn_read::{DataRemapEntry, TxnsRead};
53
54/// An operator for translating physical data shard frontiers into logical ones.
55///
56/// A data shard in the txns set logically advances its upper each time a txn is
57/// committed, but the upper is not physically advanced unless that data shard
58/// was involved in the txn. This means that a shard_source (or any read)
59/// pointed at a data shard would appear to stall at the time of the most recent
60/// write. We fix this for shard_source by flowing its output through a new
61/// `txns_progress` dataflow operator, which ensures that the
62/// frontier/capability is advanced as the txns shard progresses, as long as the
63/// shard_source is up to date with the latest committed write to that data
64/// shard.
65///
66/// Example:
67///
68/// - A data shard has most recently been written to at 3.
69/// - The txns shard's upper is at 6.
70/// - We render a dataflow containing a shard_source with an as_of of 5.
71/// - A txn NOT involving the data shard is committed at 7.
72/// - A txn involving the data shard is committed at 9.
73///
74/// How it works:
75///
76/// - The shard_source operator is rendered. Its single output is hooked up as a
77///   _disconnected_ input to txns_progress. The txns_progress single output is
78///   a stream of the same type, which is used by downstream operators. This
79///   txns_progress operator is targeted at one data_shard; rendering a
80///   shard_source for a second data shard requires a second txns_progress
81///   operator.
82/// - The shard_source operator emits data through 3 and advances the frontier.
83/// - The txns_progress operator passes through these writes and frontier
84///   advancements unchanged. (Recall that it's always correct to read a data
85///   shard "normally", it just might stall.) Because the txns_progress operator
86///   knows there are no writes in `[3,5]`, it then downgrades its own
87///   capability past 5 (to 6). Because the input is disconnected, this means
88///   the overall frontier of the output is downgraded to 6.
89/// - The txns_progress operator learns about the write at 7 (the upper is now
90///   8). Because it knows that the data shard was not involved in this, it's
91///   free to downgrade its capability to 8.
92/// - The txns_progress operator learns about the write at 9 (the upper is now
93///   10). It knows that the data shard _WAS_ involved in this, so it forwards
94///   on data from its input until the input has progressed to 10, at which
95///   point it can itself downgrade to 10.
96pub fn txns_progress<'scope, K, V, T, D, P, C, F>(
97    passthrough: StreamVec<'scope, T, P>,
98    name: &str,
99    ctx: &TxnsContext,
100    client_fn: impl Fn() -> F,
101    txns_id: ShardId,
102    data_id: ShardId,
103    as_of: T,
104    until: Antichain<T>,
105    data_key_schema: Arc<K::Schema>,
106    data_val_schema: Arc<V::Schema>,
107) -> (StreamVec<'scope, T, P>, Vec<PressOnDropButton>)
108where
109    K: Debug + Codec + Send + Sync,
110    V: Debug + Codec + Send + Sync,
111    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
112    D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
113    P: Debug + Clone + 'static,
114    C: TxnsCodec + 'static,
115    F: Future<Output = PersistClient> + Send + 'static,
116{
117    let unique_id = (name, passthrough.scope().addr()).hashed();
118    let (remap, source_button) = txns_progress_source_global::<K, V, T, D, P, C>(
119        passthrough.scope(),
120        name,
121        ctx.clone(),
122        client_fn(),
123        txns_id,
124        data_id,
125        as_of,
126        data_key_schema,
127        data_val_schema,
128        unique_id,
129    );
130    // Each of the `txns_frontiers` workers wants the full copy of the remap
131    // information.
132    let remap = remap.broadcast();
133    let (passthrough, frontiers_button) = txns_progress_frontiers::<K, V, T, D, P, C>(
134        remap,
135        passthrough,
136        name,
137        data_id,
138        until,
139        unique_id,
140    );
141    (passthrough, vec![source_button, frontiers_button])
142}
143
/// Event sent from the subscribe Tokio task to the sync `txns_progress_source`
/// operator over an unbounded channel.
///
/// The task owns the persist resources and the `data_subscribe` receiver. The
/// operator owns the output capability and drives the frontier.
enum SourceEvent<T> {
    /// A `DataRemapEntry` read from the data shard subscription. The operator
    /// downgrades its capability to the entry's `logical_upper`, and forwards
    /// the entry downstream only if its `physical_upper` changed.
    Remap(DataRemapEntry<T>),
    /// The subscription closed cleanly. The operator drops its capability and
    /// treats a later channel disconnect as expected rather than a task panic.
    Finished,
}
154
/// Renders the `txns_progress_source` operator, which produces the stream of
/// [`DataRemapEntry`]s for one data shard.
///
/// One worker (picked by hashing `name` modulo the number of peers) spawns a
/// Tokio task that awaits `client`, fetches the [`TxnsRead`] from `ctx`, opens
/// a writer on `data_id`, and calls `data_subscribe` for that shard at `as_of`.
/// The task forwards every entry it receives to the operator as a
/// [`SourceEvent`] and activates the operator. The operator emits an entry only
/// when its `physical_upper` differs from the last one seen, and afterwards
/// downgrades its output capability to the entry's `logical_upper`. All other
/// workers drop their capability right away and emit nothing.
///
/// Returns the remap stream together with the [`PressOnDropButton`] built from
/// the operator's shutdown button.
///
/// # Panics
///
/// - In the task, if opening the data shard writer fails.
/// - In the operator, if an entry's `physical_upper` regresses, or if its
///   `logical_upper` is not beyond the last observed physical upper.
/// - In the operator, if the event channel disconnects before
///   [`SourceEvent::Finished`] was observed.
///
/// TODO: I'd much prefer the communication protocol between the two operators
/// to be exactly remap as defined in the [reclocking design doc]. However, we
/// can't quite recover exactly the information necessary to construct that at
/// the moment. Seems worth doing, but in the meantime, intentionally make this
/// look fairly different (`Stream` of `DataRemapEntry` instead of
/// `Collection<FromTime>`) to hopefully minimize confusion. As a performance
/// optimization, we only re-emit this when the _physical_ upper has changed,
/// which means that the frontier of the `Stream<DataRemapEntry<T>>` indicates
/// updates to the logical_upper of the most recent `DataRemapEntry` (i.e. the
/// one with the largest physical_upper).
///
/// [reclocking design doc]:
///     https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210714_reclocking.md
fn txns_progress_source_global<'scope, K, V, T, D, P, C>(
    scope: Scope<'scope, T>,
    name: &str,
    ctx: TxnsContext,
    client: impl Future<Output = PersistClient> + Send + 'static,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>,
    unique_id: u64,
) -> (StreamVec<'scope, T, DataRemapEntry<T>>, PressOnDropButton)
where
    K: Debug + Codec + Send + Sync,
    V: Debug + Codec + Send + Sync,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
    P: Debug + Clone + 'static,
    C: TxnsCodec + 'static,
{
    let worker_idx = scope.index();
    // Every worker computes the same value from `name`, so they all agree on
    // which single worker owns the subscription.
    let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
    let name = format!("txns_progress_source({})", name);
    let mut builder = OperatorBuilderRc::new(name.clone(), scope.clone());
    let info = builder.operator_info();
    // From here on `name` is the longer form used for the task name and in log
    // lines; the operator itself was registered under the shorter one above.
    let name = format!("{} [{}] {:.9}", name, unique_id, data_id.to_string());
    let (remap_output, remap_stream) = builder.new_output::<Vec<DataRemapEntry<T>>>();
    let mut remap_output = OutputBuilder::from(remap_output);

    let (mut shutdown_handle, shutdown_button) = button(scope.clone(), Rc::clone(&info.address));

    builder.build_reschedule(move |capabilities| {
        // The output capability's time tracks the `logical_upper` we've advanced
        // to. `None` indicates that we've dropped the capability to shut down.
        let [cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        let mut capability = Some(cap);

        // The most recently observed physical upper. We emit a `DataRemapEntry`
        // only when the physical upper changes.
        let mut physical_upper = T::minimum();

        // Per-worker state. Only the chosen worker subscribes to the data shard
        // (via a Tokio task that owns the blocking persist I/O) and produces
        // output. Non-chosen workers drop their capability immediately and only
        // participate in the shutdown handshake below. `Some` holds the receiver
        // of `SourceEvent`s, the activation ack, and the task handle, kept alive
        // so the task is aborted when the operator is dropped.
        let mut chosen_state = if worker_idx == chosen_worker {
            let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<SourceEvent<T>>();
            let (activator, activation_ack) = ArcActivator::new(scope, &info);

            let task_name = name.clone();
            let task = mz_ore::task::spawn(|| name.clone(), async move {
                let client = client.await;
                let txns_read = ctx.get_or_init::<T, C>(&client, txns_id).await;

                // NOTE(review): presumably waits until the txns read has
                // progressed beyond `as_of`; the returned value is deliberately
                // discarded. Confirm against `TxnsRead::update_gt`.
                let _ = txns_read.update_gt(as_of.clone()).await;
                let data_write = client
                    .open_writer::<K, V, T, D>(
                        data_id,
                        Arc::clone(&data_key_schema),
                        Arc::clone(&data_val_schema),
                        Diagnostics::from_purpose("data read physical upper"),
                    )
                    .await
                    .expect("schema shouldn't change");
                let mut rx = txns_read
                    .data_subscribe(data_id, as_of.clone(), data_write)
                    .await;
                debug!("{} starting as_of={:?}", task_name, as_of);

                // Forward each entry to the operator, then wake the operator
                // so it processes the entry.
                while let Some(remap) = rx.recv().await {
                    if event_tx.send(SourceEvent::Remap(remap)).is_err() {
                        // The operator is gone. Stop.
                        return;
                    }
                    activator.activate();
                }
                // The subscription closed. Signal the operator so it drops its
                // output capability.
                let _ = event_tx.send(SourceEvent::Finished);
                activator.activate();
            })
            .abort_on_drop();

            Some((event_rx, activation_ack, task))
        } else {
            // Non-chosen workers contribute nothing to the output frontier.
            capability = None;
            None
        };

        // Whether we've observed `SourceEvent::Finished`, so a subsequent
        // channel disconnect is expected rather than a task panic.
        let mut finished = false;

        // The schedule closure. Its return value asks the runtime whether to
        // reschedule the operator even without new input.
        move |_frontiers| {
            // On a local shutdown press, hold the capability and stay scheduled
            // until all workers have pressed, then release. Dropping the
            // capability on the local press alone would let the downstream
            // frontier advance during cross-worker teardown skew, past times
            // whose input this worker has already discarded.
            if shutdown_handle.local_pressed() {
                return if shutdown_handle.all_pressed() {
                    capability = None;
                    // Drop the receiver, ack, and task handle, aborting the task.
                    chosen_state = None;
                    false
                } else {
                    true
                };
            }

            let Some((event_rx, activation_ack, _task)) = chosen_state.as_mut() else {
                // Non-chosen worker: nothing to do. Stay alive (the button
                // channel reschedules us) for the shutdown handshake above.
                return false;
            };
            // Acknowledge the activation so the Tokio task can activate us again.
            activation_ack.ack();

            let mut output = remap_output.activate();
            // Drain everything the task has queued so far without blocking.
            loop {
                match event_rx.try_recv() {
                    Ok(SourceEvent::Remap(remap)) => {
                        let Some(cap) = capability.as_mut() else {
                            // Already shut down, so drop any straggling events.
                            continue;
                        };
                        assert!(physical_upper <= remap.physical_upper);
                        assert!(physical_upper < remap.logical_upper);

                        // Cloned up front because `remap` may be moved into the
                        // output session below.
                        let logical_upper = remap.logical_upper.clone();
                        // Emit at the pre-downgrade capability, then downgrade.
                        if remap.physical_upper != physical_upper {
                            physical_upper = remap.physical_upper.clone();
                            debug!("{} emitting {:?}", name, remap);
                            output.session(&*cap).give(remap);
                        } else {
                            debug!("{} not emitting {:?}", name, remap);
                        }
                        cap.downgrade(&logical_upper);
                    }
                    Ok(SourceEvent::Finished) => {
                        // Subscription closed cleanly. Drop the capability.
                        finished = true;
                        capability = None;
                    }
                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                        // A task panic aborts the process via the enhanced panic
                        // handler, so this assert is only a safety net for
                        // environments that do not abort. On the panic path the
                        // task never calls `activate()`, so it fires only if the
                        // operator is rescheduled for another reason.
                        assert!(finished, "txns_progress_source task unexpectedly gone");
                        break;
                    }
                }
            }

            false
        }
    });

    (remap_stream, shutdown_button.press_on_drop())
}
335
/// Renders the `txns_progress_frontiers` operator, which forwards the
/// `passthrough` stream unchanged while manually managing the output
/// capability from the `remap` stream and the passthrough frontier.
///
/// On each activation the operator, in this order:
///
/// 1. folds newly received [`DataRemapEntry`]s into its current entry and
///    bumps that entry's `logical_upper` to the remap input's frontier,
/// 2. forwards any pending passthrough data at the current capability,
/// 3. unless it is waiting on a remap update, applies the passthrough frontier
///    (downgrading the capability, or dropping it once the frontier is empty
///    or has passed `until`),
/// 4. once the capability has reached the entry's `physical_upper`, downgrades
///    it to the entry's `logical_upper`.
///
/// `name`, `unique_id` and `data_id` are only used to label the operator and
/// its log lines. Returns the forwarded stream together with the
/// [`PressOnDropButton`] built from the operator's shutdown button.
///
/// # Panics
///
/// Panics if a remap entry with a larger `logical_upper` carries a smaller
/// `physical_upper` than the current entry, or if the current entry's
/// `physical_upper` exceeds its `logical_upper`.
///
/// The block ordering inside the schedule closure is load-bearing: pending
/// passthrough input is emitted at the pre-activation capability BEFORE any
/// capability downgrade, which keeps the differential invariant `send_time <=
/// record_time` and avoids dropping in-flight rows when the passthrough
/// frontier crosses `until` in the same activation (SQL-299). Do not reorder.
fn txns_progress_frontiers<'scope, K, V, T, D, P, C>(
    remap: StreamVec<'scope, T, DataRemapEntry<T>>,
    passthrough: StreamVec<'scope, T, P>,
    name: &str,
    data_id: ShardId,
    until: Antichain<T>,
    unique_id: u64,
) -> (StreamVec<'scope, T, P>, PressOnDropButton)
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Clone + 'static + Monoid + Codec64 + Send + Sync,
    P: Debug + Clone + 'static,
    C: TxnsCodec,
{
    let scope = passthrough.scope();
    let name = format!("txns_progress_frontiers({})", name);
    let mut builder = OperatorBuilderRc::new(name.clone(), scope.clone());
    let info = builder.operator_info();
    // From here on `name` is the longer form used in log lines; the operator
    // itself was registered under the shorter one above.
    let name = format!(
        "{} [{}] {}/{} {:.9}",
        name,
        unique_id,
        scope.index(),
        scope.peers(),
        data_id.to_string(),
    );
    let (passthrough_output, passthrough_stream) = builder.new_output::<Vec<P>>();
    let mut passthrough_output = OutputBuilder::from(passthrough_output);
    // Both inputs are disconnected from the output: capability advancement is
    // driven manually based on the remap stream and the passthrough frontier.
    // NB: the output is created BEFORE the inputs on purpose. `new_output`
    // connects to whatever inputs already exist (here, none); the `[]`
    // connection arg below records the input-to-output summary but does not by
    // itself disconnect the output. Creating an input before the output would
    // silently connect them and break the manual capability management.
    let mut remap_input = builder.new_input_connection(remap, Pipeline, []);
    let mut passthrough_input = builder.new_input_connection(passthrough, Pipeline, []);

    let (mut shutdown_handle, shutdown_button) = button(scope, info.address);

    builder.build_reschedule(move |capabilities| {
        // The output capability's time tracks how far we've progressed in
        // copying along the passthrough input. `None` indicates that we've
        // dropped the capability to shut down.
        let [cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        let mut capability = Some(cap);
        // The most recently observed remap state. Retained even after the remap
        // input closes so we can still advance the output capability to the
        // last known `logical_upper` while the passthrough input is draining.
        // This deliberately diverges from the async impl, which dropped the
        // entry on close and stalled (PER-4).
        let mut remap = DataRemapEntry {
            physical_upper: T::minimum(),
            logical_upper: T::minimum(),
        };
        // Whether the remap input has reached the empty antichain.
        let mut remap_closed = false;

        // The schedule closure. `frontiers[0]` belongs to the remap input and
        // `frontiers[1]` to the passthrough input, matching the order in which
        // the inputs were created above.
        move |frontiers| {
            // If our worker pressed the button we stop producing data and
            // frontier updates downstream, but mirror `builder_async`: hold the
            // capability and stop draining the inputs until ALL workers have
            // pressed. Dropping the capability on the local press alone would
            // let the downstream frontier advance during cross-worker teardown
            // skew, past times whose data this worker has discarded, while
            // other workers' operator instances still feed downstream.
            if shutdown_handle.local_pressed() {
                return if shutdown_handle.all_pressed() {
                    // All workers pressed: drop the capability and drain the
                    // inputs so teardown does not stall the dataflow.
                    capability = None;
                    remap_input.for_each(|_input_cap, _data| {});
                    passthrough_input.for_each(|_input_cap, _data| {});
                    false
                } else {
                    // Wedge: keep the capability, leave the inputs undrained
                    // (their pending messages hold the frontier), and ask to be
                    // rescheduled until the remaining workers press.
                    true
                };
            }

            // Fold new DataRemapEntries, keeping the one with the largest
            // logical_upper. The ordering of incoming entries is not assumed.
            remap_input.for_each(|_input_cap, data| {
                for x in data.drain(..) {
                    debug!("{} got remap {:?}", name, x);
                    if remap.logical_upper < x.logical_upper {
                        assert!(
                            remap.physical_upper <= x.physical_upper,
                            "previous remap physical upper {:?} is ahead of new remap physical upper {:?}",
                            remap.physical_upper,
                            x.physical_upper,
                        );
                        // TODO: If the physical upper has advanced, that's a very
                        // strong hint that the data shard is about to be written to.
                        // Because the data shard's upper advances sparsely (on write,
                        // but not on passage of time) which invalidates the "every 1s"
                        // assumption of the default tuning, we've had to de-tune the
                        // listen sleeps on the paired persist_source. Maybe we use "one
                        // state" to wake it up in case pubsub doesn't and remove the
                        // listen polling entirely? (NB: This would have to happen in
                        // each worker so that it's guaranteed to happen in each
                        // process.)
                        remap = x;
                    }
                }
            });

            // Apply the remap input's frontier as a `logical_upper` bump. We do
            // not discard `remap` on the empty antichain: the last observed
            // entry remains valid and lets the capability still advance past
            // `physical_upper` while the passthrough input drains.
            if let Some(logical_upper) = frontiers[0].frontier().as_option() {
                if remap.logical_upper < *logical_upper {
                    remap.logical_upper = logical_upper.clone();
                }
            } else {
                remap_closed = true;
            }

            debug!("{} remap {:?} remap_closed={}", name, remap, remap_closed);

            // Pass through any data the passthrough input has pending, at the
            // current (pre-downgrade) capability, BEFORE any downgrade below.
            // `cap.time()` here equals the pre-activation frontier, which is
            // `<=` every pending record's time, so the differential invariant
            // `send_time <= record_time` holds. Doing this before the
            // `until`-driven drop is the SQL-299 fix. NB: nothing to do for
            // `until` because the shard_source (before) and mfp_and_decode
            // (after) filter.
            if let Some(cap) = capability.as_ref() {
                let mut output = passthrough_output.activate();
                passthrough_input.for_each(|_input_cap, data| {
                    debug!("{} emitting data {:?}", name, data);
                    output.session(cap).give_container(data);
                });
            } else {
                // Still drain to avoid stalling the dataflow.
                passthrough_input.for_each(|_input_cap, _data| {});
            }

            // Only consult the passthrough frontier when not waiting on remap to
            // push `physical_upper` past the capability. While `physical_upper
            // <= cap.time()` and the remap input is open, the next expected
            // event is a remap update that jumps `cap` to `logical_upper`, not a
            // passthrough advance. Consulting the passthrough frontier then can
            // drop the capability prematurely (e.g. `SELECT AS OF MAX`, where no
            // remap update ever arrives and the passthrough side reports the
            // empty antichain). Once remap is closed, the passthrough frontier
            // is the only remaining driver.
            let waiting_for_remap = match capability.as_ref() {
                Some(cap) => !remap_closed && remap.physical_upper.less_equal(cap.time()),
                None => false,
            };
            if !waiting_for_remap {
                // Apply the passthrough input's frontier.
                //
                // If `until.less_equal(pass_frontier)`, it means that all
                // subsequent batches will contain only times greater or equal
                // to `until`, which means they can be dropped in their entirety.
                //
                // Ideally this check would live in `txns_progress_source`, but
                // that turns out to be much more invasive (requires replacing
                // lots of `T`s with `Antichain<T>`s). Given that we've been
                // thinking about reworking the operators, do the easy but more
                // wasteful thing for now.
                let pass_frontier = frontiers[1].frontier();
                if PartialOrder::less_equal(&until.borrow(), &pass_frontier) {
                    debug!(
                        "{} progress {:?} has passed until {:?}",
                        name,
                        pass_frontier,
                        until.elements(),
                    );
                    capability = None;
                } else if let Some(new_progress) = pass_frontier.as_option() {
                    // Recall that any reads of the data shard are always
                    // correct, so given that we've passed through any data from
                    // the input, that means we're free to pass through frontier
                    // updates too.
                    if let Some(cap) = capability.as_mut() {
                        if cap.time() < new_progress {
                            debug!("{} downgrading cap to {:?}", name, new_progress);
                            cap.downgrade(new_progress);
                        }
                    }
                } else {
                    // Reached the empty frontier; shut down.
                    capability = None;
                }
            }

            // If we've copied passthrough data to at least `physical_upper`, we
            // can artificially advance the output to `logical_upper`. By the
            // emptiness of `[physical_upper, logical_upper)`, no record still in
            // flight lies below `logical_upper`, so this never strands data.
            if let Some(cap) = capability.as_mut() {
                assert!(remap.physical_upper <= remap.logical_upper);
                let phys_reached = remap.physical_upper.less_equal(cap.time());
                let logical_ahead = cap.time() < &remap.logical_upper;
                if phys_reached && logical_ahead {
                    cap.downgrade(&remap.logical_upper);
                }
            }

            // Never ask for an unconditional reschedule on the normal path.
            false
        }
    });

    (passthrough_stream, shutdown_button.press_on_drop())
}
555
556/// The process global [`TxnsRead`] that any operator can communicate with.
557#[derive(Default, Debug, Clone)]
558pub struct TxnsContext {
559    read: Arc<tokio::sync::OnceCell<Box<dyn Any + Send + Sync>>>,
560}
561
562impl TxnsContext {
563    async fn get_or_init<T, C>(&self, client: &PersistClient, txns_id: ShardId) -> TxnsRead<T>
564    where
565        T: Timestamp + Lattice + Codec64 + TotalOrder + StepForward + Sync,
566        C: TxnsCodec + 'static,
567    {
568        let read = self
569            .read
570            .get_or_init(|| {
571                let client = client.clone();
572                async move {
573                    let read: Box<dyn Any + Send + Sync> =
574                        Box::new(TxnsRead::<T>::start::<C>(client, txns_id).await);
575                    read
576                }
577            })
578            .await
579            .downcast_ref::<TxnsRead<T>>()
580            .expect("timestamp types should match");
581        // We initially only have one txns shard in the system.
582        assert_eq!(&txns_id, read.txns_id());
583        read.clone()
584    }
585}
586
587// Existing configs use the prefix "persist_txns_" for historical reasons. New
588// configs should use the prefix "txn_wal_".
589
/// Initial backoff used when polling a txns data shard for new batches. Read by
/// [`txns_data_shard_retry_params`] as `initial_backoff`.
pub(crate) const DATA_SHARD_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_txns_data_shard_retryer_initial_backoff",
    Duration::from_millis(1024),
    "The initial backoff when polling for new batches from a txns data shard persist_source.",
);

/// Backoff multiplier used when polling a txns data shard for new batches. Read
/// by [`txns_data_shard_retry_params`] as `multiplier`.
pub(crate) const DATA_SHARD_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_txns_data_shard_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a txns data shard persist_source.",
);

/// Upper bound on the backoff used when polling a txns data shard for new
/// batches. Read by [`txns_data_shard_retry_params`] as `clamp`.
pub(crate) const DATA_SHARD_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_txns_data_shard_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a txns data shard persist_source.",
);
607
608/// Retry configuration for txn-wal data shard override of
609/// `next_listen_batch`.
610pub fn txns_data_shard_retry_params(cfg: &ConfigSet) -> RetryParameters {
611    RetryParameters {
612        fixed_sleep: Duration::ZERO,
613        initial_backoff: DATA_SHARD_RETRYER_INITIAL_BACKOFF.get(cfg),
614        multiplier: DATA_SHARD_RETRYER_MULTIPLIER.get(cfg),
615        clamp: DATA_SHARD_RETRYER_CLAMP.get(cfg),
616    }
617}
618
/// A helper for subscribing to a data shard using the timely operators.
///
/// This could instead be a wrapper around a [Subscribe], but it's only used in
/// tests and maelstrom, so do it by wrapping the timely operators to get
/// additional coverage. For the same reason, hardcode the K, V, T, D types.
///
/// [Subscribe]: mz_persist_client::read::Subscribe
pub struct DataSubscribe {
    // NOTE(review): presumably the `as_of` passed to `new`; the end of the
    // constructor is not visible here, confirm.
    pub(crate) as_of: u64,
    // NOTE(review): presumably the single-threaded timely worker that `new`
    // creates to host the dataflow; confirm.
    pub(crate) worker: Worker,
    /// Probe attached to the data stream in `new`.
    data: ProbeHandle<u64>,
    // NOTE(review): by name, a probe on the txns-progress side of the dataflow;
    // where it is attached is not visible here, confirm.
    txns: ProbeHandle<u64>,
    /// Receiver for captured dataflow events carrying `(key, time, diff)`
    /// updates.
    capture: mpsc::Receiver<Event<u64, Vec<(String, u64, i64)>>>,
    // NOTE(review): presumably accumulates the updates read from `capture`;
    // confirm against the methods that fill it.
    output: Vec<(String, u64, i64)>,

    // Never read (hence the underscore). NOTE(review): presumably held so the
    // operators' shutdown buttons are only pressed when this struct is
    // dropped; confirm.
    _tokens: Vec<PressOnDropButton>,
}
636
637impl std::fmt::Debug for DataSubscribe {
638    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639        let DataSubscribe {
640            as_of,
641            worker: _,
642            data,
643            txns,
644            capture: _,
645            output,
646            _tokens: _,
647        } = self;
648        f.debug_struct("DataSubscribe")
649            .field("as_of", as_of)
650            .field("data", data)
651            .field("txns", txns)
652            .field("output", output)
653            .finish_non_exhaustive()
654    }
655}
656
657impl DataSubscribe {
658    /// Creates a new [DataSubscribe].
659    pub fn new(
660        name: &str,
661        client: PersistClient,
662        txns_id: ShardId,
663        data_id: ShardId,
664        as_of: u64,
665        until: Antichain<u64>,
666    ) -> Self {
667        let mut worker = Worker::new(
668            WorkerConfig::default(),
669            timely::communication::Allocator::Thread(
670                timely::communication::allocator::Thread::default(),
671            ),
672            Some(std::time::Instant::now()),
673        );
674        let (data, txns, capture, tokens) = worker.dataflow::<u64, _, _>(|outer| {
675            let (data_stream, shard_source_token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
676                let client = client.clone();
677                let (data_stream, token) = shard_source::<String, (), u64, i64, _, _, _>(
678                    outer,
679                    scope,
680                    name,
681                    move || std::future::ready(client.clone()),
682                    data_id,
683                    Some(Antichain::from_elem(as_of)),
684                    SnapshotMode::Include,
685                    until.clone(),
686                    false.then_some(|_, _, _| unreachable!()),
687                    Arc::new(StringSchema),
688                    Arc::new(UnitSchema),
689                    FilterResult::keep_all,
690                    false.then_some(|| unreachable!()),
691                    async {},
692                    ErrorHandler::Halt("data_subscribe"),
693                );
694                (data_stream.leave(outer), token)
695            });
696            let (data, txns) = (ProbeHandle::new(), ProbeHandle::new());
697            let data_stream = data_stream.flat_map(|part| {
698                let part = part.parse();
699                part.part.map(|((k, ()), t, d)| (k, t, d))
700            });
701            let data_stream = data_stream.probe_with(&data);
702            let (data_stream, mut txns_progress_token) =
703                txns_progress::<String, (), u64, i64, _, TxnsCodecDefault, _>(
704                    data_stream,
705                    name,
706                    &TxnsContext::default(),
707                    || std::future::ready(client.clone()),
708                    txns_id,
709                    data_id,
710                    as_of,
711                    until,
712                    Arc::new(StringSchema),
713                    Arc::new(UnitSchema),
714                );
715            let data_stream = data_stream.probe_with(&txns);
716            let mut tokens = shard_source_token;
717            tokens.append(&mut txns_progress_token);
718            (data, txns, data_stream.capture(), tokens)
719        });
720        Self {
721            as_of,
722            worker,
723            data,
724            txns,
725            capture,
726            output: Vec::new(),
727            _tokens: tokens,
728        }
729    }
730
731    /// Returns the exclusive progress of the dataflow.
732    pub fn progress(&self) -> u64 {
733        self.txns
734            .with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX))
735    }
736
737    /// Steps the dataflow, capturing output.
738    pub fn step(&mut self) {
739        self.worker.step();
740        self.capture_output()
741    }
742
743    pub(crate) fn capture_output(&mut self) {
744        loop {
745            let event = match self.capture.try_recv() {
746                Ok(x) => x,
747                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
748            };
749            match event {
750                Event::Progress(_) => {}
751                Event::Messages(_, mut msgs) => self.output.append(&mut msgs),
752            }
753        }
754    }
755
756    /// Steps the dataflow past the given time, capturing output.
757    #[cfg(test)]
758    pub async fn step_past(&mut self, ts: u64) {
759        while self.txns.less_equal(&ts) {
760            tracing::trace!(
761                "progress at {:?}",
762                self.txns.with_frontier(|x| x.to_owned()).elements()
763            );
764            self.step();
765            tokio::task::yield_now().await;
766        }
767    }
768
769    /// Returns captured output.
770    pub fn output(&self) -> &Vec<(String, u64, i64)> {
771        &self.output
772    }
773}
774
/// A handle to a [DataSubscribe] running in a task.
#[derive(Debug)]
pub struct DataSubscribeTask {
    /// Carries step requests. A `None` timestamp requests one step, a
    /// `Some(ts)` requests stepping until we progress beyond `ts`.
    ///
    /// Each request also carries a oneshot sender on which the task replies
    /// with the newly captured output and its current progress.
    tx: std::sync::mpsc::Sender<(
        Option<u64>,
        tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
    )>,
    /// Handle to the blocking task; it resolves to all output gathered over
    /// the task's lifetime.
    task: mz_ore::task::JoinHandle<Vec<(String, u64, i64)>>,
    /// Output received from the task in replies so far.
    output: Vec<(String, u64, i64)>,
    /// The most recent progress reported by the task; starts at 0.
    progress: u64,
}
788
789impl DataSubscribeTask {
790    /// Creates a new [DataSubscribeTask].
791    pub async fn new(
792        client: PersistClient,
793        txns_id: ShardId,
794        data_id: ShardId,
795        as_of: u64,
796    ) -> Self {
797        let cache = TxnsCache::open(&client, txns_id, Some(data_id)).await;
798        let (tx, rx) = std::sync::mpsc::channel();
799        let task = mz_ore::task::spawn_blocking(
800            || "data_subscribe task",
801            move || Self::task(client, cache, data_id, as_of, rx),
802        );
803        DataSubscribeTask {
804            tx,
805            task,
806            output: Vec::new(),
807            progress: 0,
808        }
809    }
810
811    #[cfg(test)]
812    async fn step(&mut self) {
813        self.send(None).await;
814    }
815
816    /// Steps the dataflow past the given time, capturing output.
817    pub async fn step_past(&mut self, ts: u64) -> u64 {
818        self.send(Some(ts)).await;
819        self.progress
820    }
821
822    /// Returns captured output.
823    pub fn output(&self) -> &Vec<(String, u64, i64)> {
824        &self.output
825    }
826
827    async fn send(&mut self, ts: Option<u64>) {
828        let (tx, rx) = tokio::sync::oneshot::channel();
829        self.tx.send((ts, tx)).expect("task should be running");
830        let (mut new_output, new_progress) = rx.await.expect("task should be running");
831        self.output.append(&mut new_output);
832        assert!(self.progress <= new_progress);
833        self.progress = new_progress;
834    }
835
836    /// Signals for the task to exit, and then waits for this to happen.
837    ///
838    /// _All_ output from the lifetime of the task (not just what was previously
839    /// captured) is returned.
840    pub async fn finish(self) -> Vec<(String, u64, i64)> {
841        // Closing the channel signals the task to exit.
842        drop(self.tx);
843        self.task.await
844    }
845
846    fn task(
847        client: PersistClient,
848        cache: TxnsCache<u64>,
849        data_id: ShardId,
850        as_of: u64,
851        rx: std::sync::mpsc::Receiver<(
852            Option<u64>,
853            tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
854        )>,
855    ) -> Vec<(String, u64, i64)> {
856        let mut subscribe = DataSubscribe::new(
857            "DataSubscribeTask",
858            client.clone(),
859            cache.txns_id(),
860            data_id,
861            as_of,
862            Antichain::new(),
863        );
864        let mut output = Vec::new();
865        loop {
866            let (ts, tx) = match rx.try_recv() {
867                Ok(x) => x,
868                Err(TryRecvError::Empty) => {
869                    // No requests, continue stepping so nothing deadlocks.
870                    subscribe.step();
871                    continue;
872                }
873                Err(TryRecvError::Disconnected) => {
874                    // All done! Return our output.
875                    return output;
876                }
877            };
878            // Always step at least once.
879            subscribe.step();
880            // If we got a ts, make sure to step past it.
881            if let Some(ts) = ts {
882                while subscribe.progress() <= ts {
883                    subscribe.step();
884                }
885            }
886            let new_output = std::mem::take(&mut subscribe.output);
887            output.extend(new_output.iter().cloned());
888            let _ = tx.send((new_output, subscribe.progress()));
889        }
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use itertools::{Either, Itertools};
896
897    use crate::tests::writer;
898    use crate::txns::TxnsHandle;
899
900    use super::*;
901
    /// One scripted action applied to the operator's two inputs.
    ///
    /// A schedule is a sequence of these, replayed in order by the test
    /// harness.
    #[derive(Debug, Clone)]
    enum Action {
        /// Send a `DataRemapEntry` on the remap input.
        ///
        /// The entry is sent at the remap input's current time, so it carries
        /// no explicit time of its own.
        Remap {
            physical_upper: u64,
            logical_upper: u64,
        },
        /// Advance the remap input frontier to `ts` (empty antichain if `None`).
        RemapFrontier(Option<u64>),
        /// Send passthrough data records (as `(payload, time)`), then leave them buffered.
        Pass { records: Vec<(i64, u64)> },
        /// Advance the passthrough input frontier to `ts` (empty antichain if `None`).
        PassFrontier(Option<u64>),
        /// Step the worker once.
        Step,
    }
919
920    /// Runs `schedule` against the operator built by `build`, returning the
921    /// captured output events and the final exclusive output frontier. Each
922    /// event is shaped as `(payload, time, count)`, where `count` is synthesized
923    /// as `1` so the output looks like a differential collection.
924    fn run_schedule(
925        build: impl for<'a> Fn(
926            StreamVec<'a, u64, DataRemapEntry<u64>>,
927            StreamVec<'a, u64, i64>,
928            Antichain<u64>,
929        ) -> (StreamVec<'a, u64, i64>, PressOnDropButton),
930        until: Antichain<u64>,
931        schedule: &[Action],
932    ) -> (Vec<(i64, u64, i64)>, u64) {
933        let mut worker = Worker::new(
934            WorkerConfig::default(),
935            timely::communication::Allocator::Thread(
936                timely::communication::allocator::Thread::default(),
937            ),
938            Some(std::time::Instant::now()),
939        );
940
941        // The button must outlive the run: dropping it presses the shutdown
942        // handle, which makes the operator drop its capability on the next
943        // activation. Hold it until after the drain loop completes.
944        let (remap_handle, pass_handle, probe, capture, _button) =
945            worker.dataflow::<u64, _, _>(|scope| {
946                let (remap_handle, remap_stream) = scope.new_input::<Vec<DataRemapEntry<u64>>>();
947                let (pass_handle, pass_stream) = scope.new_input::<Vec<i64>>();
948                let (out, button) = build(remap_stream, pass_stream, until.clone());
949                let probe = ProbeHandle::new();
950                let out = out.probe_with(&probe);
951                (remap_handle, pass_handle, probe, out.capture(), button)
952            });
953
954        // timely input handles can only `advance_to` forward in time. Track the
955        // last time used on each input so we can fail loudly with a useful
956        // message instead of panicking deep inside timely on a decreasing time.
957        let mut last_remap_ts = 0u64;
958        let mut last_pass_ts = 0u64;
959        // Held in `Option`s so a `*Frontier(None)` action can `take` and drop the
960        // handle, which closes the input to the empty antichain. Advancing to
961        // `u64::MAX` is NOT equivalent: it leaves the input's frontier at
962        // `Some(u64::MAX)`, which the operator (correctly) treats as a finite
963        // `logical_upper`/passthrough advance rather than a closed input.
964        let mut remap_handle = Some(remap_handle);
965        let mut pass_handle = Some(pass_handle);
966        for action in schedule {
967            match action.clone() {
968                // `Remap` is a `send` at the handle's current time, so it carries
969                // no explicit time and needs no monotonicity assert.
970                Action::Remap {
971                    physical_upper,
972                    logical_upper,
973                } => remap_handle
974                    .as_mut()
975                    .expect("remap input still open")
976                    .send(DataRemapEntry {
977                        physical_upper,
978                        logical_upper,
979                    }),
980                Action::RemapFrontier(Some(ts)) => {
981                    assert!(
982                        ts >= last_remap_ts,
983                        "Action::RemapFrontier time {ts} < previous remap time {last_remap_ts}; per-input times must be non-decreasing"
984                    );
985                    last_remap_ts = ts;
986                    remap_handle
987                        .as_mut()
988                        .expect("remap input still open")
989                        .advance_to(ts);
990                }
991                // Drop the handle to close the input to the empty antichain.
992                Action::RemapFrontier(None) => {
993                    last_remap_ts = u64::MAX;
994                    drop(remap_handle.take());
995                }
996                Action::Pass { records } => {
997                    let handle = pass_handle.as_mut().expect("passthrough input still open");
998                    for (payload, time) in records {
999                        assert!(
1000                            time >= last_pass_ts,
1001                            "Action::Pass time {time} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing"
1002                        );
1003                        last_pass_ts = time;
1004                        // `advance_to` is what makes each record's time visible to
1005                        // the operator; the subsequent `send` emits the payload at
1006                        // that time. Both impls consume the identical schedule, so
1007                        // the exact send mechanics need only be self-consistent.
1008                        handle.advance_to(time);
1009                        handle.send(payload);
1010                    }
1011                }
1012                Action::PassFrontier(Some(ts)) => {
1013                    assert!(
1014                        ts >= last_pass_ts,
1015                        "Action::PassFrontier time {ts} < previous passthrough time {last_pass_ts}; per-input times must be non-decreasing"
1016                    );
1017                    last_pass_ts = ts;
1018                    pass_handle
1019                        .as_mut()
1020                        .expect("passthrough input still open")
1021                        .advance_to(ts);
1022                }
1023                // Drop the handle to close the input to the empty antichain.
1024                Action::PassFrontier(None) => {
1025                    last_pass_ts = u64::MAX;
1026                    drop(pass_handle.take());
1027                }
1028                Action::Step => {
1029                    worker.step();
1030                }
1031            }
1032        }
1033        // Drain: flush inputs and step until the output probe frontier stops
1034        // advancing. A hard cap PANICS so a buggy operator that never settles
1035        // fails loudly instead of silently returning partial results.
1036        if let Some(handle) = remap_handle.as_mut() {
1037            handle.flush();
1038        }
1039        if let Some(handle) = pass_handle.as_mut() {
1040            handle.flush();
1041        }
1042        let mut last = probe.with_frontier(|f| f.to_owned());
1043        let mut stable = 0;
1044        for step in 0.. {
1045            assert!(
1046                step < 4096,
1047                "run_schedule did not quiesce within 4096 steps"
1048            );
1049            worker.step();
1050            let now = probe.with_frontier(|f| f.to_owned());
1051            if now == last {
1052                stable += 1;
1053                // Require a few consecutive no-change steps so in-flight messages flush.
1054                if stable >= 8 {
1055                    break;
1056                }
1057            } else {
1058                stable = 0;
1059                last = now;
1060            }
1061        }
1062
1063        let frontier = probe.with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX));
1064        let mut output = Vec::new();
1065        while let Ok(event) = capture.try_recv() {
1066            if let Event::Messages(time, msgs) = event {
1067                for payload in msgs {
1068                    output.push((payload, time, 1));
1069                }
1070            }
1071        }
1072        (output, frontier)
1073    }
1074
1075    impl<K, V, T, D, C> TxnsHandle<K, V, T, D, C>
1076    where
1077        K: Debug + Codec,
1078        V: Debug + Codec,
1079        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
1080        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
1081        C: TxnsCodec,
1082    {
1083        async fn subscribe_task(
1084            &self,
1085            client: &PersistClient,
1086            data_id: ShardId,
1087            as_of: u64,
1088        ) -> DataSubscribeTask {
1089            DataSubscribeTask::new(client.clone(), self.txns_id(), data_id, as_of).await
1090        }
1091    }
1092
1093    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1094    #[cfg_attr(miri, ignore)] // too slow
1095    async fn data_subscribe() {
1096        async fn step(subs: &mut Vec<DataSubscribeTask>) {
1097            for sub in subs.iter_mut() {
1098                sub.step().await;
1099            }
1100        }
1101
1102        let client = PersistClient::new_for_tests().await;
1103        let mut txns = TxnsHandle::expect_open(client.clone()).await;
1104        let log = txns.new_log();
1105        let d0 = ShardId::new();
1106
1107        // Start a subscription before the shard gets registered.
1108        let mut subs = Vec::new();
1109        subs.push(txns.subscribe_task(&client, d0, 5).await);
1110        step(&mut subs).await;
1111
1112        // Now register the shard. Also start a new subscription and step the
1113        // previous one (plus repeat this for every later step).
1114        txns.register(1, [writer(&client, d0).await]).await.unwrap();
1115        subs.push(txns.subscribe_task(&client, d0, 5).await);
1116        step(&mut subs).await;
1117
1118        // Now write something unrelated.
1119        let d1 = txns.expect_register(2).await;
1120        txns.expect_commit_at(3, d1, &["nope"], &log).await;
1121        subs.push(txns.subscribe_task(&client, d0, 5).await);
1122        step(&mut subs).await;
1123
1124        // Now write to our shard before.
1125        txns.expect_commit_at(4, d0, &["4"], &log).await;
1126        subs.push(txns.subscribe_task(&client, d0, 5).await);
1127        step(&mut subs).await;
1128
1129        // Now write to our shard at the as_of.
1130        txns.expect_commit_at(5, d0, &["5"], &log).await;
1131        subs.push(txns.subscribe_task(&client, d0, 5).await);
1132        step(&mut subs).await;
1133
1134        // Now write to our shard past the as_of.
1135        txns.expect_commit_at(6, d0, &["6"], &log).await;
1136        subs.push(txns.subscribe_task(&client, d0, 5).await);
1137        step(&mut subs).await;
1138
1139        // Now write something unrelated again.
1140        txns.expect_commit_at(7, d1, &["nope"], &log).await;
1141        subs.push(txns.subscribe_task(&client, d0, 5).await);
1142        step(&mut subs).await;
1143
1144        // Verify that the dataflows can progress to the expected point and that
1145        // we read the right thing no matter when the dataflow started.
1146        for mut sub in subs {
1147            let progress = sub.step_past(7).await;
1148            assert_eq!(progress, 8);
1149            log.assert_eq(d0, 5, 8, sub.finish().await);
1150        }
1151    }
1152
1153    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1154    #[cfg_attr(miri, ignore)] // too slow
1155    async fn subscribe_shard_finalize() {
1156        let client = PersistClient::new_for_tests().await;
1157        let mut txns = TxnsHandle::expect_open(client.clone()).await;
1158        let log = txns.new_log();
1159        let d0 = txns.expect_register(1).await;
1160
1161        // Start the operator as_of the register ts.
1162        let mut sub = txns.read_cache().expect_subscribe(&client, d0, 1);
1163        sub.step_past(1).await;
1164
1165        // Write to it via txns.
1166        txns.expect_commit_at(2, d0, &["foo"], &log).await;
1167        sub.step_past(2).await;
1168
1169        // Unregister it.
1170        txns.forget(3, [d0]).await.unwrap();
1171        sub.step_past(3).await;
1172
1173        // TODO: Hard mode, see if we can get the rest of this test to work even
1174        // _without_ the txns shard advancing.
1175        txns.begin().commit_at(&mut txns, 7).await.unwrap();
1176
1177        // The operator should continue to emit data written directly even
1178        // though it's no longer in the txns set.
1179        let mut d0_write = writer(&client, d0).await;
1180        let key = "bar".to_owned();
1181        crate::small_caa(|| "test", &mut d0_write, &[((&key, &()), &5, 1)], 4, 6)
1182            .await
1183            .unwrap();
1184        log.record((d0, key, 5, 1));
1185        sub.step_past(4).await;
1186
1187        // Now finalize the shard to writes.
1188        let () = d0_write
1189            .compare_and_append_batch(&mut [], Antichain::from_elem(6), Antichain::new(), true)
1190            .await
1191            .unwrap()
1192            .unwrap();
1193        while sub.txns.less_than(&u64::MAX) {
1194            sub.step();
1195            tokio::task::yield_now().await;
1196        }
1197
1198        // Make sure we read the correct things.
1199        log.assert_eq(d0, 1, u64::MAX, sub.output().clone());
1200
1201        // Also make sure that we can read the right things if we start up after
1202        // the forget but before the direct write and ditto after the direct
1203        // write.
1204        log.assert_subscribe(d0, 4, u64::MAX).await;
1205        log.assert_subscribe(d0, 6, u64::MAX).await;
1206    }
1207
1208    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1209    #[cfg_attr(miri, ignore)] // too slow
1210    async fn subscribe_shard_register_forget() {
1211        let client = PersistClient::new_for_tests().await;
1212        let mut txns = TxnsHandle::expect_open(client.clone()).await;
1213        let d0 = ShardId::new();
1214
1215        // Start a subscription on the data shard.
1216        let mut sub = txns.read_cache().expect_subscribe(&client, d0, 0);
1217        assert_eq!(sub.progress(), 0);
1218
1219        // Register the shard at 10.
1220        txns.register(10, [writer(&client, d0).await])
1221            .await
1222            .unwrap();
1223        sub.step_past(10).await;
1224        assert!(
1225            sub.progress() > 10,
1226            "operator should advance past 10 when shard is registered"
1227        );
1228
1229        // Forget the shard at 20.
1230        txns.forget(20, [d0]).await.unwrap();
1231        sub.step_past(20).await;
1232        assert!(
1233            sub.progress() > 20,
1234            "operator should advance past 20 when shard is forgotten"
1235        );
1236    }
1237
1238    #[mz_ore::test(tokio::test)]
1239    #[cfg_attr(miri, ignore)] // too slow
1240    async fn as_of_until() {
1241        let client = PersistClient::new_for_tests().await;
1242        let mut txns = TxnsHandle::expect_open(client.clone()).await;
1243        let log = txns.new_log();
1244
1245        let d0 = txns.expect_register(1).await;
1246        txns.expect_commit_at(2, d0, &["2"], &log).await;
1247        txns.expect_commit_at(3, d0, &["3"], &log).await;
1248        txns.expect_commit_at(4, d0, &["4"], &log).await;
1249        txns.expect_commit_at(5, d0, &["5"], &log).await;
1250        txns.expect_commit_at(6, d0, &["6"], &log).await;
1251        txns.expect_commit_at(7, d0, &["7"], &log).await;
1252
1253        let until = 5;
1254        let mut sub = DataSubscribe::new(
1255            "as_of_until",
1256            client,
1257            txns.txns_id(),
1258            d0,
1259            3,
1260            Antichain::from_elem(until),
1261        );
1262        // Manually step the dataflow, instead of going through the
1263        // `DataSubscribe` helper because we're interested in all captured
1264        // events.
1265        while sub.txns.less_equal(&5) {
1266            sub.worker.step();
1267            tokio::task::yield_now().await;
1268            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1269        }
1270        let (actual_progresses, actual_events): (Vec<_>, Vec<_>) =
1271            sub.capture.into_iter().partition_map(|event| match event {
1272                Event::Progress(progress) => Either::Left(progress),
1273                Event::Messages(ts, data) => Either::Right((ts, data)),
1274            });
1275        // Aggregate the captured records, ignoring the stream-level
1276        // timestamp on each batch. The operator emits each container at
1277        // whatever capability it currently holds (which is determined by
1278        // its scheduling cadence and the upstream frontiers it has
1279        // observed), so the per-batch `ts` is not deterministic and not
1280        // part of the operator's contract. Per-record `(key, time, diff)`
1281        // tuples are what callers see, and the differential invariant
1282        // (stream `ts <= record time`) is checked separately below.
1283        let mut actual_records: Vec<(String, u64, i64)> = actual_events
1284            .iter()
1285            .flat_map(|(_ts, data)| data.iter().cloned())
1286            .collect();
1287        actual_records.sort();
1288        let expected_records: Vec<(String, u64, i64)> = vec![
1289            ("2".to_owned(), 3, 1),
1290            ("3".to_owned(), 3, 1),
1291            ("4".to_owned(), 4, 1),
1292        ];
1293        assert_eq!(actual_records, expected_records);
1294
1295        // Verify the differential invariant: each batch's stream
1296        // timestamp `ts` must be `<= record_time` for every record it
1297        // carries. The operator's contract requires this so that
1298        // downstream differential operators can integrate the records
1299        // at their declared times.
1300        for (ts, data) in &actual_events {
1301            for (_key, record_ts, _diff) in data {
1302                assert!(
1303                    ts <= record_ts,
1304                    "differential invariant violated: stream ts {ts} > record time {record_ts}",
1305                );
1306            }
1307        }
1308
1309        // The number and contents of progress messages is not guaranteed and
1310        // depends on the downgrade behavior. The only thing we can assert is
1311        // the max progress timestamp, if there is one, is less than the until.
1312        if let Some(max_progress_ts) = actual_progresses
1313            .into_iter()
1314            .flatten()
1315            .map(|(ts, _diff)| ts)
1316            .max()
1317        {
1318            assert!(max_progress_ts < until, "{max_progress_ts} < {until}");
1319        }
1320    }
1321
1322    /// Builds the sync operator for the harness.
1323    fn build_sync<'a>(
1324        remap: StreamVec<'a, u64, DataRemapEntry<u64>>,
1325        pass: StreamVec<'a, u64, i64>,
1326        until: Antichain<u64>,
1327    ) -> (StreamVec<'a, u64, i64>, PressOnDropButton) {
1328        txns_progress_frontiers::<String, (), u64, i64, i64, TxnsCodecDefault>(
1329            remap,
1330            pass,
1331            "test",
1332            ShardId::new(),
1333            until,
1334            0,
1335        )
1336    }
1337
1338    /// Generates a random schedule for the no-data-loss fuzz test. Interleaves
1339    /// remap entries/frontiers with passthrough data/frontiers. Payloads are
1340    /// unique and increasing so a single dropped or duplicated record is
1341    /// detectable; per-input times are non-decreasing (the harness requires
1342    /// this). The schedule never closes the passthrough input, and the test
1343    /// uses `until = ∅`, so the operator never has a legitimate reason to shut
1344    /// down and must pass through every record it is given.
1345    ///
1346    /// Schedules are intentionally NOT constrained to respect the remap
1347    /// "[physical_upper, logical_upper) is empty" contract. The no-data-loss
1348    /// property must hold under arbitrary interleavings, so feeding
1349    /// contract-violating schedules only strengthens the test.
1350    fn gen_schedule(seed: u64) -> Vec<Action> {
1351        // Simple xorshift RNG for determinism without extra deps.
1352        let mut state = seed.wrapping_add(0x9E3779B97F4A7C15).max(1);
1353        let mut next = || {
1354            state ^= state << 13;
1355            state ^= state >> 7;
1356            state ^= state << 17;
1357            state
1358        };
1359
1360        let mut schedule = Vec::new();
1361        let mut physical = 0u64;
1362        let mut logical = 0u64;
1363        let mut pass_frontier = 0u64;
1364        let mut payload = 0i64;
1365        let mut remap_closed = false;
1366        let steps = 8 + (next() % 16);
1367        for _ in 0..steps {
1368            match next() % 5 {
1369                0 if !remap_closed => {
1370                    physical += next() % 3;
1371                    logical = logical.max(physical) + (next() % 4);
1372                    schedule.push(Action::Remap {
1373                        physical_upper: physical,
1374                        logical_upper: logical,
1375                    });
1376                }
1377                1 if !remap_closed => {
1378                    if next() % 8 == 0 {
1379                        remap_closed = true;
1380                        schedule.push(Action::RemapFrontier(None));
1381                    } else {
1382                        logical += next() % 3;
1383                        schedule.push(Action::RemapFrontier(Some(logical)));
1384                    }
1385                }
1386                2 => {
1387                    let t = pass_frontier + (next() % 3);
1388                    pass_frontier = t;
1389                    payload += 1;
1390                    schedule.push(Action::Pass {
1391                        records: vec![(payload, t)],
1392                    });
1393                }
1394                3 => {
1395                    pass_frontier += next() % 3;
1396                    schedule.push(Action::PassFrontier(Some(pass_frontier)));
1397                }
1398                _ => schedule.push(Action::Step),
1399            }
1400            schedule.push(Action::Step);
1401        }
1402        schedule
1403    }
1404
1405    /// Fuzz: under any random interleaving, the deasynced operator must emit
1406    /// every passthrough record it is given (no loss, no duplication) and must
1407    /// not prematurely shut down. With `until = ∅` and no passthrough close, the
1408    /// operator never legitimately drops its capability, so the output frontier
1409    /// must stay finite.
1410    #[mz_ore::test]
1411    #[cfg_attr(miri, ignore)] // too slow
1412    fn frontiers_fuzz_no_data_loss() {
1413        for seed in 0..500u64 {
1414            let schedule = gen_schedule(seed);
1415            let mut sent: Vec<i64> = schedule
1416                .iter()
1417                .flat_map(|a| match a {
1418                    Action::Pass { records } => records.iter().map(|(p, _)| *p).collect(),
1419                    _ => Vec::new(),
1420                })
1421                .collect();
1422            let (out, frontier) = run_schedule(build_sync, Antichain::new(), &schedule);
1423            let mut emitted: Vec<i64> = out.iter().map(|(p, _, _)| *p).collect();
1424            sent.sort();
1425            emitted.sort();
1426            assert_eq!(
1427                emitted, sent,
1428                "seed {seed}: operator lost or duplicated data\nschedule={schedule:?}\nout={out:?}"
1429            );
1430            assert_ne!(
1431                frontier,
1432                u64::MAX,
1433                "seed {seed}: operator prematurely shut down (empty output frontier)\nschedule={schedule:?}"
1434            );
1435        }
1436    }
1437
1438    #[mz_ore::test]
1439    #[cfg_attr(miri, ignore)] // too slow
1440    fn frontiers_sql_299_up_to_no_tail_loss() {
1441        // until = 0. A remap entry with physical_upper = 5 keeps the operator
1442        // out of the `waiting_for_remap` state (5 > cap.time() = 0), so the
1443        // until check actually fires. Buffer a record at time 0 (payload 4) and
1444        // leave it pending. In the single activation, the operator sees both the
1445        // buffered record and the passthrough frontier at 0, which already
1446        // satisfies `until <= pass_frontier` and drops the capability. The
1447        // record must be emitted before that drop, not discarded. Buffering at
1448        // time 0 (the cap's time) is what makes the record and the
1449        // until-crossing land in the same activation — with the ordered
1450        // `new_input` handle, advancing the passthrough frontier past the record
1451        // would deliver the record in an earlier activation and mask the bug.
1452        let schedule = vec![
1453            Action::Remap {
1454                physical_upper: 5,
1455                logical_upper: 5,
1456            },
1457            Action::RemapFrontier(Some(5)),
1458            Action::Pass {
1459                records: vec![(4, 0)],
1460            },
1461            Action::PassFrontier(None),
1462            Action::Step,
1463        ];
1464        let (output, _frontier) = run_schedule(build_sync, Antichain::from_elem(0), &schedule);
1465        let payloads: Vec<i64> = output.iter().map(|(p, _, _)| *p).collect();
1466        assert!(
1467            payloads.contains(&4),
1468            "buffered record at time 0 must be emitted before until-driven shutdown, got {output:?}"
1469        );
1470    }
1471
1472    #[mz_ore::test]
1473    #[cfg_attr(miri, ignore)] // too slow
1474    fn frontiers_per4_advance_after_remap_close() {
1475        // Emit a remap entry whose logical_upper (10) exceeds its physical_upper
1476        // (5). Close the remap input while the passthrough frontier is still
1477        // below physical_upper (so the capability has NOT yet advanced to
1478        // logical_upper), then advance the passthrough frontier up to
1479        // physical_upper (5). The capability must still advance to logical_upper
1480        // (10) using the remap entry retained across the close, not stall at the
1481        // passthrough frontier (5). The async impl dropped the entry on close and
1482        // stalled here (PER-4).
1483        let schedule = vec![
1484            Action::Remap {
1485                physical_upper: 5,
1486                logical_upper: 10,
1487            },
1488            Action::RemapFrontier(Some(10)),
1489            Action::Step,
1490            // Close remap before the passthrough frontier reaches physical_upper.
1491            Action::RemapFrontier(None),
1492            Action::Step,
1493            // Only now does the passthrough frontier reach physical_upper.
1494            Action::PassFrontier(Some(5)),
1495            Action::Step,
1496        ];
1497        let (_output, frontier) = run_schedule(build_sync, Antichain::new(), &schedule);
1498        assert_eq!(
1499            frontier, 10,
1500            "capability must advance to logical_upper after remap close, got {frontier}"
1501        );
1502    }
1503
1504    #[mz_ore::test]
1505    #[cfg_attr(miri, ignore)] // too slow
1506    fn frontiers_select_as_of_max_blocks() {
1507        // Mimic `SELECT AS OF MAX`: a remap entry exists with physical_upper == 0
1508        // (so physical_upper <= cap.time() and the operator waits for remap), no
1509        // further remap update arrives, and the passthrough frontier reaches the
1510        // empty antichain. The operator must NOT drop its capability (must keep
1511        // blocking), so the output frontier stays finite (0), not u64::MAX.
1512        let schedule = vec![
1513            Action::Remap {
1514                physical_upper: 0,
1515                logical_upper: 0,
1516            },
1517            Action::RemapFrontier(Some(0)),
1518            Action::PassFrontier(None),
1519            Action::Step,
1520        ];
1521        let (_output, frontier) = run_schedule(build_sync, Antichain::new(), &schedule);
1522        assert_eq!(
1523            frontier, 0,
1524            "operator must block (retain capability) while waiting for remap, got {frontier}"
1525        );
1526    }
1527}