Skip to main content

mz_storage_operators/
persist_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A source that reads from a persist shard.
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, EvalError, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_ore::str::redact;
26use mz_persist_client::cache::PersistClientCache;
27use mz_persist_client::cfg::{PersistConfig, RetryParameters};
28use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
29use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
30use mz_persist_client::operators::shard_source::{
31    ErrorHandler, FilterResult, SnapshotMode, shard_source,
32};
33use mz_persist_client::stats::STATS_AUDIT_PANIC;
34use mz_persist_types::Codec64;
35use mz_persist_types::codec_impls::UnitSchema;
36use mz_persist_types::columnar::{ColumnEncoder, Schema};
37use mz_repr::{
38    Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
39};
40use mz_storage_types::StorageDiff;
41use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
42use mz_storage_types::errors::DataflowError;
43use mz_storage_types::sources::SourceData;
44use mz_storage_types::stats::RelationPartStats;
45use mz_timely_util::builder_async::{
46    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
47};
48use mz_timely_util::probe::ProbeNotify;
49use mz_txn_wal::operator::{TxnsContext, txns_progress};
50use serde::{Deserialize, Serialize};
51use timely::PartialOrder;
52use timely::container::CapacityContainerBuilder;
53use timely::dataflow::channels::pact::Pipeline;
54use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
55use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
56use timely::dataflow::operators::{Capability, Leave, OkErr};
57use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
58use timely::dataflow::{Scope, Stream, StreamVec};
59use timely::order::TotalOrder;
60use timely::progress::Antichain;
61use timely::progress::Timestamp as TimelyTimestamp;
62use timely::progress::timestamp::PathSummary;
63use timely::scheduling::Activator;
64use tokio::sync::mpsc::UnboundedSender;
65use tracing::{error, trace};
66
67use crate::metrics::BackpressureMetrics;
68
69/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
70/// progress than would otherwise be possible.
71///
72/// This is "opaque" since we'd like to reserve the right to change the definition in the future
73/// without downstreams being able to rely on the precise representation. (At the moment, this
74/// is a simple batch counter, though we may change it to eg. reflect progress through the keyspace
75/// in the future.)
76#[derive(
77    Copy,
78    Clone,
79    PartialEq,
80    Default,
81    Eq,
82    PartialOrd,
83    Ord,
84    Debug,
85    Serialize,
86    Deserialize,
87    Hash,
88    columnar::Columnar
89)]
90// Derive ordering on the generated `SubtimeReference` too: the paged merge
91// batcher sorts the `(Timestamp, Subtime)` time and so requires `Ref: Ord`.
92#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))]
93pub struct Subtime(u64);
94
95impl PartialOrder for Subtime {
96    fn less_equal(&self, other: &Self) -> bool {
97        self.0.less_equal(&other.0)
98    }
99}
100
101impl TotalOrder for Subtime {}
102
103impl PathSummary<Subtime> for Subtime {
104    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
105        self.0.results_in(&src.0).map(Subtime)
106    }
107
108    fn followed_by(&self, other: &Self) -> Option<Self> {
109        self.0.followed_by(&other.0).map(Subtime)
110    }
111}
112
113impl TimelyTimestamp for Subtime {
114    type Summary = Subtime;
115
116    fn minimum() -> Self {
117        Subtime(0)
118    }
119}
120
121impl columnation::Columnation for Subtime {
122    type InnerRegion = columnation::CopyRegion<Subtime>;
123}
124
125impl differential_dataflow::lattice::Lattice for Subtime {
126    fn join(&self, other: &Self) -> Self {
127        Subtime(std::cmp::max(self.0, other.0))
128    }
129    fn meet(&self, other: &Self) -> Self {
130        Subtime(std::cmp::min(self.0, other.0))
131    }
132}
133
134impl differential_dataflow::lattice::Maximum for Subtime {
135    fn maximum() -> Self {
136        Subtime(u64::MAX)
137    }
138}
139
140impl Subtime {
141    /// The smallest non-zero summary for the opaque timestamp type.
142    pub const fn least_summary() -> Self {
143        Subtime(1)
144    }
145}
146
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater than or equal to `until` will be suppressed.
/// The `map_filter_project` argument, if supplied, may be partially applied,
/// and any un-applied part of the argument will be left behind in the argument.
///
/// Users of this function can limit the in-flight data (measured in bytes) that the
/// source emits by passing `Some(max_inflight_bytes)`. In that case the source builds
/// its own flow control:
///
/// - It creates a probe and attaches it to its own output inside an inner refined scope.
/// - It turns that probe into a progress stream.
/// - It hands the progress stream, as a `FlowControl`, to the `backpressure` operator
///   inside `persist_source_core`.
///
/// No external flow-control stream is needed. Pass `None` to disable flow control.
///
/// **Note:** Because this function is reading batches from `persist`, it is working
/// at batch granularity. In practice, the source will overshoot the target flow
/// control upper by an amount related to the size of batches.
///
/// If `metadata.txns_shard` is set, the shard belongs to the txn-wal system and the
/// output is passed through `txns_progress`, which translates its progress frontier.
/// In that case `as_of` must be `Some` and not the empty antichain; this function
/// panics otherwise.
///
/// Returns the stream of decoded rows, the stream of errors, and the
/// `PressOnDropButton` tokens of the operators rendered here.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<'scope, E>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    // Kept so that the refined inner scope can `leave` back into it, and so that
    // `persist_source_core` can be given the unrefined scope.
    let outer = scope.clone();
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // With a byte budget, build a probe-driven `FlowControl`; the probe is attached to
        // this scope's output below, closing the backpressure loop.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // Report this scope's output progress to the probe that feeds `flow_control`.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave(outer)
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Records still carry the refined `(Timestamp, Subtime)` time; keep only its
    // `Timestamp` component while splitting rows from errors.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
