mz_storage_operators/persist_source.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A source that reads from a persist shard.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_dyncfg::ConfigSet;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::vec::VecExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::fetch::{SerdeLeasedBatchPart, ShardSourcePart};
use mz_persist_client::operators::shard_source::{FilterResult, SnapshotMode, shard_source};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::communication::Push;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::trace;

use crate::metrics::BackpressureMetrics;

/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
/// progress than would otherwise be possible.
///
/// This is "opaque" since we'd like to reserve the right to change the definition in the future
/// without downstreams being able to rely on the precise representation. (At the moment, this
/// is a simple batch counter, though we may change it to e.g. reflect progress through the
/// keyspace in the future.)
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
    /// The smallest non-zero summary for the opaque timestamp type.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}
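
// Illustrative sketch (not part of the original source): under the
// `PathSummary` impl above, a `Subtime` summary composes like a plain `u64`
// counter, e.g.:
//
//     assert_eq!(Subtime::least_summary().results_in(&Subtime(3)), Some(Subtime(4)));
//     assert_eq!(Subtime(2).followed_by(&Subtime(3)), Some(Subtime(5)));
//
// The `backpressure` operator below advances this per-timestamp part counter
// by `least_summary()` for each part it emits.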

/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater or equal to `until` will be suppressed.
/// The `map_filter_project` argument, if supplied, may be partially applied,
/// and any un-applied part of the argument will be left behind in the argument.
///
/// Users of this function have the ability to apply flow control to the output
/// to limit the in-flight data (measured in bytes) it can emit. The flow control
/// input is a timely stream that communicates the frontier at which the data
/// emitted by this source has been dropped.
///
/// **Note:** Because this function is reading batches from `persist`, it is working
/// at batch granularity. In practice, the source will be overshooting the target
/// flow control upper by an amount that is related to the size of batches.
///
/// If no flow control is desired, an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy way of creating such a stream is by
/// using [`timely::dataflow::operators::generic::operator::empty`].
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
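///
/// A minimal sketch of calling this operator (illustrative, not from the
/// original source; `clients`, `ctx`, `cfgs`, and `meta` are assumed to be in
/// scope):
///
/// ```ignore
/// let (ok_stream, err_stream, tokens) = persist_source(
///     scope,
///     source_id,
///     clients,            // Arc<PersistClientCache>
///     &ctx,               // &TxnsContext
///     &cfgs,              // &ConfigSet
///     meta,               // CollectionMetadata
///     None,               // read the shard's full schema
///     Some(as_of),        // advance all emitted times by this frontier
///     SnapshotMode::Include,
///     Antichain::new(),   // empty `until`: suppress nothing
///     None,               // no MFP to apply
///     None,               // no flow control
///     async {},           // start immediately
///     |err| Box::pin(async move { panic!("{err}") }),
/// );
/// ```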
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    // In case we need to use a dyncfg to decide which operators to render in a
    // dataflow.
    worker_dyncfgs: &ConfigSet,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: impl FnOnce(String) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            worker_dyncfgs,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: impl FnOnce(String) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

/// Uses the part's stats and the (pushed-down) MFP to decide whether a part
/// must be fetched, may be discarded outright, or (for zero-column relations
/// whose filter definitely holds) can be replaced with a synthesized
/// key/value.
fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    let may_error = stats.err_count().map_or(true, |count| count > 0);

    // N.B. We may have pushed down column "demands" into Persist, so this
    // relation desc may have a different set of columns than the stats.
    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

/// Decodes the parts fetched by `shard_source`, applying the given MFP (if
/// any) and suppressing updates at times beyond `until`.
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (mut updates_output, updates_stream) =
        builder.new_output::<ConsolidatingContainerBuilder<_>>();

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do.
        let mut pending_work = std::collections::VecDeque::new();

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;
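            // For example (illustrative): if `yield_fuel` were 100, the loop
            // below would perform at most 100 units of work (each MFP
            // evaluation and each emitted update counts as one unit) before
            // yielding and rescheduling itself via the `activator`.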

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

/// Pending work to read from fetched parts.
struct PendingWork {
    /// The time at which the work should happen.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part.
    part: PendingPart,
}

enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing it from a
    /// `FetchedBlob` if necessary.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse any further.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, while checking
    /// `yield_fn` to see whether more fuel is available.
    fn do_work<P, YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputHandleCore<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
            P,
        >,
    ) -> bool
    where
        P: Push<
            Message<
                (mz_repr::Timestamp, Subtime),
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (Ok(SourceData(Ok(row))), Ok(())) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it anyway. If the MFP returned data it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(_stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        // TODO: include more (redacted) information here.
                                        panic!(
                                            "persist filter pushdown correctness violation! {}",
                                            name
                                        );
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Ok(row), emit_time, diff.into()));
                        *work += 1;
                    }
                }
                (Ok(SourceData(Err(err))), Ok(())) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
                // TODO(petrosagg): error handling
                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
                    panic!("decoding failed")
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

/// A trait representing a type that can be used in `backpressure`.
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes.
    fn byte_size(&self) -> usize;
}

impl Backpressureable for (usize, SerdeLeasedBatchPart) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}
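
// Illustrative only (not part of the original source): any `Clone + 'static`
// type that can report its size may participate in backpressure, e.g. a
// hypothetical wrapper around raw blobs:
//
//     #[derive(Clone, Debug)]
//     struct Blob(Vec<u8>);
//     impl Backpressureable for Blob {
//         fn byte_size(&self) -> usize {
//             self.0.len()
//         }
//     }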

/// Flow control configuration.
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: Stream<G, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (be they granular or not) that must be emitted,
    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}

/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects the second component of each tuple
/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
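///
/// A sketch of typical wiring (illustrative, not from the original source;
/// assumes a scope `g` with timestamps of type `(T, Subtime)` and a
/// downstream probe feeding `progress_stream`):
///
/// ```ignore
/// let flow_control = FlowControl {
///     progress_stream,    // pristine frontier of the downstream consumer
///     max_inflight_bytes: 1 << 30,
///     summary: (Default::default(), Subtime::least_summary()),
///     metrics: None,
/// };
/// let (stream, token) = backpressure(&mut g, "backpressure", &descs, flow_control, 0, None);
/// ```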
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing.
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

    // Helper method used to synthesize current and next frontier for ordered times.
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }
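
    // For example (illustrative): with `frontier = {(6, Subtime(0))}`,
    // `time = (5, Subtime(0))`, and `part_number = 3`, this returns the refined
    // time `(5, Subtime(3))`, a current frontier of `{(5, Subtime(3))}`, and a
    // next frontier of `{(5, Subtime(4))}`, leaving `part_number = 4`.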

    // _Refine_ the data stream by amending the second component of each timestamp with the
    // part number. This also ensures that we order the parts by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    let mut i = 0;
                    parts.sort_by_key(|val| val.0.clone());
                    // This can be replaced with `Vec::drain_filter` when it stabilizes.
                    // `drain_filter_swapping` doesn't work as it reorders the vec.
                    while i < parts.len() {
                        if !prog.less_equal(&parts[i].0) {
                            let (part_time, d) = parts.remove(i);
                            let (part_time, frontier, next_frontier) = synthesize_frontiers(
                                prog.clone(),
                                part_time.clone(),
                                &mut part_number,
                            );
                            yield Either::Right((part_time, d, frontier, next_frontier))
                        } else {
                            i += 1;
                        }
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts.
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`.
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is useful
                                // to "close" timestamps from previous parts, even if we don't yet
                                // emit this part. Note that this is safe because `data_input` ensures
                                // time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, so listen for updates to the flow_control
                // input's frontier until we free up new budget. Because that stream carries
                // only frontier updates and no data, never polling this side of the if
                // statement doesn't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` if it has advanced.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .drain_filter_swapping(|(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes.
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp.
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust our budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`).
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`.
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status.
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // We can emit 3 parts before we hit the backpressure limit.
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. This is a single time because we assume the times are totally ordered.
        AssertOutputFrontier(Time),
        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
        /// retired.
        AssertBackpressured {
            frontier: Time,
            inflight_parts: usize,
            retired_parts: usize,
        },
        /// Process X parts in the downstream operator. This affects the feedback frontier.
        ProcessXParts(usize),
    }
1238
1239    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
1240    fn backpressure_runner(
1241        // The input data to the `backpressure` operator
1242        input: Vec<(u64, Part)>,
1243        // The maximum inflight bytes the `backpressure` operator allows through.
1244        max_inflight_bytes: usize,
1245        // The feedback summary used by the `backpressure` operator.
1246        summary: Time,
1247        // List of steps to run through.
1248        steps: Vec<Step>,
1249        // Whether or not to consume records in the non-granular scope. This is useful when the
1250        // `summary` is something like `(1, 0)`.
1251        non_granular_consumer: bool,
1252    ) {
1253        timely::execute::execute_directly(move |worker| {
1254            let (backpressure_probe, consumer_tx, mut backpressure_status_rx, finalizer_tx, _token) =
1255                // Set up the top-level non-granular scope.
1256                worker.dataflow::<u64, _, _>(|scope| {
1257                    let (non_granular_feedback_handle, non_granular_feedback) =
1258                        if non_granular_consumer {
1259                            let (h, f) = scope.feedback(Default::default());
1260                            (Some(h), Some(f))
1261                        } else {
1262                            (None, None)
1263                        };
1264                    let (
1265                        backpressure_probe,
1266                        consumer_tx,
1267                        backpressure_status_rx,
1268                        token,
1269                        backpressured,
1270                        finalizer_tx,
1271                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
1272                        let (input, finalizer_tx) =
1273                            iterator_operator(scope.clone(), input.into_iter());
1274
1275                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
1276                            (
1277                                FlowControl {
1278                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
1279                                    max_inflight_bytes,
1280                                    summary,
1281                                    metrics: None
1282                                },
1283                                None,
1284                            )
1285                        } else {
1286                            let (granular_feedback_handle, granular_feedback) =
1287                                scope.feedback(Default::default());
1288                            (
1289                                FlowControl {
1290                                    progress_stream: granular_feedback,
1291                                    max_inflight_bytes,
1292                                    summary,
1293                                    metrics: None,
1294                                },
1295                                Some(granular_feedback_handle),
1296                            )
1297                        };
1298
1299                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();
1300
1301                        let (backpressured, token) = backpressure(
1302                            scope,
1303                            "test",
1304                            &input,
1305                            flow_control,
1306                            0,
1307                            Some(backpressure_status_tx),
1308                        );
1309
1310                        // If we want to granularly consume the output, we set up the consumer here.
1311                        let tx = if !non_granular_consumer {
1312                            Some(consumer_operator(
1313                                scope.clone(),
1314                                &backpressured,
1315                                granular_feedback_handle.unwrap(),
1316                            ))
1317                        } else {
1318                            None
1319                        };
1320
1321                        (
1322                            backpressured.probe(),
1323                            tx,
1324                            backpressure_status_rx,
1325                            token,
1326                            backpressured.leave(),
1327                            finalizer_tx,
1328                        )
1329                    });
1330
1331                    // If we want to non-granularly consume the output, we set up the consumer here.
1332                    let consumer_tx = if non_granular_consumer {
1333                        consumer_operator(
1334                            scope.clone(),
1335                            &backpressured,
1336                            non_granular_feedback_handle.unwrap(),
1337                        )
1338                    } else {
1339                        consumer_tx.unwrap()
1340                    };
1341
1342                    (
1343                        backpressure_probe,
1344                        consumer_tx,
1345                        backpressure_status_rx,
1346                        finalizer_tx,
1347                        token,
1348                    )
1349                });
1350
1351            use Step::*;
1352            for step in steps {
1353                match step {
1354                    AssertOutputFrontier(time) => {
1355                        eprintln!("checking advance to {time:?}");
1356                        backpressure_probe.with_frontier(|front| {
1357                            eprintln!("current backpressure output frontier: {front:?}");
1358                        });
1359                        while backpressure_probe.less_than(&time) {
1360                            worker.step();
1361                            backpressure_probe.with_frontier(|front| {
1362                                eprintln!("current backpressure output frontier: {front:?}");
1363                            });
1364                            std::thread::sleep(std::time::Duration::from_millis(25));
1365                        }
1366                    }
1367                    ProcessXParts(parts) => {
1368                        eprintln!("processing {parts:?} parts");
1369                        for _ in 0..parts {
1370                            consumer_tx.send(()).unwrap();
1371                        }
1372                    }
1373                    AssertBackpressured {
1374                        frontier,
1375                        inflight_parts,
1376                        retired_parts,
1377                    } => {
1378                        let frontier = Antichain::from_elem(frontier);
1379                        eprintln!(
1380                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
1381                            and {retired_parts:?} retired"
1382                        );
1383                        let (new_frontier, new_count, new_retired_count) = loop {
1384                            if let Ok(val) = backpressure_status_rx.try_recv() {
1385                                break val;
1386                            }
1387                            worker.step();
1388                            std::thread::sleep(std::time::Duration::from_millis(25));
1389                        };
1390                        assert_eq!(
1391                            (frontier, inflight_parts, retired_parts),
1392                            (new_frontier, new_count, new_retired_count)
1393                        );
1394                    }
1395                }
1396            }
1397            // Signal the input operator to drop its capability, advancing its frontier to the empty frontier.
1398            let _ = finalizer_tx.send(());
1399        });
1400    }
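    // A hedged usage sketch of `backpressure_runner` (illustrative only: the name
    // `example_backpressure_scenario`, the part sizes, and every asserted frontier
    // and count below are assumptions, not verified golden values, which is why it
    // is deliberately not annotated as a test). Two 60-byte parts at time 50
    // overflow a 100-byte budget, so the part at time 100 should only flow once
    // downstream feedback retires an earlier part.
    #[allow(dead_code)]
    fn example_backpressure_scenario() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(60)), (50, Part(60)), (100, Part(60))],
            100,
            (0, Subtime(1)),
            vec![
                // The operator emits whole timestamps, so the output should reach
                // the end of time 50 even though that exceeds the budget.
                AssertOutputFrontier((50, Subtime(2))),
                // Retire one part downstream, advancing the feedback frontier.
                ProcessXParts(1),
                // Illustrative values: the exact frontier and counts reported on
                // the status channel depend on the operator's internal scheduling.
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                AssertOutputFrontier((100, Subtime(3))),
            ],
            false,
        );
    }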
1401
1402    /// An operator that emits `Part`s at the specified timestamps. Does not
1403    /// drop its capability until it gets a signal from the `Sender` it returns.
1404    fn iterator_operator<
1405        G: Scope<Timestamp = (u64, Subtime)>,
1406        I: Iterator<Item = (u64, Part)> + 'static,
1407    >(
1408        scope: G,
1409        mut input: I,
1410    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
1411        let (finalizer_tx, finalizer_rx) = oneshot::channel();
1412        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
1413        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();
1414
1415        iterator.build(|mut caps| async move {
1416            let mut capability = Some(caps.pop().unwrap());
1417            let mut last = None;
1418            while let Some(element) = input.next() {
1419                let time = element.0;
1420                let part = element.1;
1421                last = Some((time, Subtime(0)));
1422                output_handle.give(&capability.as_ref().unwrap().delayed(&last.unwrap()), part);
1423            }
1424            if let Some(last) = last {
1425                capability
1426                    .as_mut()
1427                    .unwrap()
1428                    .downgrade(&(last.0 + 1, last.1));
1429            }
1430
1431            let _ = finalizer_rx.await;
1432            capability.take();
1433        });
1434
1435        (output, finalizer_tx)
1436    }
1437
1438    /// An operator that consumes its input ONLY when given a signal to do so from
1439    /// the `UnboundedSender` it returns. Each `send` corresponds to one `Data` event
1440    /// being processed. Also connects the `feedback` handle to its output.
1441    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
1442        scope: G,
1443        input: &Stream<G, O>,
1444        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
1445    ) -> UnboundedSender<()> {
1446        let (tx, mut rx) = unbounded_channel::<()>();
1447        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
1448        let (output_handle, output) =
1449            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
1450        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);
1451
1452        consumer.build(|_caps| async move {
1453            while let Some(()) = rx.recv().await {
1454                // Consume exactly one data event (unless the input is exhausted), skipping any progress events along the way.
1455                while let Some(Event::Progress(_)) = input.next().await {}
1456            }
1457        });
1458        output.connect_loop(feedback);
1459
1460        tx
1461    }
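    // A minimal end-to-end sketch (hypothetical; `plumbing_sketch` is not part of
    // the real test suite) of how the two helpers above compose: `iterator_operator`
    // pins the frontier with its capability until `finalizer_tx` fires, and each
    // `consumer_tx.send(())` lets `consumer_operator` retire exactly one data event,
    // driving the feedback handle that `backpressure` would otherwise observe.
    #[allow(dead_code)]
    fn plumbing_sketch() {
        use timely::dataflow::operators::Probe;
        timely::execute::execute_directly(|worker| {
            let (probe, consumer_tx, finalizer_tx) =
                worker.dataflow::<(u64, Subtime), _, _>(|scope| {
                    // The feedback stream itself is unused here; a real caller would
                    // hand it to `backpressure` as its flow-control progress stream.
                    let (feedback_handle, _feedback) = scope.feedback(Default::default());
                    let (parts, finalizer_tx) =
                        iterator_operator(scope.clone(), vec![(0, Part(100))].into_iter());
                    let consumer_tx = consumer_operator(scope.clone(), &parts, feedback_handle);
                    (parts.probe(), consumer_tx, finalizer_tx)
                });
            // Step until the producer has emitted its part and downgraded its
            // capability to (1, Subtime(0)).
            while probe.less_than(&(1, Subtime(0))) {
                worker.step();
            }
            // Retire the single data event, then let the producer drop its
            // capability so the dataflow can shut down.
            consumer_tx.send(()).unwrap();
            let _ = finalizer_tx.send(());
        });
    }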
1462}