// mz_storage_operators/persist_source.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A source that reads from a persist shard.
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use std::convert::Infallible;
14use std::fmt::Debug;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::Arc;
18use std::time::Instant;
19
20use differential_dataflow::lattice::Lattice;
21use futures::{StreamExt, future::Either};
22use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
23use mz_ore::cast::CastFrom;
24use mz_ore::collections::CollectionExt;
25use mz_persist_client::cache::PersistClientCache;
26use mz_persist_client::cfg::{PersistConfig, RetryParameters};
27use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
28use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
29use mz_persist_client::operators::shard_source::{
30    ErrorHandler, FilterResult, SnapshotMode, shard_source,
31};
32use mz_persist_client::stats::STATS_AUDIT_PANIC;
33use mz_persist_types::Codec64;
34use mz_persist_types::codec_impls::UnitSchema;
35use mz_persist_types::columnar::{ColumnEncoder, Schema};
36use mz_repr::{
37    Datum, DatumVec, Diff, GlobalId, RelationDesc, ReprRelationType, Row, RowArena, Timestamp,
38};
39use mz_storage_types::StorageDiff;
40use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
41use mz_storage_types::errors::DataflowError;
42use mz_storage_types::sources::SourceData;
43use mz_storage_types::stats::RelationPartStats;
44use mz_timely_util::builder_async::{
45    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::probe::ProbeNotify;
48use mz_txn_wal::operator::{TxnsContext, txns_progress};
49use serde::{Deserialize, Serialize};
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::channels::pact::Pipeline;
53use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
54use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
55use timely::dataflow::operators::{Capability, Leave, OkErr};
56use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
57use timely::dataflow::{Scope, Stream, StreamVec};
58use timely::order::TotalOrder;
59use timely::progress::Antichain;
60use timely::progress::Timestamp as TimelyTimestamp;
61use timely::progress::timestamp::PathSummary;
62use timely::scheduling::Activator;
63use tokio::sync::mpsc::UnboundedSender;
64use tracing::{error, trace};
65
66use crate::metrics::BackpressureMetrics;
67
68/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
69/// progress than would otherwise be possible.
70///
71/// This is "opaque" since we'd like to reserve the right to change the definition in the future
72/// without downstreams being able to rely on the precise representation. (At the moment, this
73/// is a simple batch counter, though we may change it to eg. reflect progress through the keyspace
74/// in the future.)
75#[derive(
76    Copy,
77    Clone,
78    PartialEq,
79    Default,
80    Eq,
81    PartialOrd,
82    Ord,
83    Debug,
84    Serialize,
85    Deserialize,
86    Hash
87)]
88pub struct Subtime(u64);
89
90impl PartialOrder for Subtime {
91    fn less_equal(&self, other: &Self) -> bool {
92        self.0.less_equal(&other.0)
93    }
94}
95
96impl TotalOrder for Subtime {}
97
98impl PathSummary<Subtime> for Subtime {
99    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
100        self.0.results_in(&src.0).map(Subtime)
101    }
102
103    fn followed_by(&self, other: &Self) -> Option<Self> {
104        self.0.followed_by(&other.0).map(Subtime)
105    }
106}
107
108impl TimelyTimestamp for Subtime {
109    type Summary = Subtime;
110
111    fn minimum() -> Self {
112        Subtime(0)
113    }
114}
115
116impl differential_dataflow::lattice::Lattice for Subtime {
117    fn join(&self, other: &Self) -> Self {
118        Subtime(std::cmp::max(self.0, other.0))
119    }
120    fn meet(&self, other: &Self) -> Self {
121        Subtime(std::cmp::min(self.0, other.0))
122    }
123}
124
125impl differential_dataflow::lattice::Maximum for Subtime {
126    fn maximum() -> Self {
127        Subtime(u64::MAX)
128    }
129}
130
131impl Subtime {
132    /// The smallest non-zero summary for the opaque timestamp type.
133    pub const fn least_summary() -> Self {
134        Subtime(1)
135    }
136}
137
138/// Creates a new source that reads from a persist shard, distributing the work
139/// of reading data to all timely workers.
140///
141/// All times emitted will have been [advanced by] the given `as_of` frontier.
142/// All updates at times greater or equal to `until` will be suppressed.
143/// The `map_filter_project` argument, if supplied, may be partially applied,
144/// and any un-applied part of the argument will be left behind in the argument.
145///
146/// Users of this function have the ability to apply flow control to the output
147/// to limit the in-flight data (measured in bytes) it can emit. The flow control
148/// input is a timely stream that communicates the frontier at which the data
/// emitted by this source have been dropped.
150///
151/// **Note:** Because this function is reading batches from `persist`, it is working
152/// at batch granularity. In practice, the source will be overshooting the target
153/// flow control upper by an amount that is related to the size of batches.
154///
155/// If no flow control is desired an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy way of creating such a stream is by
157/// using [`timely::dataflow::operators::generic::operator::empty`].
158///
159/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'scope, mz_repr::Timestamp, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
) {
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    // Shutdown tokens for every operator we render; returned to the caller.
    let mut tokens = vec![];

    let outer = scope.clone();
    // Read the shard inside a refined `(Timestamp, Subtime)` scope so backpressure can hold
    // back data at sub-timestamp (per-batch) granularity; the stream is flattened back into
    // the outer scope via `leave` below.
    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        // Only build flow-control state (and its probe) when a byte limit was requested.
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                // The probe reports the decoded output's frontier back to the backpressure
                // operator as its progress stream (attached via `probe_notify_with` below).
                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            outer,
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        // When flow control is enabled, route the decoded stream through the probe so
        // the backpressure operator observes how far downstream consumption has gotten.
        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave(outer)
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    // Drop the `Subtime` component (`t.0`) and split successful rows from errors.
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}
284
/// A scope whose timestamp is refined with a [`Subtime`] component, allowing
/// progress to be tracked at sub-timestamp (per-batch) granularity.
type RefinedScope<'scope, T> = Scope<'scope, (T, Subtime)>;
286
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// Unlike [`persist_source`], this operates in the refined `(Timestamp, Subtime)`
/// scope and wires up pushdown filtering (via part statistics), optional
/// backpressure (via `flow_control`), and decoding/MFP application.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, 'outer>(
    outer: Scope<'outer, mz_repr::Timestamp>,
    scope: RefinedScope<'g, mz_repr::Timestamp>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<'g, (mz_repr::Timestamp, Subtime)>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        'g,
        (mz_repr::Timestamp, Subtime),
        Vec<(
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        )>,
    >,
    Vec<PressOnDropButton>,
) {
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    // Clone the (not-yet-taken) MFP so the pushdown filter closure below can use it
    // independently of the copy that `decode_and_mfp` will extract.
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    // If flow control was requested, interpose the `backpressure` operator between
    // the part descriptions and their fetching.
    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |scope, descs, chosen_worker| {
            let (stream, token) = backpressure(
                scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        outer,
        scope,
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        // Per-part filter: decide from part statistics whether the part needs fetching.
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, fetched, &name, until, map_filter_project);
    (rows, token)
}
401
/// Uses part statistics to decide whether a persist part must be fetched, can be
/// discarded outright, or can be replaced with a cheaply synthesized part.
///
/// The known `mz_now` range and per-column statistics are pushed into the MFP's
/// filter; the resulting value range tells us whether the filter could possibly
/// keep a row, skip a row, or error.
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let relation = ReprRelationType::from(relation_desc.typ());
    let mut ranges = ColumnSpecs::new(&relation, &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    // Conservatively assume the part may contain errors when the error count is
    // unknown (`None`) or positive.
    let may_error = stats.err_count().map_or(true, |count| count > 0);

    // N.B. We may have pushed down column "demands" into Persist, so this
    // relation desc may have a different set of columns than the stats.
    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        // Zero-column relation whose filter can neither skip nor error: every row
        // is the empty row, so substitute a synthesized columnar encoding of it
        // rather than fetching the real part. On any encoding failure, fall back
        // to fetching the part normally.
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}
447
/// Decodes fetched persist blobs and applies the MFP (if any), producing a stream
/// of `Row` (or `DataflowError`) updates in the refined `(Timestamp, Subtime)` scope.
///
/// If `map_filter_project` is supplied, the plan is taken out of it (leaving an
/// identity MFP behind for the caller) and fully applied here. Work is fueled:
/// the operator yields and reschedules itself rather than decoding unboundedly
/// in a single activation.
pub fn decode_and_mfp<'scope>(
    cfg: PersistConfig,
    fetched: StreamVec<
        'scope,
        (mz_repr::Timestamp, Subtime),
        FetchedBlob<SourceData, (), Timestamp, StorageDiff>,
    >,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> StreamVec<
    'scope,
    (mz_repr::Timestamp, Subtime),
    (
        Result<Row, DataflowError>,
        (mz_repr::Timestamp, Subtime),
        Diff,
    ),
> {
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            // Enqueue each newly arrived blob as a pending work item, retaining the
            // capability so outputs can later be emitted at (a refinement of) its time.
            fetched_input.for_each(|time, data| {
                let capability = time.retain(0);
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            // Fueled loop: drain pending parts front-to-back until fuel runs out.
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            // If we yielded with work remaining, reschedule ourselves to continue.
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}
538
/// Pending work to read from fetched parts
struct PendingWork {
    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
    panic_on_audit_failure: bool,
    /// The capability at (a refinement of) whose time decoded updates are emitted.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part; parsed lazily on first use (see `PendingPart::part_mut`).
    part: PendingPart,
}
548
/// A fetched part in one of two stages: the raw blob as fetched, or the parsed
/// part ready for iteration. Parsing happens lazily in [`PendingPart::part_mut`].
enum PendingPart {
    /// A fetched blob that has not yet been parsed.
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    /// A parsed part, ready to yield updates.
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}
555
impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing it from a
    /// `FetchedBlob` if necessary.
    ///
    /// (A previous version of this method also returned an error-freeness bool;
    /// the stale claim has been removed from this doc.)
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                // Parse on first access and cache the parsed part in place.
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse any further.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}
574
impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, while checking
    /// `yield_fn` whether more fuel is available.
    ///
    /// Returns `true` when the part is fully drained and `false` when it yielded
    /// early because `yield_fn` reported the fuel as exhausted.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        // Lazily parses the blob on first call; later calls resume iteration.
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        // Holds a `SourceData` whose allocation can be re-used by the next decode.
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            // Updates at or beyond `until` are never of interest; drop them.
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it any way. If the MFP returned data it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        // Keep the capability's Subtime component; only the
                                        // outer timestamp is overridden with the update's time.
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            // Out of fuel: yield, signalling that this part is not yet done.
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}
711
/// A trait representing a type that can be used in `backpressure`.
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes.
    ///
    /// These weights are what the `backpressure` operator accounts against
    /// [`FlowControl::max_inflight_bytes`].
    fn byte_size(&self) -> usize;
}
717
impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        // Only the encoded batch part is weighed; the leading `usize` (an index —
        // presumably identifying the destination worker, confirm at the producer)
        // contributes nothing.
        self.1.encoded_size_bytes()
    }
}
723
/// Flow control configuration.
#[derive(Debug)]
pub struct FlowControl<'scope, T: timely::progress::Timestamp> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: StreamVec<'scope, T, Infallible>,
    /// Maximum number of in-flight bytes.
    ///
    /// Accounting is at batch granularity, so this limit may be overshot by
    /// roughly one batch's worth of data.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (be they granular or not) that must be emitted,
    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
    pub summary: T::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}