296
/// A scope whose timestamps pair an outer time `T` with a [`Subtime`], allowing
/// progress to be tracked at a finer grain than `T` alone.
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
298
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// This is the core of [`persist_source`]. It runs in a scope whose timestamps are
/// refined with a [`Subtime`]; `outer` is the enclosing unrefined scope and is
/// forwarded to `shard_source`.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// Updates at times greater than or equal to `until` are dropped while decoding.
/// `until` also bounds the `mz_now()` range that is used when deciding, from part
/// statistics, whether a part can be skipped.
///
/// `map_filter_project`, if given, is used in two ways:
///
/// - A copy of it filters parts by their statistics.
/// - The plan itself is taken out of the argument by `decode_and_mfp` and applied to
///   each decoded row.
///
/// If `flow_control` is `Some`, a `backpressure` operator is inserted through
/// `shard_source`'s descriptor transformer to bound the bytes in flight.
/// `listen_sleep`, if `Some`, overrides the default listen sleep retry parameters.
///
/// Returns the stream of decoded `Ok(row)` / `Err(e)` updates and the tokens of the
/// operators rendered by `shard_source`.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, 'outer, E>(
    outer: Scope<'outer, mz_repr::Timestamp>,
    scope: RefinedScope<'g, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + Send + 'static>,
    start_signal: impl Future<Output = ()> + Send + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        'g,
        (mz_repr::Timestamp, Subtime),
        Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
    >,
    Vec<PressOnDropButton>,
)
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    // The stats filter closure below owns this copy, because `map_filter_project`
    // itself is handed to `decode_and_mfp` at the end.
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |scope, descs, chosen_worker| {
            let (stream, token) = backpressure(
                scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        outer,
        scope,
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        // Part filter: decides from a part's stats and the current frontier whether
        // the part must be fetched.
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}
412
413fn filter_result(
414    relation_desc: &RelationDesc,
415    time_range: ResultSpec,
416    stats: RelationPartStats,
417    plan: &MfpPlan,
418) -> FilterResult {
419    let arena = RowArena::new();
420    let relation = ReprRelationType::from(relation_desc.typ());
421    let mut ranges = ColumnSpecs::new(&relation, &arena);
422    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);
423
424    let may_error = stats.err_count().map_or(true, |count| count > 0);
425
426    // N.B. We may have pushed down column "demands" into Persist, so this
427    // relation desc may have a different set of columns than the stats.
428    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
429        let result_spec = stats.col_stats(idx, &arena);
430        ranges.push_column(pos, result_spec);
431    }
432    let result = ranges.mfp_plan_filter(plan).range;
433    let may_error = may_error || result.may_fail();
434    let may_keep = result.may_contain(Datum::True);
435    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
436    if relation_desc.len() == 0 && !may_error && !may_skip {
437        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
438            return FilterResult::Keep;
439        };
440        key.append(&SourceData(Ok(Row::default())));
441        let key = key.finish();
442        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
443            return FilterResult::Keep;
444        };
445        val.append(&());
446        let val = val.finish();
447
448        FilterResult::ReplaceWith {
449            key: Arc::new(key),
450            val: Arc::new(val),
451        }
452    } else if may_error || may_keep {
453        FilterResult::Keep
454    } else {
455        FilterResult::Discard
456    }
457}
458
/// Decodes the parts fetched from persist into `(Result<Row, E>, time, diff)` updates.
///
/// Each decoded row has the MFP taken from `map_filter_project` (if any) applied to it.
/// Updates at times greater than or equal to `until` are dropped.
///
/// The MFP is moved out of `map_filter_project` with `take`, which leaves an identity
/// MFP in the caller's plan.
///
/// Decoding is fuel-limited. Each activation stops once roughly
/// `storage_source_decode_fuel` units of work have been done; the check happens between
/// updates. If parts remain, the operator reschedules itself.
pub fn decode_and_mfp<'scope, E>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff),
>
where
    E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Queue every newly fetched blob, holding a capability (for output 0) at its
            // input time until the blob has been fully decoded.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            // Only the amount of work is consulted; the start time is currently ignored.
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            // Out of fuel with work left: ask to be scheduled again.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
548
/// Pending work to read from fetched parts.
struct PendingWork {
    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
    panic_on_audit_failure: bool,
    /// The capability at which output for this part is emitted.
    ///
    /// Emitted updates keep this capability's `Subtime`, but replace its `Timestamp`
    /// with each update's own time.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part.
    part: PendingPart,
}

