Skip to main content

mz_storage_operators/
persist_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A source that reads from a persist shard.
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, EvalError, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_ore::str::redact;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::{PersistConfig, RetryParameters};
28use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
29use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
30use mz_persist_client::operators::shard_source::{
31    ErrorHandler, FilterResult, SnapshotMode, shard_source,
32};
33use mz_persist_client::stats::STATS_AUDIT_PANIC;
34use mz_persist_types::Codec64;
35use mz_persist_types::codec_impls::UnitSchema;
36use mz_persist_types::columnar::{ColumnEncoder, Schema};
37use mz_repr::{
38    Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
39};
40use mz_storage_types::StorageDiff;
41use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
42use mz_storage_types::errors::DataflowError;
43use mz_storage_types::sources::SourceData;
44use mz_storage_types::stats::RelationPartStats;
45use mz_timely_util::builder_async::{
46    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
47};
48use mz_timely_util::probe::ProbeNotify;
49use mz_txn_wal::operator::{TxnsContext, txns_progress};
50use serde::{Deserialize, Serialize};
51use timely::PartialOrder;
52use timely::container::CapacityContainerBuilder;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::{Scope, Stream, StreamVec};
59use timely::order::TotalOrder;
60use timely::progress::Antichain;
61use timely::progress::Timestamp as TimelyTimestamp;
62use timely::progress::timestamp::PathSummary;
63use timely::scheduling::Activator;
64use tokio::sync::mpsc::UnboundedSender;
65use tracing::{error, trace};
66
67use crate::metrics::BackpressureMetrics;
68
69/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
70/// progress than would otherwise be possible.
71///
72/// This is "opaque" since we'd like to reserve the right to change the definition in the future
73/// without downstreams being able to rely on the precise representation. (At the moment, this
74/// is a simple batch counter, though we may change it to eg. reflect progress through the keyspace
75/// in the future.)
76#[derive(
77    Copy,
78    Clone,
79    PartialEq,
80    Default,
81    Eq,
82    PartialOrd,
83    Ord,
84    Debug,
85    Serialize,
86    Deserialize,
87    Hash
88)]
89pub struct Subtime(u64);
90
91impl PartialOrder for Subtime {
92    fn less_equal(&self, other: &Self) -> bool {
93        self.0.less_equal(&other.0)
94    }
95}
96
/// `Subtime` is a simple `u64` counter, so its order is total; timely can exploit
/// this for simpler frontier reasoning (frontiers are at most a single element).
impl TotalOrder for Subtime {}
98
impl PathSummary<Subtime> for Subtime {
    // Advance `src` by this summary. Delegates to `u64`'s path summary
    // (presumably checked addition — confirm against timely's impl for `u64`),
    // wrapping the result back into a `Subtime`.
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    // Compose two summaries into a single summary, again delegating to `u64`.
    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}
108
impl TimelyTimestamp for Subtime {
    // Summaries advance the counter by a fixed amount; see `Subtime::least_summary`.
    type Summary = Subtime;

