Skip to main content

mz_storage_operators/
persist_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A source that reads from a persist shard.
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, EvalError, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_ore::str::redact;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::{PersistConfig, RetryParameters};
28use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
29use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
30use mz_persist_client::operators::shard_source::{
31    ErrorHandler, FilterResult, SnapshotMode, shard_source,
32};
33use mz_persist_client::stats::STATS_AUDIT_PANIC;
34use mz_persist_types::Codec64;
35use mz_persist_types::codec_impls::UnitSchema;
36use mz_persist_types::columnar::{ColumnEncoder, Schema};
37use mz_repr::{
38    Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
39};
40use mz_storage_types::StorageDiff;
41use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
42use mz_storage_types::errors::DataflowError;
43use mz_storage_types::sources::SourceData;
44use mz_storage_types::stats::RelationPartStats;
45use mz_timely_util::builder_async::{
46    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
47};
48use mz_timely_util::probe::ProbeNotify;
49use mz_txn_wal::operator::{TxnsContext, txns_progress};
50use serde::{Deserialize, Serialize};
51use timely::PartialOrder;
52use timely::container::CapacityContainerBuilder;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::{Scope, Stream, StreamVec};
59use timely::order::TotalOrder;
60use timely::progress::Antichain;
61use timely::progress::Timestamp as TimelyTimestamp;
62use timely::progress::timestamp::PathSummary;
63use timely::scheduling::Activator;
64use tokio::sync::mpsc::UnboundedSender;
65use tracing::{error, trace};
66
67use crate::metrics::BackpressureMetrics;
68
69/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
70/// progress than would otherwise be possible.
71///
72/// This is "opaque" since we'd like to reserve the right to change the definition in the future
73/// without downstreams being able to rely on the precise representation. (At the moment, this
74/// is a simple batch counter, though we may change it to eg. reflect progress through the keyspace
75/// in the future.)
76#[derive(
77    Copy,
78    Clone,
79    PartialEq,
80    Default,
81    Eq,
82    PartialOrd,
83    Ord,
84    Debug,
85    Serialize,
86    Deserialize,
87    Hash
88)]
89pub struct Subtime(u64);
90
91impl PartialOrder for Subtime {
92    fn less_equal(&self, other: &Self) -> bool {
93        self.0.less_equal(&other.0)
94    }
95}
96
97impl TotalOrder for Subtime {}
98
99impl PathSummary<Subtime> for Subtime {
100    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
101        self.0.results_in(&src.0).map(Subtime)
102    }
103
104    fn followed_by(&self, other: &Self) -> Option<Self> {
105        self.0.followed_by(&other.0).map(Subtime)
106    }
107}
108
109impl TimelyTimestamp for Subtime {
110    type Summary = Subtime;
111
112    fn minimum() -> Self {
113        Subtime(0)
114    }
115}
116
117impl columnation::Columnation for Subtime {
118    type InnerRegion = columnation::CopyRegion<Subtime>;
119}
120
121impl differential_dataflow::lattice::Lattice for Subtime {
122    fn join(&self, other: &Self) -> Self {
123        Subtime(std::cmp::max(self.0, other.0))
124    }
125    fn meet(&self, other: &Self) -> Self {
126        Subtime(std::cmp::min(self.0, other.0))
127    }
128}
129
130impl differential_dataflow::lattice::Maximum for Subtime {
131    fn maximum() -> Self {
132        Subtime(u64::MAX)
133    }
134}
135
136impl Subtime {
137    /// The smallest non-zero summary for the opaque timestamp type.
138    pub const fn least_summary() -> Self {
139        Subtime(1)
140    }
141}
142
143/// Creates a new source that reads from a persist shard, distributing the work
144/// of reading data to all timely workers.
145///
146/// All times emitted will have been [advanced by] the given `as_of` frontier.
147/// All updates at times greater or equal to `until` will be suppressed.
148/// The `map_filter_project` argument, if supplied, may be partially applied,
149/// and any un-applied part of the argument will be left behind in the argument.
150///
151/// Users of this function have the ability to apply flow control to the output
152/// to limit the in-flight data (measured in bytes) it can emit. The flow control
153/// input is a timely stream that communicates the frontier at which the data
154/// emitted from by this source have been dropped.
155///
156/// **Note:** Because this function is reading batches from `persist`, it is working
157/// at batch granularity. In practice, the source will be overshooting the target
158/// flow control upper by an amount that is related to the size of batches.
159///
160/// If no flow control is desired an empty stream whose frontier immediately advances
161/// to the empty antichain can be used. An easy easy of creating such stream is by
162/// using [`timely::dataflow::operators::generic::operator::empty`].
163///
164/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
165pub fn persist_source<'scope, E>(
166    scope: Scope<'scope, mz_repr::Timestamp>,
167    source_id: GlobalId,
168    persist_clients: Arc<PersistClientCache>,
169    txns_ctx: &TxnsContext,
170    metadata: CollectionMetadata,
171    read_schema: Option<RelationDesc>,
172    as_of: Option<Antichain<Timestamp>>,
173    snapshot_mode: SnapshotMode,
174    until: Antichain<Timestamp>,
175    map_filter_project: Option<&mut MfpPlan>,
176    max_inflight_bytes: Option<usize>,
177    start_signal: impl Future<Output = ()> + Send + 'static,
178    error_handler: ErrorHandler,
179) -> (
180    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
181    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
182    Vec<PressOnDropButton>,
183)
184where
185    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
186{
187    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());
188
189    let mut tokens = vec![];
190
191    let outer = scope.clone();
192    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
193        let (flow_control, flow_control_probe) = match max_inflight_bytes {
194            Some(max_inflight_bytes) => {
195                let backpressure_metrics = BackpressureMetrics {
196                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
197                    last_backpressured_bytes: Arc::clone(
198                        &shard_metrics.backpressure_last_backpressured_bytes,
199                    ),
200                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
201                };
202
203                let probe = mz_timely_util::probe::Handle::default();
204                let progress_stream = mz_timely_util::probe::source(
205                    scope.clone(),
206                    format!("decode_backpressure_probe({source_id})"),
207                    probe.clone(),
208                );
209                let flow_control = FlowControl {
210                    progress_stream,
211                    max_inflight_bytes,
212                    summary: (Default::default(), Subtime::least_summary()),
213                    metrics: Some(backpressure_metrics),
214                };
215                (Some(flow_control), Some(probe))
216            }
217            None => (None, None),
218        };
219
220        // Our default listen sleeps are tuned for the case of a shard that is
221        // written once a second, but txn-wal allows these to be lazy.
222        // Override the tuning to reduce crdb load. The pubsub fallback
223        // responsibility is then replaced by manual "one state" wakeups in the
224        // txns_progress operator.
225        let cfg = Arc::clone(&persist_clients.cfg().configs);
226        let subscribe_sleep = match metadata.txns_shard {
227            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
228            None => None,
229        };
230
231        let (stream, source_tokens) = persist_source_core(
232            outer,
233            scope,
234            source_id,
235            Arc::clone(&persist_clients),
236            metadata.clone(),
237            read_schema,
238            as_of.clone(),
239            snapshot_mode,
240            until.clone(),
241            map_filter_project,
242            flow_control,
243            subscribe_sleep,
244            start_signal,
245            error_handler,
246        );
247        tokens.extend(source_tokens);
248
249        let stream = match flow_control_probe {
250            Some(probe) => stream.probe_notify_with(vec![probe]),
251            None => stream,
252        };
253
254        stream.leave(outer)
255    });
256
257    // If a txns_shard was provided, then this shard is in the txn-wal
258    // system. This means the "logical" upper may be ahead of the "physical"
259    // upper. Render a dataflow operator that passes through the input and
260    // translates the progress frontiers as necessary.
261    let (stream, txns_tokens) = match metadata.txns_shard {
262        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
263            stream,
264            &source_id.to_string(),
265            txns_ctx,
266            move || {
267                let (c, l) = (
268                    Arc::clone(&persist_clients),
269                    metadata.persist_location.clone(),
270                );
271                async move { c.open(l).await.expect("location is valid") }
272            },
273            txns_shard,
274            metadata.data_shard,
275            as_of
276                .expect("as_of is provided for table sources")
277                .into_option()
278                .expect("shard is not closed"),
279            until,
280            Arc::new(metadata.relation_desc),
281            Arc::new(UnitSchema),
282        ),
283        None => (stream, vec![]),
284    };
285    tokens.extend(txns_tokens);
286    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
287        Ok(row) => Ok((row, t.0, r)),
288        Err(err) => Err((err, t.0, r)),
289    });
290    (ok_stream, err_stream, tokens)
291}
292
293type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
294
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// Parts whose stats show that `map_filter_project` cannot produce output for them
/// may be discarded without being decoded.
///
/// This renders `shard_source` into the refined `scope` (passing `outer` through to
/// it) and decodes the fetched parts with [`decode_and_mfp`], which receives
/// `map_filter_project` and `until`. If `flow_control` is `Some`, the part
/// descriptions produced by `shard_source` are routed through the [`backpressure`]
/// operator. If `listen_sleep` is `Some`, it overrides the default listen sleep retry
/// parameters.
///
/// Returns the decoded stream along with the tokens of the rendered operators.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
// NOTE(review): the reason for this lint suppression is not evident from the body
// below; confirm it is still needed.
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, 'outer, E>(
    outer: Scope<'outer, mz_repr::Timestamp>,
    scope: RefinedScope<'g, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + Send + 'static>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        'g,
        (mz_repr::Timestamp, Subtime),
        Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
    >,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    // A copy of the MFP that is used only for stats-based filter pushdown; the
    // original is handed to `decode_and_mfp` below.
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    // With flow control, the part descriptions emitted by `shard_source` are routed
    // through the `backpressure` operator.
    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |scope, descs, chosen_worker| {
            let (stream, token) = backpressure(
                scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        outer,
        scope,
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        // Stats-based filter pushdown: decides, per part, whether it can be discarded.
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            // Bound the values `mz_now()` may take by the current frontier and `upper`.
            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}
408
409fn filter_result(
410    relation_desc: &RelationDesc,
411    time_range: ResultSpec,
412    stats: RelationPartStats,
413    plan: &MfpPlan,
414) -> FilterResult {
415    let arena = RowArena::new();
416    let relation = ReprRelationType::from(relation_desc.typ());
417    let mut ranges = ColumnSpecs::new(&relation, &arena);
418    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
419
420    let may_error = stats.err_count().map_or(true, |count| count > 0);
421
422    // N.B. We may have pushed down column "demands" into Persist, so this
423    // relation desc may have a different set of columns than the stats.
424    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
425        let result_spec = stats.col_stats(idx, &arena);
426        ranges.push_column(pos, result_spec);
427    }
428    let result = ranges.mfp_plan_filter(plan).range;
429    let may_error = may_error || result.may_fail();
430    let may_keep = result.may_contain(Datum::True);
431    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
432    if relation_desc.len() == 0 && !may_error && !may_skip {
433        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
434            return FilterResult::Keep;
435        };
436        key.append(&SourceData(Ok(Row::default())));
437        let key = key.finish();
438        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
439            return FilterResult::Keep;
440        };
441        val.append(&());
442        let val = val.finish();
443
444        FilterResult::ReplaceWith {
445            key: Arc::new(key),
446            val: Arc::new(val),
447        }
448    } else if may_error || may_keep {
449        FilterResult::Keep
450    } else {
451        FilterResult::Discard
452    }
453}
454
455pub fn decode_and_mfp<'scope, E>(
456    cfg: PersistConfig,
457    fetched: StreamVec<
458        'scope,
459        (mz_repr::Timestamp, Subtime),
460        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
461    >,
462    name: &str,
463    until: Antichain<Timestamp>,
464    mut map_filter_project: Option<&mut MfpPlan>,
465) -> StreamVec<
466    'scope,
467    (mz_repr::Timestamp, Subtime),
468    (Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff),
469>
470where
471    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
472{
473    let scope = fetched.scope();
474    let mut builder = OperatorBuilder::new(
475        format!("persist_source::decode_and_mfp({})", name),
476        scope.clone(),
477    );
478    let operator_info = builder.operator_info();
479
480    let mut fetched_input = builder.new_input(fetched, Pipeline);
481    let (updates_output, updates_stream) = builder.new_output();
482    let mut updates_output = OutputBuilder::from(updates_output);
483
484    // Re-used state for processing and building rows.
485    let mut datum_vec = mz_repr::DatumVec::new();
486    let mut row_builder = Row::default();
487
488    // Extract the MFP if it exists; leave behind an identity MFP in that case.
489    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());
490
491    builder.build(move |_caps| {
492        let name = name.to_owned();
493        // Acquire an activator to reschedule the operator when it has unfinished work.
494        let activations = scope.activations();
495        let activator = Activator::new(operator_info.address, activations);
496        // Maintain a list of work to do
497        let mut pending_work = std::collections::VecDeque::new();
498        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);
499
500        move |_frontier| {
501            fetched_input.for_each(|time, data| {
502                let capability = time.retain(0);
503                for fetched_blob in data.drain(..) {
504                    pending_work.push_back(PendingWork {
505                        panic_on_audit_failure: panic_on_audit_failure.get(),
506                        capability: capability.clone(),
507                        part: PendingPart::Unparsed(fetched_blob),
508                    })
509                }
510            });
511
512            // Get dyncfg values once per schedule to amortize the cost of
513            // loading the atomics.
514            let yield_fuel = cfg.storage_source_decode_fuel();
515            let yield_fn = |_, work| work >= yield_fuel;
516
517            let mut work = 0;
518            let start_time = Instant::now();
519            let mut output = updates_output.activate();
520            while !pending_work.is_empty() && !yield_fn(start_time, work) {
521                let done = pending_work.front_mut().unwrap().do_work(
522                    &mut work,
523                    &name,
524                    start_time,
525                    yield_fn,
526                    &until,
527                    map_filter_project.as_ref(),
528                    &mut datum_vec,
529                    &mut row_builder,
530                    &mut output,
531                );
532                if done {
533                    pending_work.pop_front();
534                }
535            }
536            if !pending_work.is_empty() {
537                activator.activate();
538            }
539        }
540    });
541
542    updates_stream
543}
544
545/// Pending work to read from fetched parts
546struct PendingWork {
547    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
548    panic_on_audit_failure: bool,
549    /// The time at which the work should happen.
550    capability: Capability<(mz_repr::Timestamp, Subtime)>,
551    /// Pending fetched part.
552    part: PendingPart,
553}
554
555enum PendingPart {
556    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
557    Parsed {
558        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
559    },
560}
561
562impl PendingPart {
563    /// Returns the contained `FetchedPart`, first parsing it from a
564    /// `FetchedBlob` if necessary.
565    ///
566    /// Also returns a bool, which is true if the part is known (from pushdown
567    /// stats) to be free of `SourceData(Err(_))`s. It will be false if the part
568    /// is known to contain errors or if it's unknown.
569    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
570        match self {
571            PendingPart::Unparsed(x) => {
572                *self = PendingPart::Parsed { part: x.parse() };
573                // Won't recurse any further.
574                self.part_mut()
575            }
576            PendingPart::Parsed { part } => &mut part.part,
577        }
578    }
579}
580
impl PendingWork {
    /// Performs work on this part: reads updates from the fetched part, decodes them,
    /// applies `map_filter_project` (if any), and sends the results to `output`. After
    /// each update it consults `yield_fn` to check whether more fuel is available.
    ///
    /// `work` accumulates the fuel spent. It is incremented once per MFP evaluation and
    /// once per emitted update. Updates at times not before `until` are skipped, both
    /// before applying the MFP and again on its output (to account for temporal filters).
    /// Emitted updates carry this work item's capability time, with its first component
    /// replaced by the update's own timestamp.
    ///
    /// Returns `true` once the part is exhausted. Returns `false` if it stopped early
    /// because `yield_fn` reported the fuel as spent; the parsed part is kept in `self`,
    /// so a later call continues from where this one stopped.
    ///
    /// # Panics
    ///
    /// Panics if the part is a filter pushdown audit part, the MFP produces output for
    /// it, and `panic_on_audit_failure` is set. Without that flag the violation is only
    /// logged.
    fn do_work<YFn, E>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
        E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Storage handed to `next_with_storage`. Decoded `SourceData` is put back here
        // (see the `row_buf.replace` calls below), presumably so its allocation can be
        // re-used for the next decoded row.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Skip updates at or beyond `until`.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it any way. If the MFP returned data it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            mfp = ?redact(&mfp),
                                            result = ?redact(&result),
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    // Errors stored in the shard are forwarded as-is, converted into `E`.
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(E::from(err)), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Out of fuel: report that the part is not finished yet.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
714
/// Implemented by values that can flow through `backpressure`, which needs to
/// know how many bytes each one accounts for.
pub trait Backpressureable: Clone + 'static {
    /// The weight of this value, in bytes.
    fn byte_size(&self) -> usize;
}
720
721impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
722    fn byte_size(&self) -> usize {
723        self.1.encoded_size_bytes()
724    }
725}
726
727/// Flow control configuration.
728#[derive(Debug)]
729pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
730    /// Stream providing in-flight frontier updates.
731    ///
732    /// As implied by its type, this stream never emits data, only progress updates.
733    ///
734    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
735    pub progress_stream: StreamVec<'scope, T, Infallible>,
736    /// Maximum number of in-flight bytes.
737    pub max_inflight_bytes: usize,
738    /// The minimum range of timestamps (be they granular or not) that must be emitted,
739    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
740    pub summary: T::Summary,
741
742    /// Optional metrics for the `backpressure` operator to keep up-to-date.
743    pub metrics: Option<BackpressureMetrics>,
744}
745
/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects the second component of every
/// timestamp tuple to be 0, and all data to be on the `chosen_worker` worker. Workers other
/// than `chosen_worker` exit immediately and emit nothing.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// Returns the backpressured data stream, along with a [`PressOnDropButton`] that shuts
/// down the operator when it is dropped.
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing. Each time the operator is
    // backpressured it sends (new flow-control frontier, inflight part count, retired count).
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Helper method used to synthesize current and next frontier for ordered times.
    //
    // Replaces the subtime of `time` with `*part_number` and increments `part_number`. Returns
    // that stamped time, the input `frontier` with the stamped time inserted, and the input
    // `frontier` with the *next* subtime (`part_number + 1`) inserted, i.e. the frontier just
    // past this part.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // _Refine_ the data stream by overwriting the second component of each part's timestamp
    // with a monotonically increasing part number. This also ensures that we order the parts
    // by time: parts are buffered until the input frontier passes them, then released in time
    // order as `Either::Right`, followed by the input frontier itself as `Either::Left`.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // The input is exhausted: release everything still buffered, in time order.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Release, in time order, every buffered part the frontier has passed.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        // Each entry is (frontier time `flow_control` must pass to retire it, byte size).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is useful
                                // to "close" timestamps from previous parts, even if we don't yet
                                // emit this part. Note that this is safe because `data_input` ensures
                                // time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                // NOTE: we only poll `data_input` after `pop_front` returned
                                // `None`, so `pending_parts` is always empty here and this
                                // always breaks.
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, listen for updates to the flow_control
                // input's frontier until we free up new budget. If we don't interact
                // with this side of the if statement, because the stream has no data, we
                // don't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    // NOTE(review): an exhausted input is treated as the empty frontier. If this
                    // branch can still be reached afterwards (e.g. `flow_control_max_bytes == 0`
                    // with both `output_frontier` and `flow_control_frontier` empty), the loop
                    // looks like it would re-enter here without ever blocking. Confirm that
                    // `next()` cannot return `None` repeatedly in that state.
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` if it has advanced.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
1004
1005#[cfg(test)]
1006mod tests {
1007    use timely::container::CapacityContainerBuilder;
1008    use timely::dataflow::operators::{Enter, Probe};
1009    use tokio::sync::mpsc::unbounded_channel;
1010    use tokio::sync::oneshot;
1011
1012    use super::*;
1013
    /// Exercises `backpressure` with a `summary` of `(1, 0)`, consuming the output in the outer
    /// (non-granular) `u64` scope.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from its initial frontier (`(1, 0)`) to a real
                // frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust our budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`).
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1152
    /// Exercises `backpressure` with a `summary` of `(0, 1)`, consuming the output granularly
    /// inside the `(u64, Subtime)` scope, so individual parts can be retired.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // we can emit 3 parts before we hit the backpressure limit
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1231
1232    type Time = (u64, Subtime);
1233    #[derive(Clone, Debug)]
1234    struct Part(usize);
1235    impl Backpressureable for Part {
1236        fn byte_size(&self) -> usize {
1237            self.0
1238        }
1239    }
1240
    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. The runner steps the worker until the probe's frontier is no longer less
        /// than this time. This is a single time because the timestamps involved are totally
        /// ordered, so the frontier has at most one element.
        AssertOutputFrontier(Time),
        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
        /// allows us to assert what feedback frontier we got to, how many parts were inflight
        /// when we were backpressured, and how many inflight parts we retired.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Process X parts in the downstream operator. This affects the feedback frontier.
        ProcessXParts(usize),
    }