/// A fetched part that is parsed lazily.
///
/// It starts as the raw blob received on `decode_and_mfp`'s input. It is replaced in
/// place by its parsed form the first time `PendingPart::part_mut` is called.
enum PendingPart {
    /// The blob as received; not yet parsed.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// The parsed part.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
565
566impl PendingPart {
567    /// Returns the contained `FetchedPart`, first parsing it from a
568    /// `FetchedBlob` if necessary.
569    ///
570    /// Also returns a bool, which is true if the part is known (from pushdown
571    /// stats) to be free of `SourceData(Err(_))`s. It will be false if the part
572    /// is known to contain errors or if it's unknown.
573    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
574        match self {
575            PendingPart::Unparsed(x) => {
576                *self = PendingPart::Parsed { part: x.parse() };
577                // Won't recurse any further.
578                self.part_mut()
579            }
580            PendingPart::Parsed { part } => &mut part.part,
581        }
582    }
583}
584
impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, while checking
    /// `yield_fn` whether more fuel is available.
    ///
    /// Updates at times greater than or equal to `until` are skipped. Each emitted update
    /// uses this work item's capability time, with its `Timestamp` component replaced by
    /// the update's own time. `work` is incremented per MFP evaluation and per emitted
    /// update.
    ///
    /// If the part is a filter pushdown audit, any MFP output from a decoded row is
    /// logged as a correctness violation. It also panics if `panic_on_audit_failure` is
    /// set. `SourceData(Err(_))` updates are emitted without this audit.
    ///
    /// Returns `true` when the part has been fully consumed. Returns `false` when
    /// `yield_fn` asked to stop early; in that case the part keeps its position and the
    /// next call resumes from there.
    fn do_work<YFn, E>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(Result<Row, E>, (mz_repr::Timestamp, Subtime), Diff)>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
        E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Storage handed back to the part so it can reuse the decoded row's allocation.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it anyway. If the MFP returned data it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            mfp = ?redact(&mfp),
                                            result = ?redact(&result),
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(E::from(err)), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Out of fuel: stop here; the part resumes from this position next time.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
718
719/// A trait representing a type that can be used in `backpressure`.
720pub trait Backpressureable: Clone + 'static {
721    /// Return the weight of the object, in bytes.
722    fn byte_size(&self) -> usize;
723}
724
725impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
726    fn byte_size(&self) -> usize {
727        self.1.encoded_size_bytes()
728    }
729}
730
/// Flow control configuration for the [`backpressure`] operator.
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (granular or not) that must be emitted,
    /// regardless of `max_inflight_bytes`, to ensure forward progress is made.
    ///
    /// `backpressure` applies it to `progress_stream` through a `feedback` edge.
    pub summary: T::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}