    /// The minimum timestamp: a batch counter of zero.
    fn minimum() -> Self {
        Subtime(0)
    }
}
116
117impl differential_dataflow::lattice::Lattice for Subtime {
118    fn join(&self, other: &Self) -> Self {
119        Subtime(std::cmp::max(self.0, other.0))
120    }
121    fn meet(&self, other: &Self) -> Self {
122        Subtime(std::cmp::min(self.0, other.0))
123    }
124}
125
impl differential_dataflow::lattice::Maximum for Subtime {
    /// The largest representable batch counter.
    fn maximum() -> Self {
        Subtime(u64::MAX)
    }
}
131
impl Subtime {
    /// The smallest non-zero summary for the opaque timestamp type.
    ///
    /// This is used as the `Subtime` component of the flow-control summary in
    /// [`persist_source`], i.e. the minimum step by which the backpressure
    /// feedback frontier is advanced within a single outer timestamp.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}
138
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater or equal to `until` will be suppressed.
/// The `map_filter_project` argument, if supplied, may be partially applied,
/// and any un-applied part of the argument will be left behind in the argument.
///
/// Users of this function have the ability to apply flow control to the output
/// to limit the in-flight data (measured in bytes) it can emit. The flow control
/// input is a timely stream that communicates the frontier at which the data
/// emitted by this source have been dropped.
///
/// **Note:** Because this function is reading batches from `persist`, it is working
/// at batch granularity. In practice, the source will be overshooting the target
/// flow control upper by an amount that is related to the size of batches.
///
/// If no flow control is desired an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy way of creating such a stream is by
/// using [`timely::dataflow::operators::generic::operator::empty`].
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<'scope, E>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let outer = scope.clone();
    // The core source runs in an inner scope whose timestamp is refined with a
    // `Subtime` component, so backpressure can make progress within a single
    // outer timestamp.
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                // The probe is attached to the decoded output stream below; its
                // frontier becomes the flow-control input of the backpressure
                // operator.
                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Let the probe observe the decoded output so that the backpressure
        // loop sees downstream progress.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave(outer)
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Split into ok/err streams, dropping the granular `Subtime` component of
    // each update's timestamp (`t.0` is the outer `mz_repr::Timestamp`).
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
288
/// A scope whose timestamp is refined with a [`Subtime`] component, allowing
/// finer-than-timestamp (per-batch) progress tracking.
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
290
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, 'outer, E>(
    outer: Scope<'outer, mz_repr::Timestamp>,
    scope: RefinedScope<'g, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + Send + 'static>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        'g,
        (mz_repr::Timestamp, Subtime),
        Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
    >,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    // Snapshot the MFP for filter pushdown; the caller-visible MFP is left intact.
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    // When flow control is requested, wrap the stream of leased part
    // descriptions in the `backpressure` operator before parts are fetched.
    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |scope, descs, chosen_worker| {
            let (stream, token) = backpressure(
                scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        outer,
        scope,
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        // Filter-pushdown callback: decide per part, from its stats and the
        // current frontier, whether the part can be skipped entirely.
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}
404
405fn filter_result(
406    relation_desc: &RelationDesc,
407    time_range: ResultSpec,
408    stats: RelationPartStats,
409    plan: &MfpPlan,
410) -> FilterResult {
411    let arena = RowArena::new();
412    let relation = ReprRelationType::from(relation_desc.typ());
413    let mut ranges = ColumnSpecs::new(&relation, &arena);
414    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
415
416    let may_error = stats.err_count().map_or(true, |count| count > 0);
417
418    // N.B. We may have pushed down column "demands" into Persist, so this
419    // relation desc may have a different set of columns than the stats.
420    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
421        let result_spec = stats.col_stats(idx, &arena);
422        ranges.push_column(pos, result_spec);
423    }
424    let result = ranges.mfp_plan_filter(plan).range;
425    let may_error = may_error || result.may_fail();
426    let may_keep = result.may_contain(Datum::True);
427    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
428    if relation_desc.len() == 0 && !may_error && !may_skip {
429        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
430            return FilterResult::Keep;
431        };
432        key.append(&SourceData(Ok(Row::default())));
433        let key = key.finish();
434        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
435            return FilterResult::Keep;
436        };
437        val.append(&());
438        let val = val.finish();
439
440        FilterResult::ReplaceWith {
441            key: Arc::new(key),
442            val: Arc::new(val),
443        }
444    } else if may_error || may_keep {
445        FilterResult::Keep
446    } else {
447        FilterResult::Discard
448    }
449}
450
/// Decodes fetched persist blobs of [`SourceData`] and applies the (optional)
/// `map_filter_project` plan, emitting decoded rows (or errors) at refined
/// `(Timestamp, Subtime)` times.
///
/// Decoding is fueled: the operator processes pending parts until the per-schedule
/// fuel (`storage_source_decode_fuel`) is spent, then reactivates itself to resume.
pub fn decode_and_mfp<'scope, E>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff),
>
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Queue up each newly fetched blob as pending work, retaining a
            // capability so its outputs can be emitted later.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            // Process queue entries in order until done or out of fuel.
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            // Ran out of fuel before the queue drained: reschedule ourselves.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
540
/// Pending work to read from fetched parts
struct PendingWork {
    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
    panic_on_audit_failure: bool,
    /// The time at which the work should happen.
    ///
    /// Outputs are emitted at this capability's time, with the first (outer)
    /// component replaced by each update's own timestamp.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part.
    part: PendingPart,
}
550
/// A fetched part, either still in its raw blob form or parsed and ready to be
/// iterated. Parsing is deferred until the part is actually worked on.
enum PendingPart {
    /// A fetched blob that has not yet been parsed.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// A parsed part, ready to yield updates.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
557
impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing it from a
    /// `FetchedBlob` if necessary.
    ///
    /// (An earlier version of this method also returned a bool derived from
    /// pushdown stats; the signature no longer does.)
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                // Parse once and cache the parsed form in place.
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse any further.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}
576
impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, while checking
    /// `yield_fn` whether more fuel is available.
    ///
    /// Returns `true` when the part is fully processed, `false` when it yielded
    /// early because `yield_fn` reported the fuel as spent (the caller keeps the
    /// work queued and resumes later).
    fn do_work<YFn, E>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
        E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Buffer handed back to the iterator so row allocations can be re-used.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Suppress updates at or beyond the `until` frontier.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it anyway. If the MFP returned data it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            mfp = ?redact(&mfp),
                                            result = ?redact(&result),
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    // Decode errors flow to the output as `E`, at the update's time.
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(E::from(err)), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Out of fuel: yield, leaving this part queued for the next schedule.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
710
/// A trait representing a type that can be used in `backpressure`.
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes.
    ///
    /// The weight is accounted against `FlowControl::max_inflight_bytes`.
    fn byte_size(&self) -> usize;
}
716
/// Batch parts are weighted by their encoded size in bytes.
impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}
722
/// Flow control configuration.
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// Maximum number of in-flight bytes.
    ///
    /// This is a soft limit: data is released at batch granularity, so the
    /// actual in-flight size may overshoot this value.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (be they granular or not) that must be emitted,
    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
    pub summary: T::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}
