Skip to main content

mz_storage_operators/
persist_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A source that reads from a persist shard.
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_persist_client::cache::PersistClientCache;
26use mz_persist_client::cfg::{PersistConfig, RetryParameters};
27use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
28use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
29use mz_persist_client::operators::shard_source::{
30    ErrorHandler, FilterResult, SnapshotMode, shard_source,
31};
32use mz_persist_client::stats::STATS_AUDIT_PANIC;
33use mz_persist_types::Codec64;
34use mz_persist_types::codec_impls::UnitSchema;
35use mz_persist_types::columnar::{ColumnEncoder, Schema};
36use mz_repr::{
37    Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
38};
39use mz_storage_types::StorageDiff;
40use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
41use mz_storage_types::errors::DataflowError;
42use mz_storage_types::sources::SourceData;
43use mz_storage_types::stats::RelationPartStats;
44use mz_timely_util::builder_async::{
45    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::probe::ProbeNotify;
48use mz_txn_wal::operator::{TxnsContext, txns_progress};
49use serde::{Deserialize, Serialize};
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::ScopeParent;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::scopes::Child;
59use timely::dataflow::{Scope, Stream, StreamVec};
60use timely::order::TotalOrder;
61use timely::progress::Antichain;
62use timely::progress::Timestamp as TimelyTimestamp;
63use timely::progress::timestamp::PathSummary;
64use timely::scheduling::Activator;
65use tokio::sync::mpsc::UnboundedSender;
66use tracing::{error, trace};
67
68use crate::metrics::BackpressureMetrics;
69
/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
/// progress than would otherwise be possible.
///
/// This is "opaque" since we'd like to reserve the right to change the definition in the future
/// without downstreams being able to rely on the precise representation. (At the moment, this
/// is a simple batch counter, though we may change it to eg. reflect progress through the keyspace
/// in the future.)
#[derive(
    Copy,
    Clone,
    PartialEq,
    Default,
    Eq,
    PartialOrd,
    Ord,
    Debug,
    Serialize,
    Deserialize,
    Hash
)]
pub struct Subtime(/* the opaque progress counter; see type-level docs */ u64);
91
92impl PartialOrder for Subtime {
93    fn less_equal(&self, other: &Self) -> bool {
94        self.0.less_equal(&other.0)
95    }
96}
97
// `Subtime` wraps a `u64`, so every pair of values is comparable: the order is total.
impl TotalOrder for Subtime {}
99
100impl PathSummary<Subtime> for Subtime {
101    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
102        self.0.results_in(&src.0).map(Subtime)
103    }
104
105    fn followed_by(&self, other: &Self) -> Option<Self> {
106        self.0.followed_by(&other.0).map(Subtime)
107    }
108}
109
110impl TimelyTimestamp for Subtime {
111    type Summary = Subtime;
112
113    fn minimum() -> Self {
114        Subtime(0)
115    }
116}
117
impl Subtime {
    /// The smallest non-zero summary for the opaque timestamp type.
    ///
    /// Used wherever a minimal step of intra-timestamp progress is needed
    /// (eg. the feedback summary in `persist_source`'s flow control).
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}
124
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater or equal to `until` will be suppressed.
/// The `map_filter_project` argument, if supplied, may be partially applied,
/// and any un-applied part of the argument will be left behind in the argument.
///
/// Users of this function have the ability to apply flow control to the output
/// to limit the in-flight data (measured in bytes) it can emit. The flow control
/// input is a timely stream that communicates the frontier at which the data
/// emitted by this source have been dropped.
///
/// **Note:** Because this function is reading batches from `persist`, it is working
/// at batch granularity. In practice, the source will be overshooting the target
/// flow control upper by an amount that is related to the size of batches.
///
/// If no flow control is desired an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy way of creating such a stream is by
/// using [`timely::dataflow::operators::generic::operator::empty`].
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<G, (Row, Timestamp, Diff)>,
    StreamVec<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    // Read the shard inside a nested scope whose timestamp is refined with a `Subtime`
    // component, so backpressure can operate at finer-than-timestamp granularity.
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // When a byte limit is configured, create a probe on the decoded output whose
        // progress stream drives the `backpressure` operator inside `persist_source_core`.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    // Always allow at least one batch (the least `Subtime` step) through,
                    // so progress is made even when the byte budget is exhausted.
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Attach the decoded stream's progress to the probe (if any), closing the
        // flow-control feedback loop.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Drop the `Subtime` component from each update's timestamp payload and split the
    // stream into data and error outputs.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