749
750/// Apply flow control to the `data` input, based on the given `FlowControl`.
751///
752/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
753/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
754/// and a `summary`. Note that the `data` input expects all the second part of the tuple
755/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
756///
757/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
758/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
759/// `max_inflight_bytes`.
760///
761/// The implementation of this operator is very subtle. Many inline comments have been added.
762pub fn backpressure<'scope, T, O>(
763    scope: Scope<'scope, (T, Subtime)>,
764    name: &str,
765    data: StreamVec<'scope, (T, Subtime), O>,
766    flow_control: FlowControl<'scope, (T, Subtime)>,
767    chosen_worker: usize,
768    // A probe used to inspect this operator during unit-testing
769    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
770) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
771where
772    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
773    O: Backpressureable + std::fmt::Debug,
774{
775    let worker_index = scope.index();
776
777    let (flow_control_stream, flow_control_max_bytes, metrics) = (
778        flow_control.progress_stream,
779        flow_control.max_inflight_bytes,
780        flow_control.metrics,
781    );
782
783    // Both the `flow_control` input and the data input are disconnected from the output. We manually
784    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
785    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
786    // non-circular fashion.
787    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
788    flow_control_stream.connect_loop(handle);
789
790    let mut builder = AsyncOperatorBuilder::new(
791        format!("persist_source_backpressure({})", name),
792        scope.clone(),
793    );
794    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
795
796    let mut data_input = builder.new_disconnected_input(data, Pipeline);
797    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);
798
799    // Helper method used to synthesize current and next frontier for ordered times.
800    fn synthesize_frontiers<T: PartialOrder + Clone>(
801        mut frontier: Antichain<(T, Subtime)>,
802        mut time: (T, Subtime),
803        part_number: &mut u64,
804    ) -> (
805        (T, Subtime),
806        Antichain<(T, Subtime)>,
807        Antichain<(T, Subtime)>,
808    ) {
809        let mut next_frontier = frontier.clone();
810        time.1 = Subtime(*part_number);
811        frontier.insert(time.clone());
812        *part_number += 1;
813        let mut next_time = time.clone();
814        next_time.1 = Subtime(*part_number);
815        next_frontier.insert(next_time);
816        (time, frontier, next_frontier)
817    }
818
819    // _Refine_ the data stream by amending the second input with the part number. This also
820    // ensures that we order the parts by time.
821    let data_input = async_stream::stream!({
822        let mut part_number = 0;
823        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
824        loop {
825            match data_input.next().await {
826                None => {
827                    let empty = Antichain::new();
828                    parts.sort_by_key(|val| val.0.clone());
829                    for (part_time, d) in parts.drain(..) {
830                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
831                            empty.clone(),
832                            part_time.clone(),
833                            &mut part_number,
834                        );
835                        yield Either::Right((part_time, d, frontier, next_frontier))
836                    }
837                    break;
838                }
839                Some(Event::Data(time, data)) => {
840                    for d in data {
841                        parts.push((time.clone(), d));
842                    }
843                }
844                Some(Event::Progress(prog)) => {
845                    parts.sort_by_key(|val| val.0.clone());
846                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
847                        let (part_time, frontier, next_frontier) =
848                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
849                        yield Either::Right((part_time, d, frontier, next_frontier))
850                    }
851                    yield Either::Left(prog)
852                }
853            }
854        }
855    });
856    let shutdown_button = builder.build(move |caps| async move {
857        // The output capability.
858        let mut cap_set = CapabilitySet::from_elem(caps.into_element());
859
860        // The frontier of our output. This matches the `CapabilitySet` above.
861        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
862        // The frontier of the `flow_control` input.
863        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
864
865        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
866        let mut inflight_parts = Vec::new();
867        // Parts we have not yet emitted, but do participate in the `input_frontier`.
868        let mut pending_parts = std::collections::VecDeque::new();
869
870        // Only one worker is responsible for distributing parts
871        if worker_index != chosen_worker {
872            trace!(
873                "We are not the chosen worker ({}), exiting...",
874                chosen_worker
875            );
876            return;
877        }
878        tokio::pin!(data_input);
879        'emitting_parts: loop {
880            // At the beginning of our main loop, we determine the total size of
881            // inflight parts.
882            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();
883
884            // There are 2 main cases where we can continue to emit parts:
885            // - The total emitted bytes is less than `flow_control_max_bytes`.
886            // - The output frontier is not beyond the `flow_control_frontier`
887            //
888            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
889            // block, as we wait for progress tracking to keep the `flow_control` frontier
890            // up-to-date. This is tested in unit-tests.
891            if inflight_bytes < flow_control_max_bytes
892                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
893            {
894                let (time, part, next_frontier) =
895                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
896                        (time, part, next_frontier)
897                    } else {
898                        match data_input.next().await {
899                            Some(Either::Right((time, part, frontier, next_frontier))) => {
900                                // Downgrade the output frontier to this part's time. This is useful
901                                // "close" timestamp's from previous parts, even if we don't yet
902                                // emit this part. Note that this is safe because `data_input` ensures
903                                // time-ordering.
904                                output_frontier = frontier;
905                                cap_set.downgrade(output_frontier.iter());
906
907                                // If the most recent value's time is _beyond_ the
908                                // `flow_control` frontier (which takes into account the `summary`), we
909                                // have emitted an entire `summary` worth of data, and can store this
910                                // value for later.
911                                if inflight_bytes >= flow_control_max_bytes
912                                    && !PartialOrder::less_than(
913                                        &output_frontier,
914                                        &flow_control_frontier,
915                                    )
916                                {
917                                    pending_parts.push_back((time, part, next_frontier));
918                                    continue 'emitting_parts;
919                                }
920                                (time, part, next_frontier)
921                            }
922                            Some(Either::Left(prog)) => {
923                                output_frontier = prog;
924                                cap_set.downgrade(output_frontier.iter());
925                                continue 'emitting_parts;
926                            }
927                            None => {
928                                if pending_parts.is_empty() {
929                                    break 'emitting_parts;
930                                } else {
931                                    continue 'emitting_parts;
932                                }
933                            }
934                        }
935                    };
936
937                let byte_size = part.byte_size();
938                // Store the value with the _frontier_ the `flow_control_input` must reach
939                // to retire it. Note that if this `results_in` is `None`, then we
940                // are at `T::MAX`, and give up on flow_control entirely.
941                //
942                // SUBTLE: If we stop storing these parts, we will likely never check the
943                // `flow_control_input` ever again. This won't pile up data as that input
944                // only has frontier updates. There may be spurious activations from it though.
945                //
946                // Also note that we don't attempt to handle overflowing the `u64` part counter.
947                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
948                    inflight_parts.push((emission_ts, byte_size));
949                }
950
951                // Emit the data at the given time, and update the frontier and capabilities
952                // to just beyond the part.
953                data_output.give(&cap_set.delayed(&time), part);
954
955                if let Some(metrics) = &metrics {
956                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
957                }
958
959                output_frontier = next_frontier;
960                cap_set.downgrade(output_frontier.iter())
961            } else {
962                if let Some(metrics) = &metrics {
963                    metrics
964                        .last_backpressured_bytes
965                        .set(u64::cast_from(inflight_bytes))
966                }
967                let parts_count = inflight_parts.len();
968                // We've exhausted our budget, listen for updates to the flow_control
969                // input's frontier until we free up new budget. If we don't interact with
970                // with this side of the if statement, because the stream has no data, we
971                // don't cause unbounded buffering in timely.
972                let new_flow_control_frontier = match flow_control_input.next().await {
973                    Some(Event::Progress(frontier)) => frontier,
974                    Some(Event::Data(_, _)) => {
975                        unreachable!("flow_control_input should not contain data")
976                    }
977                    None => Antichain::new(),
978                };
979
980                // Update the `flow_control_frontier` if its advanced.
981                flow_control_frontier.clone_from(&new_flow_control_frontier);
982
983                // Retire parts that are processed downstream.
984                let retired_parts = inflight_parts
985                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
986                let (retired_size, retired_count): (usize, usize) = retired_parts
987                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
988                        (accum_size + size, accum_count + 1)
989                    });
990                trace!(
991                    "returning {} parts with {} bytes, frontier: {:?}",
992                    retired_count, retired_size, flow_control_frontier,
993                );
994
995                if let Some(metrics) = &metrics {
996                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
997                }
998
999                // Optionally emit some information for tests to examine.
1000                if let Some(probe) = probe.as_ref() {
1001                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
1002                }
1003            }
1004        }
1005    });
1006    (data_stream, shutdown_button.press_on_drop())
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use timely::container::CapacityContainerBuilder;
1012    use timely::dataflow::operators::{Enter, Probe};
1013    use tokio::sync::mpsc::unbounded_channel;
1014    use tokio::sync::oneshot;
1015
1016    use super::*;
1017
1018    #[mz_ore::test]
1019    fn test_backpressure_non_granular() {
1020        use Step::*;
1021        backpressure_runner(
1022            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
1023            100,
1024            (1, Subtime(0)),
1025            vec![
1026                // Assert we backpressure only after we have emitted
1027                // the entire timestamp.
1028                AssertOutputFrontier((50, Subtime(2))),
1029                AssertBackpressured {
1030                    frontier: (1, Subtime(0)),
1031                    inflight_parts: 1,
1032                    retired_parts: 0,
1033                },
1034                AssertBackpressured {
1035                    frontier: (51, Subtime(0)),
1036                    inflight_parts: 1,
1037                    retired_parts: 0,
1038                },
1039                ProcessXParts(2),
1040                AssertBackpressured {
1041                    frontier: (101, Subtime(0)),
1042                    inflight_parts: 2,
1043                    retired_parts: 2,
1044                },
1045                // Assert we make later progress once processing
1046                // the parts.
1047                AssertOutputFrontier((100, Subtime(3))),
1048            ],
1049            true,
1050        );
1051
1052        backpressure_runner(
1053            vec![
1054                (50, Part(10)),
1055                (50, Part(10)),
1056                (51, Part(100)),
1057                (52, Part(1000)),
1058            ],
1059            50,
1060            (1, Subtime(0)),
1061            vec![
1062                // Assert we backpressure only after we emitted enough bytes
1063                AssertOutputFrontier((51, Subtime(3))),
1064                AssertBackpressured {
1065                    frontier: (1, Subtime(0)),
1066                    inflight_parts: 3,
1067                    retired_parts: 0,
1068                },
1069                ProcessXParts(3),
1070                AssertBackpressured {
1071                    frontier: (52, Subtime(0)),
1072                    inflight_parts: 3,
1073                    retired_parts: 2,
1074                },
1075                AssertBackpressured {
1076                    frontier: (53, Subtime(0)),
1077                    inflight_parts: 1,
1078                    retired_parts: 1,
1079                },
1080                // Assert we make later progress once processing
1081                // the parts.
1082                AssertOutputFrontier((52, Subtime(4))),
1083            ],
1084            true,
1085        );
1086
1087        backpressure_runner(
1088            vec![
1089                (50, Part(98)),
1090                (50, Part(1)),
1091                (51, Part(10)),
1092                (52, Part(100)),
1093                // Additional parts at the same timestamp
1094                (52, Part(10)),
1095                (52, Part(10)),
1096                (52, Part(10)),
1097                (52, Part(100)),
1098                // A later part with a later ts.
1099                (100, Part(100)),
1100            ],
1101            100,
1102            (1, Subtime(0)),
1103            vec![
1104                AssertOutputFrontier((51, Subtime(3))),
1105                // Assert we backpressure after we have emitted enough bytes.
1106                // We assert twice here because we get updates as
1107                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
1108                AssertBackpressured {
1109                    frontier: (1, Subtime(0)),
1110                    inflight_parts: 3,
1111                    retired_parts: 0,
1112                },
1113                AssertBackpressured {
1114                    frontier: (51, Subtime(0)),
1115                    inflight_parts: 3,
1116                    retired_parts: 0,
1117                },
1118                ProcessXParts(1),
1119                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
1120                // 50.
1121                AssertOutputFrontier((51, Subtime(3))),
1122                // After we process all of `50`, we can start emitting data at `52`, but only until
1123                // we exhaust out budget. We don't need to emit all of `52` because we have emitted
1124                // all of `51`.
1125                ProcessXParts(1),
1126                AssertOutputFrontier((52, Subtime(4))),
1127                AssertBackpressured {
1128                    frontier: (52, Subtime(0)),
1129                    inflight_parts: 3,
1130                    retired_parts: 2,
1131                },
1132                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
1133                // regardless of byte count, we emit the entire time (but do NOT emit the part at
1134                // time `100`.
1135                ProcessXParts(1),
1136                // Clear the previous `51` part, and start filling up `inflight_parts` with other
1137                // parts at `52`
1138                // This is an intermediate state.
1139                AssertBackpressured {
1140                    frontier: (53, Subtime(0)),
1141                    inflight_parts: 2,
1142                    retired_parts: 1,
1143                },
1144                // After we process all of `52`, we can continue to the next time.
1145                ProcessXParts(5),
1146                AssertBackpressured {
1147                    frontier: (101, Subtime(0)),
1148                    inflight_parts: 5,
1149                    retired_parts: 5,
1150                },
1151                AssertOutputFrontier((100, Subtime(9))),
1152            ],
1153            true,
1154        );
1155    }
1156
1157    #[mz_ore::test]
1158    fn test_backpressure_granular() {
1159        use Step::*;
1160        backpressure_runner(
1161            vec![(50, Part(101)), (50, Part(101))],
1162            100,
1163            (0, Subtime(1)),
1164            vec![
1165                // Advance our frontier to outputting a single part.
1166                AssertOutputFrontier((50, Subtime(1))),
1167                // Receive backpressure updates until our frontier is up-to-date but
1168                // not beyond the parts (while considering the summary).
1169                AssertBackpressured {
1170                    frontier: (0, Subtime(1)),
1171                    inflight_parts: 1,
1172                    retired_parts: 0,
1173                },
1174                AssertBackpressured {
1175                    frontier: (50, Subtime(1)),
1176                    inflight_parts: 1,
1177                    retired_parts: 0,
1178                },
1179                // Process that part.
1180                ProcessXParts(1),
1181                // Assert that we clear the backpressure status
1182                AssertBackpressured {
1183                    frontier: (50, Subtime(2)),
1184                    inflight_parts: 1,
1185                    retired_parts: 1,
1186                },
1187                // Ensure we make progress to the next part.
1188                AssertOutputFrontier((50, Subtime(2))),
1189            ],
1190            false,
1191        );
1192
1193        backpressure_runner(
1194            vec![
1195                (50, Part(10)),
1196                (50, Part(10)),
1197                (51, Part(35)),
1198                (52, Part(100)),
1199            ],
1200            50,
1201            (0, Subtime(1)),
1202            vec![
1203                // we can emit 3 parts before we hit the backpressure limit
1204                AssertOutputFrontier((51, Subtime(3))),
1205                AssertBackpressured {
1206                    frontier: (0, Subtime(1)),
1207                    inflight_parts: 3,
1208                    retired_parts: 0,
1209                },
1210                AssertBackpressured {
1211                    frontier: (50, Subtime(1)),
1212                    inflight_parts: 3,
1213                    retired_parts: 0,
1214                },
1215                // Retire the single part.
1216                ProcessXParts(1),
1217                AssertBackpressured {
1218                    frontier: (50, Subtime(2)),
1219                    inflight_parts: 3,
1220                    retired_parts: 1,
1221                },
1222                // Ensure we make progress, and then
1223                // can retire the next 2 parts.
1224                AssertOutputFrontier((52, Subtime(4))),
1225                ProcessXParts(2),
1226                AssertBackpressured {
1227                    frontier: (52, Subtime(4)),
1228                    inflight_parts: 3,
1229                    retired_parts: 2,
1230                },
1231            ],
1232            false,
1233        );
1234    }
1235
1236    type Time = (u64, Subtime);
1237    #[derive(Clone, Debug)]
1238    struct Part(usize);
1239    impl Backpressureable for Part {
1240        fn byte_size(&self) -> usize {
1241            self.0
1242        }
1243    }
1244
1245    /// Actions taken by `backpressure_runner`.
1246    enum Step {
1247        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
1248        /// this far. This is a single time because we assume
1249        AssertOutputFrontier(Time),
1250        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
1251        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
1252        /// retired.
1253        AssertBackpressured {
1254            frontier: Time,
1255            inflight_parts: usize,
1256            retired_parts: usize,
1257        },
1258        /// Process X parts in the downstream operator. This affects the feedback frontier.
1259        ProcessXParts(usize),
1260    }
1261
1262    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
1263    fn backpressure_runner(
1264        // The input data to the `backpressure` operator
1265        input: Vec<(u64, Part)>,
1266        // The maximum inflight bytes the `backpressure` operator allows through.
1267        max_inflight_bytes: usize,
1268        // The feedback summary used by the `backpressure` operator.
1269        summary: Time,
1270        // List of steps to run through.
1271        steps: Vec<Step>,
1272        // Whether or not to consume records in the non-granular scope. This is useful when the
1273        // `summary` is something like `(1, 0)`.
1274        non_granular_consumer: bool,
1275    ) {
1276        timely::execute::execute_directly(move |worker| {
1277            let (
1278                backpressure_probe,
1279                consumer_tx,
1280                mut backpressure_status_rx,
1281                finalizer_tx,
1282                _token,
1283            ) =
1284                // Set up the top-level non-granular scope.
1285                worker.dataflow::<u64, _, _>(|outer_scope| {
1286                    let (non_granular_feedback_handle, non_granular_feedback) =
1287                        if non_granular_consumer {
1288                            let (h, f) = outer_scope.feedback(Default::default());
1289                            (Some(h), Some(f))
1290                        } else {
1291                            (None, None)
1292                        };
1293                    let (
1294                        backpressure_probe,
1295                        consumer_tx,
1296                        backpressure_status_rx,
1297                        token,
1298                        backpressured,
1299                        finalizer_tx,
1300                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
1301                        let (input, finalizer_tx) =
1302                            iterator_operator(scope.clone(), input.into_iter());
1303
1304                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
1305                            (
1306                                FlowControl {
1307                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
1308                                    max_inflight_bytes,
1309                                    summary,
1310                                    metrics: None
1311                                },
1312                                None,
1313                            )
1314                        } else {
1315                            let (granular_feedback_handle, granular_feedback) =
1316                                scope.feedback(Default::default());
1317                            (
1318                                FlowControl {
1319                                    progress_stream: granular_feedback,
1320                                    max_inflight_bytes,
1321                                    summary,
1322                                    metrics: None,
1323                                },
1324                                Some(granular_feedback_handle),
1325                            )
1326                        };
1327
1328                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();
1329
1330                        let (backpressured, token) = backpressure(
1331                            scope,
1332                            "test",
1333                            input,
1334                            flow_control,
1335                            0,
1336                            Some(backpressure_status_tx),
1337                        );
1338
1339                        // If we want to granularly consume the output, we setup the consumer here.
1340                        let tx = if !non_granular_consumer {
1341                            Some(consumer_operator(
1342                                scope.clone(),
1343                                backpressured.clone(),
1344                                granular_feedback_handle.unwrap(),
1345                            ))
1346                        } else {
1347                            None
1348                        };
1349
1350                        let (probe_handle, backpressured) = backpressured.probe();
1351                        (
1352                            probe_handle,
1353                            tx,
1354                            backpressure_status_rx,
1355                            token,
1356                            backpressured.leave(outer_scope),
1357                            finalizer_tx,
1358                        )
1359                    });
1360
1361                    // If we want to non-granularly consume the output, we setup the consumer here.
1362                    let consumer_tx = if non_granular_consumer {
1363                        consumer_operator(
1364                            outer_scope.clone(),
1365                            backpressured,
1366                            non_granular_feedback_handle.unwrap(),
1367                        )
1368                    } else {
1369                        consumer_tx.unwrap()
1370                    };
1371
1372                    (
1373                        backpressure_probe,
1374                        consumer_tx,
1375                        backpressure_status_rx,
1376                        finalizer_tx,
1377                        token,
1378                    )
1379                });
1380
1381            use Step::*;
1382            for step in steps {
1383                match step {
1384                    AssertOutputFrontier(time) => {
1385                        eprintln!("checking advance to {time:?}");
1386                        backpressure_probe.with_frontier(|front| {
1387                            eprintln!("current backpressure output frontier: {front:?}");
1388                        });
1389                        while backpressure_probe.less_than(&time) {
1390                            worker.step();
1391                            backpressure_probe.with_frontier(|front| {
1392                                eprintln!("current backpressure output frontier: {front:?}");
1393                            });
1394                            std::thread::sleep(std::time::Duration::from_millis(25));
1395                        }
1396                    }
1397                    ProcessXParts(parts) => {
1398                        eprintln!("processing {parts:?} parts");
1399                        for _ in 0..parts {
1400                            consumer_tx.send(()).unwrap();
1401                        }
1402                    }
1403                    AssertBackpressured {
1404                        frontier,
1405                        inflight_parts,
1406                        retired_parts,
1407                    } => {
1408                        let frontier = Antichain::from_elem(frontier);
1409                        eprintln!(
1410                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
1411                            and {retired_parts:?} retired"
1412                        );
1413                        let (new_frontier, new_count, new_retired_count) = loop {
1414                            if let Ok(val) = backpressure_status_rx.try_recv() {
1415                                break val;
1416                            }
1417                            worker.step();
1418                            std::thread::sleep(std::time::Duration::from_millis(25));
1419                        };
1420                        assert_eq!(
1421                            (frontier, inflight_parts, retired_parts),
1422                            (new_frontier, new_count, new_retired_count)
1423                        );
1424                    }
1425                }
1426            }
1427            // Send the input to the empty frontier.
1428            let _ = finalizer_tx.send(());
1429        });
1430    }
1431
1432    /// An operator that emits `Part`'s at the specified timestamps. Does not
1433    /// drop its capability until it gets a signal from the `Sender` it returns.
1434    fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
1435        scope: Scope<'scope, (u64, Subtime)>,
1436        mut input: I,
1437    ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
1438        let (finalizer_tx, finalizer_rx) = oneshot::channel();
1439        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1440        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1441
1442        iterator.build(|mut caps| async move {
1443            let mut capability = Some(caps.pop().unwrap());
1444            let mut last = None;
1445            while let Some(element) = input.next() {
1446                let time = element.0.clone();
1447                let part = element.1;
1448                last = Some((time, Subtime(0)));
1449                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1450            }
1451            if let Some(last) = last {
1452                capability
1453                    .as_mut()
1454                    .unwrap()
1455                    .downgrade(&(last.0 + 1, last.1));
1456            }
1457
1458            let _ = finalizer_rx.await;
1459            capability.take();
1460        });
1461
1462        (output, finalizer_tx)
1463    }
1464
1465    /// An operator that consumes its input ONLY when given a signal to do from
1466    /// the `UnboundedSender` it returns. Each `send` corresponds with 1 `Data` event
1467    /// being processed. Also connects the `feedback` handle to its output.
1468    fn consumer_operator<
1469        'scope,
1470        T: timely::progress::Timestamp,
1471        O: Backpressureable + std::fmt::Debug,
1472    >(
1473        scope: Scope<'scope, T>,
1474        input: StreamVec<'scope, T, O>,
1475        feedback: timely::dataflow::operators::feedback::Handle<
1476            'scope,
1477            T,
1478            Vec<std::convert::Infallible>,
1479        >,
1480    ) -> UnboundedSender<()> {
1481        let (tx, mut rx) = unbounded_channel::<()>();
1482        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
1483        let (output_handle, output) =
1484            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
1485        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);
1486
1487        consumer.build(|_caps| async move {
1488            while let Some(()) = rx.recv().await {
1489                // Consume exactly one messages (unless the input is exhausted).
1490                while let Some(Event::Progress(_)) = input.next().await {}
1491            }
1492        });
1493        output.connect_loop(feedback);
1494
1495        tx
1496    }
1497}