1257
    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
    ///
    /// The dataflow is: `iterator_operator` (emitting `input`) -> `backpressure` -> a
    /// `consumer_operator`, whose output is fed back through a `feedback` handle as the
    /// `progress_stream` of the `FlowControl`. When `non_granular_consumer` is set, the consumer
    /// and its feedback loop live in the outer `u64` scope, and the backpressured stream leaves
    /// the inner `(u64, Subtime)` scope to reach it. Otherwise everything lives in the inner
    /// scope.
    ///
    /// Steps run in order, stepping the worker directly; see `Step` for their meaning.
    fn backpressure_runner(
        // The input data to the `backpressure` operator
        input: Vec<(u64, Part)>,
        // The maximum inflight bytes the `backpressure` operator allows through.
        max_inflight_bytes: usize,
        // The feedback summary used by the `backpressure` operator.
        summary: Time,
        // List of steps to run through.
        steps: Vec<Step>,
        // Whether or not to consume records in the non-granular scope. This is useful when the
        // `summary` is something like `(1, 0)`.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                // Set up the top-level non-granular scope.
                worker.dataflow::<u64, _, _>(|outer_scope| {
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = outer_scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // The flow-control progress stream comes either from the outer-scope
                        // feedback (entered into this scope) or from an inner-scope feedback.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        // `backpressure` reports each backpressure event on this channel.
                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If we want to granularly consume the output, we setup the consumer here.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(outer_scope),
                            finalizer_tx,
                        )
                    });

                    // If we want to non-granularly consume the output, we setup the consumer here.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            outer_scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step the worker until the output frontier reaches `time`.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        // Each `()` lets the consumer process one `Data` event.
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step the worker until `backpressure` reports its next backpressure
                        // event through the probe channel.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Signal the input operator to drop its capability, advancing the input to the
            // empty frontier.
            let _ = finalizer_tx.send(());
        });
    }