272
/// The refined child scope used by [`persist_source_core`]: the parent timestamp paired
/// with a [`Subtime`] component for intra-timestamp (per-batch) progress.
type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;
274
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
///
/// Unlike [`persist_source`], this operates in a [`RefinedScope`] and emits updates
/// timestamped with `(Timestamp, Subtime)`, leaving `until` suppression and error
/// splitting to the caller.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        Vec<(
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        )>,
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    // Clone the MFP for the stats-based pushdown filter below; the original reference
    // is passed on to `decode_and_mfp`, which takes it.
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    // If flow control is configured, interpose the `backpressure` operator on the stream
    // of batch-part descriptions before the parts are fetched.
    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        // Stats-based filter: decide per part whether it must be fetched at all.
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}
389
/// Decides, from part statistics and the pushed-down MFP `plan`, whether a persist part
/// must be fetched (`Keep`), can be skipped (`Discard`), or can be replaced with a
/// synthesized constant part (`ReplaceWith`).
///
/// `time_range` bounds the values `mz_now()` may take for this dataflow, computed by the
/// caller from the part's frontier and the dataflow's `until`.
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let relation = ReprRelationType::from(relation_desc.typ());
    let mut ranges = ColumnSpecs::new(&relation, &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // Conservatively assume the part may contain errors when stats are absent.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    // N.B. We may have pushed down column "demands" into Persist, so this
    // relation desc may have a different set of columns than the stats.
    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    // The filter itself may also produce an error at evaluation time.
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // Zero-column relation whose filter certainly passes: skip the fetch and
        // synthesize a replacement part holding a single default row. If either
        // encoder can't be built, fall back to fetching the real part.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}
435
/// Decodes fetched persist blobs into rows, applying the `map_filter_project` (if any) and
/// suppressing updates at or beyond `until`, emitting a consolidating output stream.
///
/// Decoding is fueled: each scheduling of the operator does a bounded amount of work
/// (see `storage_source_decode_fuel`) and reactivates itself if work remains.
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: StreamVec<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do: blobs are queued here and drained, fuel
        // permitting, on each scheduling.
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                // Retain the capability so queued work can be emitted on later schedulings.
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            // Process queued parts FIFO until the fuel budget is spent.
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            // Work remains: reschedule ourselves rather than blocking the worker.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
517
/// Pending work to read from fetched parts.
///
/// One entry is queued per fetched blob in `decode_and_mfp`; entries are drained FIFO,
/// fuel permitting, each time the operator is scheduled.
struct PendingWork {
    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
    panic_on_audit_failure: bool,
    /// The time at which the work should happen (held so output can be produced on
    /// later schedulings).
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part.
    part: PendingPart,
}
527
/// A fetched part in one of two stages: a raw blob, or one parsed and ready to iterate.
enum PendingPart {
    /// A fetched blob that has not yet been parsed.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// A parsed part; see [`PendingPart::part_mut`], which caches this state.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
534
impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing it from a
    /// `FetchedBlob` if necessary.
    ///
    /// The parsed part is cached in `self`, so repeated calls parse at most once.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // `self` is now `Parsed`, so this won't recurse any further.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}
553
impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, while checking
    /// `yield_fn` whether more fuel is available.
    ///
    /// Returns `true` when the part is fully processed, or `false` if it yielded early
    /// (per `yield_fn`) and should be scheduled again.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Buffer used to recycle the `SourceData`/`Row` allocation across iterations.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Updates at or beyond `until` are suppressed.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it any way. If the MFP returned data it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    // Decoded an error row: pass it through unchanged.
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Out of fuel: yield, leaving the part partially processed.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
690
/// A trait representing a type that can be used in `backpressure`.
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes. This is the unit counted
    /// against `FlowControl::max_inflight_bytes`.
    fn byte_size(&self) -> usize;
}
696
697impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
698    fn byte_size(&self) -> usize {
699        self.1.encoded_size_bytes()
700    }
701}
702
/// Flow control configuration, consumed by the `backpressure` operator.
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: StreamVec<G, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (be they granular or not) that must be emitted,
    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}
721
/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects all the second part of the tuple
/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// Returns the backpressured `data` stream, along with a [`PressOnDropButton`] that shuts
/// the operator down when dropped.
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: StreamVec<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Helper method used to synthesize current and next frontier for ordered times.
    //
    // Each emitted part is assigned a unique, increasing part number as its `Subtime`
    // component. Returns `(refined_time, frontier_including_time, frontier_past_time)`,
    // where the second antichain covers the part itself and the third is the frontier we
    // can downgrade to once the part has been emitted.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // _Refine_ the data stream by amending the second input with the part number. This also
    // ensures that we order the parts by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                // Input is exhausted: flush all buffered parts (in time order) with an
                // empty "input frontier", then end the stream.
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                // Buffer incoming parts until the input frontier tells us they are final.
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                // Emit (in time order) every buffered part whose time is now closed by
                // `prog`, then forward the progress statement itself.
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is
                                // useful to "close" timestamps from previous parts, even if
                                // we don't yet emit this part. Note that this is safe
                                // because `data_input` ensures time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, listen for updates to the flow_control
                // input's frontier until we free up new budget. If we don't interact
                // with this side of the if statement, because the stream has no data, we
                // don't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier`. `clone_from` keeps
                // `new_flow_control_frontier` intact for the probe below.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