742
/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects the second component of each tuple
/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
pub fn backpressure<'scope, T, O>(
    scope: Scope<'scope, (T, Subtime)>,
    name: &str,
    data: StreamVec<'scope, (T, Subtime), O>,
    flow_control: FlowControl<'scope, (T, Subtime)>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing.
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (StreamVec<'scope, (T, Subtime), O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(summaried_flow, Pipeline);

    // Helper method used to synthesize the current and next frontier for ordered times.
    //
    // Returns the part's refined time (with the per-part `Subtime` filled in), the frontier
    // that includes that time, and the frontier just beyond it.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // _Refine_ the data stream by amending the second component of each timestamp with a
    // per-part number. This also ensures that we order the parts by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    // The input is exhausted: flush all remaining parts in time order,
                    // synthesized against the empty frontier.
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    // Buffer parts until a progress update tells us their time is complete.
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    // Emit, in time order, all buffered parts whose time is now closed by the
                    // new input frontier, then forward the frontier itself.
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is useful
                                // to "close" timestamps from previous parts, even if we don't yet
                                // emit this part. Note that this is safe because `data_input` ensures
                                // time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, so listen for updates to the flow_control
                // input's frontier until we free up new budget. If we don't interact with
                // this side of the if statement, because the stream has no data, we
                // don't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` if it has advanced.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}
1001
1002#[cfg(test)]
1003mod tests {
1004    use timely::container::CapacityContainerBuilder;
1005    use timely::dataflow::operators::{Enter, Probe};
1006    use tokio::sync::mpsc::unbounded_channel;
1007    use tokio::sync::oneshot;
1008
1009    use super::*;
1010
    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress after processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes.
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress after processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp.
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust our budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`).
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`.
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }
1149
    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status.
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // We can emit 3 parts before we hit the backpressure limit.
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }
1228
    type Time = (u64, Subtime);
    /// A test "part" whose only payload is its pretend byte size.
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }
1237
    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. This is a single time because we assume the frontier always contains a
        /// single element (the timestamps used in these tests are totally ordered).
        AssertOutputFrontier(Time),
        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
        /// retired.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Process X parts in the downstream operator. This affects the feedback frontier.
        ProcessXParts(usize),
    }