741
/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects all the second part of the tuple
/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// Returns the flow-controlled stream and a [`PressOnDropButton`] that shuts the
/// operator down when dropped (or pressed).
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Helper method used to synthesize current and next frontier for ordered times.
    //
    // Refines `time`'s `Subtime` component with the running part counter, inserts
    // the refined time into a copy of `frontier` (the "current" frontier), and
    // builds a second frontier one part further along (the "next" frontier).
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // _Refine_ the data stream by amending the second input with the part number. This also
    // ensures that we order the parts by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // Input is finished: flush all buffered parts in time order,
                    // synthesized against the empty frontier, then end the stream.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    // Buffer data until progress tells us its time can no longer change.
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Release, in time order, exactly the buffered parts whose time is
                    // now closed by `prog`; later parts stay buffered.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is useful
                                // to "close" timestamps from previous parts, even if we don't yet
                                // emit this part. Note that this is safe because `data_input` ensures
                                // time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                // A pure progress update from the refined input: track it
                                // and downgrade our capabilities accordingly.
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                // Input exhausted: finish once all deferred parts
                                // (stashed above) have also been drained.
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, listen for updates to the flow_control
                // input's frontier until we free up new budget. If we don't interact
                // with this side of the if statement, because the stream has no data, we
                // don't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Track the latest `flow_control` frontier.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