1427
1428    /// An operator that emits `Part`'s at the specified timestamps. Does not
1429    /// drop its capability until it gets a signal from the `Sender` it returns.
1430    fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
1431        scope: Scope<'scope, (u64, Subtime)>,
1432        mut input: I,
1433    ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
1434        let (finalizer_tx, finalizer_rx) = oneshot::channel();
1435        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1436        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1437
1438        iterator.build(|mut caps| async move {
1439            let mut capability = Some(caps.pop().unwrap());
1440            let mut last = None;
1441            while let Some(element) = input.next() {
1442                let time = element.0.clone();
1443                let part = element.1;
1444                last = Some((time, Subtime(0)));
1445                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1446            }
1447            if let Some(last) = last {
1448                capability
1449                    .as_mut()
1450                    .unwrap()
1451                    .downgrade(&(last.0 + 1, last.1));
1452            }
1453
1454            let _ = finalizer_rx.await;
1455            capability.take();
1456        });
1457
1458        (output, finalizer_tx)
1459    }
1460
1461    /// An operator that consumes its input ONLY when given a signal to do from
1462    /// the `UnboundedSender` it returns. Each `send` corresponds with 1 `Data` event
1463    /// being processed. Also connects the `feedback` handle to its output.
1464    fn consumer_operator<
1465        'scope,
1466        T: timely::progress::Timestamp,
1467        O: Backpressureable + std::fmt::Debug,
1468    >(
1469        scope: Scope<'scope, T>,
1470        input: StreamVec<'scope, T, O>,
1471        feedback: timely::dataflow::operators::feedback::Handle<
1472            'scope,
1473            T,
1474            Vec<std::convert::Infallible>,
1475        >,
1476    ) -> UnboundedSender<()> {
1477        let (tx, mut rx) = unbounded_channel::<()>();
1478        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
1479        let (output_handle, output) =
1480            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
1481        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);
1482
1483        consumer.build(|_caps| async move {
1484            while let Some(()) = rx.recv().await {
1485                // Consume exactly one messages (unless the input is exhausted).
1486                while let Some(Event::Progress(_)) = input.next().await {}
1487            }
1488        });
1489        output.connect_loop(feedback);
1490
1491        tx
1492    }
1493}