981
982#[cfg(test)]
983mod tests {
984    use timely::container::CapacityContainerBuilder;
985    use timely::dataflow::operators::{Enter, Probe};
986    use tokio::sync::mpsc::unbounded_channel;
987    use tokio::sync::oneshot;
988
989    use super::*;
990
    // Exercises `backpressure` with a non-granular summary `(1, Subtime(0))`: the
    // consumer lives in the outer `u64` scope, so parts retire a whole coarse
    // timestamp at a time.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Scenario 1: a single part at time 50 already exceeds the 100-byte budget,
        // but the whole timestamp must still be emitted before we backpressure.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Scenario 2: small parts accumulate across timestamps until the 50-byte
        // budget is exhausted, then retire as the consumer catches up.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Scenario 3: many parts share timestamp 52; once 52 becomes the minimum
        // time, the whole timestamp is emitted regardless of the byte budget, but
        // the later part at 100 is still held back.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust out budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`.
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1129
    // Exercises `backpressure` with a granular summary `(0, Subtime(1))`: the
    // consumer lives in the same hybrid scope, so parts retire one `Subtime` step
    // (i.e. one part) at a time.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // Scenario 1: each 101-byte part alone exceeds the 100-byte budget, so parts
        // are emitted strictly one at a time.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // Scenario 2: three small parts fit under the 50-byte budget; retiring them
        // individually unblocks the remaining, larger part.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // we can emit 3 parts before we hit the backpressure limit
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1208
    /// The hybrid timestamp type used throughout these tests: a coarse `u64` time
    /// paired with a granular `Subtime` part counter.
    type Time = (u64, Subtime);
    /// A test stand-in for a persist part; its only property is its byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        // The wrapped value is the part's size in bytes.
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1217
    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. This is a single time (rather than an antichain) because the test
        /// timestamps are totally ordered, so the frontier holds at most one element.
        AssertOutputFrontier(Time),
        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
        /// retired.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Process X parts in the downstream operator. This affects the feedback frontier.
        ProcessXParts(usize),
    }
1234
    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
    ///
    /// Builds a two-scope dataflow: an outer non-granular `u64` scope and an inner
    /// "hybrid" `(u64, Subtime)` scope containing the `backpressure` operator under
    /// test. The consumer (and the feedback edge driving `FlowControl`) is placed in
    /// whichever scope `non_granular_consumer` selects, and then the `steps` are
    /// executed in order, single-threaded, via `execute_directly`.
    fn backpressure_runner(
        // The input data to the `backpressure` operator
        input: Vec<(u64, Part)>,
        // The maximum inflight bytes the `backpressure` operator allows through.
        max_inflight_bytes: usize,
        // The feedback summary used by the `backpressure` operator.
        summary: Time,
        // List of steps to run through.
        steps: Vec<Step>,
        // Whether or not to consume records in the non-granular scope. This is useful when the
        // `summary` is something like `(1, 0)`.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                // Set up the top-level non-granular scope.
                worker.dataflow::<u64, _, _>(|scope| {
                    // The outer feedback edge is only needed when the consumer lives in
                    // this non-granular scope.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // The flow-control frontier comes either from the outer scope
                        // (entered into this one) or from a feedback edge local to the
                        // hybrid scope.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If we want to granularly consume the output, we setup the consumer here.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // If we want to non-granularly consume the output, we setup the consumer here.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    // Step the worker until the probe reports the requested frontier.
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    // Signal the consumer operator to swallow `parts` data events.
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    // Step the worker until the backpressure probe reports a status,
                    // then compare it against the expected values.
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Send the input to the empty frontier.
            let _ = finalizer_tx.send(());
        });
    }
1404
1405    /// An operator that emits `Part`'s at the specified timestamps. Does not
1406    /// drop its capability until it gets a signal from the `Sender` it returns.
1407    fn iterator_operator<
1408        G: Scope<Timestamp = (u64, Subtime)>,
1409        I: Iterator<Item = (u64, Part)> + 'static,
1410    >(
1411        scope: G,
1412        mut input: I,
1413    ) -> (StreamVec<G, Part>, oneshot::Sender<()>) {
1414        let (finalizer_tx, finalizer_rx) = oneshot::channel();
1415        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1416        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1417
1418        iterator.build(|mut caps| async move {
1419            let mut capability = Some(caps.pop().unwrap());
1420            let mut last = None;
1421            while let Some(element) = input.next() {
1422                let time = element.0.clone();
1423                let part = element.1;
1424                last = Some((time, Subtime(0)));
1425                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1426            }
1427            if let Some(last) = last {
1428                capability
1429                    .as_mut()
1430                    .unwrap()
1431                    .downgrade(&(last.0 + 1, last.1));
1432            }
1433
1434            let _ = finalizer_rx.await;
1435            capability.take();
1436        });
1437
1438        (output, finalizer_tx)
1439    }
1440
    /// An operator that consumes its input ONLY when given a signal to do so from
    /// the `UnboundedSender` it returns. Each `send` corresponds with 1 `Data` event
    /// being processed. Also connects the `feedback` handle to its output.
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: StreamVec<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        // The output carries no data (`Infallible`); it exists only so the input
        // connection propagates this operator's frontier into the feedback edge.
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Consume exactly one message: skip over `Progress` events until we
                // swallow a single `Data` event (or the input is exhausted).
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
1465}