1000
1001#[cfg(test)]
1002mod tests {
1003    use timely::container::CapacityContainerBuilder;
1004    use timely::dataflow::operators::{Enter, Probe};
1005    use tokio::sync::mpsc::unbounded_channel;
1006    use tokio::sync::oneshot;
1007
1008    use super::*;
1009
    /// Exercises `backpressure` with a non-granular (outer-scope) consumer,
    /// i.e. a feedback `summary` that advances the coarse timestamp.
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        // Two parts at time 50 exceed the 100-byte budget only after the whole
        // timestamp has been emitted.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        // Small parts accumulate until the 50-byte budget trips partway through.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        // Many parts share timestamp 52 to check that an entire timestamp is
        // always emitted once it is the minimum, regardless of byte budget.
        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust out budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`.
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1148
    /// Exercises `backpressure` with a granular (inner-scope) consumer,
    /// i.e. a feedback `summary` that advances only the `Subtime` component.
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        // A single over-budget part must still be emitted before backpressure engages.
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        // Multiple small parts: emission proceeds part-by-part as the granular
        // feedback frontier advances.
        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // we can emit 3 parts before we hit the backpressure limit
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1227
    /// The timestamp type used by these tests: a coarse `u64` time paired with
    /// the granular `Subtime` part counter.
    type Time = (u64, Subtime);
    /// A test stand-in for a batch part; the payload is simply its byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1236
    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. This is a single time because we assume the timestamps are totally
        /// ordered (see the `TotalOrder` bound on `backpressure`), so a frontier is
        /// representable by a single element.
        AssertOutputFrontier(Time),
        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
        /// retired.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Process X parts in the downstream operator. This affects the feedback frontier.
        ProcessXParts(usize),
    }
1253
    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
    ///
    /// Builds a two-level timely dataflow: an outer `u64` scope and an inner
    /// `(u64, Subtime)` scope containing the `backpressure` operator under test,
    /// wired to a controllable input (`iterator_operator`) and a controllable
    /// consumer (`consumer_operator`). The consumer's feedback edge lives either
    /// in the inner (granular) or outer (non-granular) scope, depending on
    /// `non_granular_consumer`.
    fn backpressure_runner(
        // The input data to the `backpressure` operator
        input: Vec<(u64, Part)>,
        // The maximum inflight bytes the `backpressure` operator allows through.
        max_inflight_bytes: usize,
        // The feedback summary used by the `backpressure` operator.
        summary: Time,
        // List of steps to run through.
        steps: Vec<Step>,
        // Whether or not to consume records in the non-granular scope. This is useful when the
        // `summary` is something like `(1, 0)`.
        non_granular_consumer: bool,
    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                // Set up the top-level non-granular scope.
                worker.dataflow::<u64, _, _>(|outer_scope| {
                    // The outer feedback edge is only needed when consuming
                    // non-granularly.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = outer_scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        // Choose where the flow-control progress stream comes from:
                        // the outer feedback (entered into this scope) or a fresh
                        // granular feedback edge within this scope.
                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If we want to granularly consume the output, we setup the consumer here.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(outer_scope),
                            finalizer_tx,
                        )
                    });

                    // If we want to non-granularly consume the output, we setup the consumer here.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            outer_scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step the worker until the probe shows the frontier has
                        // reached (at least) `time`.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        // Each send lets the consumer operator retire one part.
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step the worker until the operator's probe channel
                        // reports a backpressure event, then compare it.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Send the input to the empty frontier.
            let _ = finalizer_tx.send(());
        });
    }
1423
1424    /// An operator that emits `Part`'s at the specified timestamps. Does not
1425    /// drop its capability until it gets a signal from the `Sender` it returns.
1426    fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
1427        scope: Scope<'scope, (u64, Subtime)>,
1428        mut input: I,
1429    ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
1430        let (finalizer_tx, finalizer_rx) = oneshot::channel();
1431        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1432        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1433
1434        iterator.build(|mut caps| async move {
1435            let mut capability = Some(caps.pop().unwrap());
1436            let mut last = None;
1437            while let Some(element) = input.next() {
1438                let time = element.0.clone();
1439                let part = element.1;
1440                last = Some((time, Subtime(0)));
1441                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1442            }
1443            if let Some(last) = last {
1444                capability
1445                    .as_mut()
1446                    .unwrap()
1447                    .downgrade(&(last.0 + 1, last.1));
1448            }
1449
1450            let _ = finalizer_rx.await;
1451            capability.take();
1452        });
1453
1454        (output, finalizer_tx)
1455    }
1456
1457    /// An operator that consumes its input ONLY when given a signal to do from
1458    /// the `UnboundedSender` it returns. Each `send` corresponds with 1 `Data` event
1459    /// being processed. Also connects the `feedback` handle to its output.
1460    fn consumer_operator<
1461        'scope,
1462        T: timely::progress::Timestamp,
1463        O: Backpressureable + std::fmt::Debug,
1464    >(
1465        scope: Scope<'scope, T>,
1466        input: StreamVec<'scope, T, O>,
1467        feedback: timely::dataflow::operators::feedback::Handle<
1468            'scope,
1469            T,
1470            Vec<std::convert::Infallible>,
1471        >,
1472    ) -> UnboundedSender<()> {
1473        let (tx, mut rx) = unbounded_channel::<()>();
1474        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
1475        let (output_handle, output) =
1476            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
1477        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);
1478
1479        consumer.build(|_caps| async move {
1480            while let Some(()) = rx.recv().await {
1481                // Consume exactly one messages (unless the input is exhausted).
1482                while let Some(Event::Progress(_)) = input.next().await {}
1483            }
1484        });
1485        output.connect_loop(feedback);
1486
1487        tx
1488    }
1489}