mz_storage_operators/persist_source.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A source that reads from a persist shard.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_client::stats::STATS_AUDIT_PANIC;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

use crate::metrics::BackpressureMetrics;

/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
/// progress than would otherwise be possible.
///
/// This is "opaque" since we'd like to reserve the right to change the definition in the future
/// without downstreams being able to rely on the precise representation. (At the moment, this
/// is a simple batch counter, though we may change it to eg. reflect progress through the keyspace
/// in the future.)
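///
/// As a non-normative sketch of the current representation (a batch counter;
/// again, not something downstreams may rely on), summaries compose like
/// addition on the counter:
///
/// ```ignore
/// use timely::progress::timestamp::PathSummary;
///
/// let step = Subtime::least_summary(); // Subtime(1)
/// assert_eq!(step.results_in(&Subtime(3)), Some(Subtime(4)));
/// assert_eq!(step.followed_by(&step), Some(Subtime(2)));
/// ```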
#[derive(
    Copy,
    Clone,
    PartialEq,
    Default,
    Eq,
    PartialOrd,
    Ord,
    Debug,
    Serialize,
    Deserialize,
    Hash
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
    /// The smallest non-zero summary for the opaque timestamp type.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater than or equal to `until` will be suppressed.
/// The `map_filter_project` argument, if supplied, may be partially applied,
/// and any un-applied part of the argument will be left behind in the argument.
///
/// Users of this function have the ability to apply flow control to the output
/// to limit the in-flight data (measured in bytes) it can emit. The flow control
/// input is a timely stream that communicates the frontier at which the data
/// emitted by this source have been dropped.
///
/// **Note:** Because this function is reading batches from `persist`, it is working
/// at batch granularity. In practice, the source will be overshooting the target
/// flow control upper by an amount that is related to the size of batches.
///
/// If no flow control is desired, an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy way of creating such a stream is by
/// using [`timely::dataflow::operators::generic::operator::empty`].
///
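/// A minimal sketch of such a no-op flow-control stream (the `scope` variable
/// here is hypothetical):
///
/// ```ignore
/// use std::convert::Infallible;
/// use timely::dataflow::operators::generic::operator::empty;
///
/// // Emits no data, and its frontier immediately advances to the empty
/// // antichain, so it never holds the source back.
/// let no_flow_control = empty::<_, Vec<Infallible>>(&scope);
/// ```
///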
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

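/// Determines, from a part's statistics, whether the part may be skipped,
/// must be fetched, or (for the zero-column case below) can be replaced with
/// a synthesized constant row without being fetched at all.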
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    let may_error = stats.err_count().map_or(true, |count| count > 0);

    // N.B. We may have pushed down column "demands" into Persist, so this
    // relation desc may have a different set of columns than the stats.
    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

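/// Decodes the blobs fetched by `shard_source`, applies the extracted
/// `map_filter_project`, and emits the resulting rows, using fuel-based
/// yielding so a single activation cannot monopolize the worker.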
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

/// Pending work to read from fetched parts
struct PendingWork {
    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
    panic_on_audit_failure: bool,
    /// The time at which the work should happen.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part.
    part: PendingPart,
}

enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing it from a
    /// `FetchedBlob` if necessary.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse any further.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, consulting
    /// `yield_fn` to check whether more fuel is available.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (SourceData(Ok(row)), ()) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this part doesn't need to be fetched, but to
                            // audit our logic we fetched it anyway. If the MFP returned data, it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (SourceData(Err(err)), ()) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

/// A trait representing a type that can be used in `backpressure`.
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes.
    fn byte_size(&self) -> usize;
}

impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

/// Flow control configuration.
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: Stream<G, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (be they granular or not) that must be emitted,
    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}

/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects the second element of each tuple
/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
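///
/// A minimal sketch of wiring this operator up, assuming a granular scope and
/// a hypothetical `downstream_frontier` progress stream taken from the
/// operator we want to backpressure from:
///
/// ```ignore
/// let flow_control = FlowControl {
///     progress_stream: downstream_frontier,
///     max_inflight_bytes: 1 << 20,
///     // Always let at least one full timestamp through, so progress is
///     // guaranteed even when the budget is exhausted.
///     summary: (Default::default(), Subtime::least_summary()),
///     metrics: None,
/// };
/// let (stream, token) = backpressure(scope, "example", &parts, flow_control, 0, None);
/// ```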
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

    // Helper method used to synthesize current and next frontier for ordered times.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // _Refine_ the data stream by amending the second element of each timestamp with the part
    // number. This also ensures that we order the parts by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is useful
                                // to "close" timestamps from previous parts, even if we don't yet
                                // emit this part. Note that this is safe because `data_input` ensures
                                // time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, listen for updates to the flow_control
                // input's frontier until we free up new budget. If we don't interact with
                // this side of the if statement, because the stream has no data, we
                // don't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` if it has advanced.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust our budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`).
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`.
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // we can emit 3 parts before we hit the backpressure limit
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. This is a single time because we assume the timestamp is totally ordered,
        /// so the frontier contains at most a single element.
        AssertOutputFrontier(Time),
1220        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
1221        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
1222        /// retired.
1223        AssertBackpressured {
1224            frontier: Time,
1225            inflight_parts: usize,
1226            retired_parts: usize,
1227        },
1228        /// Process X parts in the downstream operator. This affects the feedback frontier.
1229        ProcessXParts(usize),
1230    }
1231
1232    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
1233    fn backpressure_runner(
1234        // The input data to the `backpressure` operator
1235        input: Vec<(u64, Part)>,
1236        // The maximum inflight bytes the `backpressure` operator allows through.
1237        max_inflight_bytes: usize,
1238        // The feedback summary used by the `backpressure` operator.
1239        summary: Time,
1240        // List of steps to run through.
1241        steps: Vec<Step>,
1242        // Whether or not to consume records in the non-granular scope. This is useful when the
1243        // `summary` is something like `(1, 0)`.
1244        non_granular_consumer: bool,
1245    ) {
        timely::execute::execute_directly(move |worker| {
            let (
                backpressure_probe,
                consumer_tx,
                mut backpressure_status_rx,
                finalizer_tx,
                _token,
            ) =
                // Set up the top-level non-granular scope.
                worker.dataflow::<u64, _, _>(|scope| {
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If we want to granularly consume the output, we set up the consumer here.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // If we want to non-granularly consume the output, we set up the consumer here.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
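                        // Each message on the status channel is a
                        // `(feedback frontier, inflight parts, retired parts)`
                        // snapshot from the `backpressure` operator; step the
                        // worker until the next one arrives.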
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Signal the input operator to release its capability, advancing the
            // input to the empty frontier.
            let _ = finalizer_tx.send(());
        });
    }
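
    // A hypothetical end-to-end invocation, assuming the `Part` helper defined
    // earlier in this test module reports its payload as its byte size (the
    // numbers below are illustrative only):
    //
    //     backpressure_runner(
    //         // Two parts at time 50, one at time 100.
    //         vec![(50, Part(60)), (50, Part(60)), (100, Part(60))],
    //         // With a 100-byte budget, only one 60-byte part fits at a time.
    //         100,
    //         // A non-granular summary: feedback advances the coarse timestamp.
    //         (1, Subtime(0)),
    //         vec![AssertOutputFrontier((50, Subtime(1))), ProcessXParts(3)],
    //         // Consume in the outer, non-granular scope, per the comment above.
    //         true,
    //     );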

    /// An operator that emits `Part`s at the specified timestamps. Does not
    /// drop its capability until it gets a signal from the `Sender` it returns.
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some((time, part)) = input.next() {
                last = Some((time, Subtime(0)));
                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
            }
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }
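
    // For example, feeding `[(3, a), (5, b)]` through this operator emits `a` at
    // `(3, Subtime(0))` and `b` at `(5, Subtime(0))`, then downgrades the held
    // capability to `(6, Subtime(0))` and keeps it until the returned `Sender`
    // fires, at which point the stream closes.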

    /// An operator that consumes its input ONLY when given a signal to do so from
    /// the `UnboundedSender` it returns. Each `send` corresponds with 1 `Data` event
    /// being processed. Also connects the `feedback` handle to its output.
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Consume exactly one `Data` event (unless the input is exhausted),
                // draining any progress events that precede it.
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
        output.connect_loop(feedback);

        tx
    }
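
    // Usage sketch for `consumer_operator`: each `tx.send(())` unblocks the
    // operator just long enough to retire exactly one `Data` event, which in
    // turn advances the frontier fed back through `connect_loop`.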
}