1254
    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
    fn backpressure_runner(
        // The input data to the `backpressure` operator.
        input: Vec<(u64, Part)>,
        // The maximum inflight bytes the `backpressure` operator allows through.
        max_inflight_bytes: usize,
        // The feedback summary used by the `backpressure` operator.
        summary: Time,
        // List of steps to run through.
        steps: Vec<Step>,
        // Whether or not to consume records in the non-granular scope. This is useful when the
        // `summary` is something like `(1, 0)`.
        non_granular_consumer: bool,
    ) {
        // Run a single-worker timely computation; the steps below drive it
        // manually via `worker.step()`.
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                // Set up the top-level non-granular scope.
                worker.dataflow::<u64, _, _>(|outer_scope| {
                    // When consuming non-granularly, the feedback edge lives in the
                    // outer scope; otherwise it is created inside the granular scope.
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = outer_scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = outer_scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If we want to granularly consume the output, we setup the consumer here.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                backpressured.clone(),
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        let (probe_handle, backpressured) = backpressured.probe();
                        (
                            probe_handle,
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(outer_scope),
                            finalizer_tx,
                        )
                    });

                    // If we want to non-granularly consume the output, we setup the consumer here.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            outer_scope.clone(),
                            backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        // Step the worker until the probe's frontier has passed `time`.
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        // Step the worker until the operator reports its next
                        // backpressure status over the probe channel.
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Send the input to the empty frontier.
            let _ = finalizer_tx.send(());
        });
    }
1424
1425    /// An operator that emits `Part`'s at the specified timestamps. Does not
1426    /// drop its capability until it gets a signal from the `Sender` it returns.
1427    fn iterator_operator<'scope, I: Iterator<Item = (u64, Part)> + 'static>(
1428        scope: Scope<'scope, (u64, Subtime)>,
1429        mut input: I,
1430    ) -> (StreamVec<'scope, (u64, Subtime), Part>, oneshot::Sender<()>) {
1431        let (finalizer_tx, finalizer_rx) = oneshot::channel();
1432        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1433        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1434
1435        iterator.build(|mut caps| async move {
1436            let mut capability = Some(caps.pop().unwrap());
1437            let mut last = None;
1438            while let Some(element) = input.next() {
1439                let time = element.0.clone();
1440                let part = element.1;
1441                last = Some((time, Subtime(0)));
1442                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1443            }
1444            if let Some(last) = last {
1445                capability
1446                    .as_mut()
1447                    .unwrap()
1448                    .downgrade(&(last.0 + 1, last.1));
1449            }
1450
1451            let _ = finalizer_rx.await;
1452            capability.take();
1453        });
1454
1455        (output, finalizer_tx)
1456    }
1457
1458    /// An operator that consumes its input ONLY when given a signal to do from
1459    /// the `UnboundedSender` it returns. Each `send` corresponds with 1 `Data` event
1460    /// being processed. Also connects the `feedback` handle to its output.
1461    fn consumer_operator<
1462        'scope,
1463        T: timely::progress::Timestamp,
1464        O: Backpressureable + std::fmt::Debug,
1465    >(
1466        scope: Scope<'scope, T>,
1467        input: StreamVec<'scope, T, O>,
1468        feedback: timely::dataflow::operators::feedback::Handle<
1469            'scope,
1470            T,
1471            Vec<std::convert::Infallible>,
1472        >,
1473    ) -> UnboundedSender<()> {
1474        let (tx, mut rx) = unbounded_channel::<()>();
1475        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
1476        let (output_handle, output) =
1477            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
1478        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);
1479
1480        consumer.build(|_caps| async move {
1481            while let Some(()) = rx.recv().await {
1482                // Consume exactly one messages (unless the input is exhausted).
1483                while let Some(Event::Progress(_)) = input.next().await {}
1484            }
1485        });
1486        output.connect_loop(feedback);
1487
1488        tx
1489    }
1490}