mz_storage_operators/
persist_source.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A source that reads from a persist shard.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::lattice::Lattice;
use futures::{StreamExt, future::Either};
use mz_expr::{ColumnSpecs, Interpreter, MfpPlan, ResultSpec, UnmaterializableFunc};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::{PersistConfig, RetryParameters};
use mz_persist_client::fetch::{ExchangeableBatchPart, ShardSourcePart};
use mz_persist_client::fetch::{FetchedBlob, FetchedPart};
use mz_persist_client::operators::shard_source::{
    ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
use mz_persist_client::stats::STATS_AUDIT_PANIC;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_repr::{Datum, DatumVec, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{CollectionMetadata, TxnsCodecRow};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::ScopeParent;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

use crate::metrics::BackpressureMetrics;

/// This opaque token represents progress within a timestamp, allowing finer-grained frontier
/// progress than would otherwise be possible.
///
/// This is "opaque" since we'd like to reserve the right to change the definition in the future
/// without downstreams being able to rely on the precise representation. (At the moment, this
/// is a simple batch counter, though we may change it to, e.g., reflect progress through the
/// keyspace in the future.)
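///
/// A small worked sketch of how summaries compose under the `u64` semantics
/// below (the summary's counter is added to the timestamp's counter; the
/// concrete values are illustrative):
///
/// ```ignore
/// assert_eq!(
///     Subtime::least_summary().results_in(&Subtime(3)),
///     Some(Subtime(4)),
/// );
/// ```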
#[derive(
    Copy, Clone, PartialEq, Default, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Hash,
)]
pub struct Subtime(u64);

impl PartialOrder for Subtime {
    fn less_equal(&self, other: &Self) -> bool {
        self.0.less_equal(&other.0)
    }
}

impl TotalOrder for Subtime {}

impl PathSummary<Subtime> for Subtime {
    fn results_in(&self, src: &Subtime) -> Option<Subtime> {
        self.0.results_in(&src.0).map(Subtime)
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Subtime)
    }
}

impl TimelyTimestamp for Subtime {
    type Summary = Subtime;

    fn minimum() -> Self {
        Subtime(0)
    }
}

impl Subtime {
    /// The smallest non-zero summary for the opaque timestamp type.
    pub const fn least_summary() -> Self {
        Subtime(1)
    }
}

/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater than or equal to `until` will be suppressed.
/// The `map_filter_project` argument, if supplied, may be partially applied;
/// any un-applied part is left behind in the argument.
///
/// Users of this function have the ability to apply flow control to the output
/// to limit the in-flight data (measured in bytes) it can emit. The flow control
/// input is a timely stream that communicates the frontier at which the data
/// emitted by this source has been dropped.
///
/// **Note:** Because this function is reading batches from `persist`, it is working
/// at batch granularity. In practice, the source will overshoot the target
/// flow control upper by an amount that is related to the size of batches.
///
/// If no flow control is desired, an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy way of creating such a stream is by
/// using [`timely::dataflow::operators::generic::operator::empty`].
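///
/// A minimal sketch of that no-flow-control setup (the binding name is
/// illustrative, not part of this API):
///
/// ```ignore
/// use timely::dataflow::operators::generic::operator::empty;
///
/// // No data, and a frontier that immediately becomes the empty antichain,
/// // so the source is never held back.
/// let no_flow_control: Stream<_, Infallible> = empty(scope);
/// ```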
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<G>(
    scope: &mut G,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    txns_ctx: &TxnsContext,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    max_inflight_bytes: Option<usize>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<G, (Row, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

    let mut tokens = vec![];

    let stream = scope.scoped(&format!("granular_backpressure({})", source_id), |scope| {
        let (flow_control, flow_control_probe) = match max_inflight_bytes {
            Some(max_inflight_bytes) => {
                let backpressure_metrics = BackpressureMetrics {
                    emitted_bytes: Arc::clone(&shard_metrics.backpressure_emitted_bytes),
                    last_backpressured_bytes: Arc::clone(
                        &shard_metrics.backpressure_last_backpressured_bytes,
                    ),
                    retired_bytes: Arc::clone(&shard_metrics.backpressure_retired_bytes),
                };

                let probe = mz_timely_util::probe::Handle::default();
                let progress_stream = mz_timely_util::probe::source(
                    scope.clone(),
                    format!("decode_backpressure_probe({source_id})"),
                    probe.clone(),
                );
                let flow_control = FlowControl {
                    progress_stream,
                    max_inflight_bytes,
                    summary: (Default::default(), Subtime::least_summary()),
                    metrics: Some(backpressure_metrics),
                };
                (Some(flow_control), Some(probe))
            }
            None => (None, None),
        };

        // Our default listen sleeps are tuned for the case of a shard that is
        // written once a second, but txn-wal allows these to be lazy.
        // Override the tuning to reduce crdb load. The pubsub fallback
        // responsibility is then replaced by manual "one state" wakeups in the
        // txns_progress operator.
        let cfg = Arc::clone(&persist_clients.cfg().configs);
        let subscribe_sleep = match metadata.txns_shard {
            Some(_) => Some(move || mz_txn_wal::operator::txns_data_shard_retry_params(&cfg)),
            None => None,
        };

        let (stream, source_tokens) = persist_source_core(
            scope,
            source_id,
            Arc::clone(&persist_clients),
            metadata.clone(),
            read_schema,
            as_of.clone(),
            snapshot_mode,
            until.clone(),
            map_filter_project,
            flow_control,
            subscribe_sleep,
            start_signal,
            error_handler,
        );
        tokens.extend(source_tokens);

        let stream = match flow_control_probe {
            Some(probe) => stream.probe_notify_with(vec![probe]),
            None => stream,
        };

        stream.leave()
    });

    // If a txns_shard was provided, then this shard is in the txn-wal
    // system. This means the "logical" upper may be ahead of the "physical"
    // upper. Render a dataflow operator that passes through the input and
    // translates the progress frontiers as necessary.
    let (stream, txns_tokens) = match metadata.txns_shard {
        Some(txns_shard) => txns_progress::<SourceData, (), Timestamp, i64, _, TxnsCodecRow, _, _>(
            stream,
            &source_id.to_string(),
            txns_ctx,
            move || {
                let (c, l) = (
                    Arc::clone(&persist_clients),
                    metadata.persist_location.clone(),
                );
                async move { c.open(l).await.expect("location is valid") }
            },
            txns_shard,
            metadata.data_shard,
            as_of
                .expect("as_of is provided for table sources")
                .into_option()
                .expect("shard is not closed"),
            until,
            Arc::new(metadata.relation_desc),
            Arc::new(UnitSchema),
        ),
        None => (stream, vec![]),
    };
    tokens.extend(txns_tokens);
    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
        Ok(row) => Ok((row, t.0, r)),
        Err(err) => Err((err, t.0, r)),
    });
    (ok_stream, err_stream, tokens)
}

type RefinedScope<'g, G> = Child<'g, G, (<G as ScopeParent>::Timestamp, Subtime)>;

/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
#[allow(clippy::needless_borrow)]
pub fn persist_source_core<'g, G>(
    scope: &RefinedScope<'g, G>,
    source_id: GlobalId,
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    read_schema: Option<RelationDesc>,
    as_of: Option<Antichain<Timestamp>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    flow_control: Option<FlowControl<RefinedScope<'g, G>>>,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    Stream<
        RefinedScope<'g, G>,
        (
            Result<Row, DataflowError>,
            (mz_repr::Timestamp, Subtime),
            Diff,
        ),
    >,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let cfg = persist_clients.cfg().clone();
    let name = source_id.to_string();
    let filter_plan = map_filter_project.as_ref().map(|p| (*p).clone());

    // N.B. `read_schema` may be a subset of the total columns for this shard.
    let read_desc = match read_schema {
        Some(desc) => desc,
        None => metadata.relation_desc,
    };

    let desc_transformer = match flow_control {
        Some(flow_control) => Some(move |mut scope: _, descs: &Stream<_, _>, chosen_worker| {
            let (stream, token) = backpressure(
                &mut scope,
                &format!("backpressure({source_id})"),
                descs,
                flow_control,
                chosen_worker,
                None,
            );
            (stream, vec![token])
        }),
        None => None,
    };

    let metrics = Arc::clone(persist_clients.metrics());
    let filter_name = name.clone();
    // The `until` gives us an upper bound on the possible values of `mz_now` this query may see.
    // Ranges are inclusive, so it's safe to use the maximum timestamp as the upper bound when
    // `until` is the empty antichain.
    let upper = until.as_option().cloned().unwrap_or(Timestamp::MAX);
    let (fetched, token) = shard_source(
        &mut scope.clone(),
        &name,
        move || {
            let (c, l) = (
                Arc::clone(&persist_clients),
                metadata.persist_location.clone(),
            );
            async move { c.open(l).await.unwrap() }
        },
        metadata.data_shard,
        as_of,
        snapshot_mode,
        until.clone(),
        desc_transformer,
        Arc::new(read_desc.clone()),
        Arc::new(UnitSchema),
        move |stats, frontier| {
            let Some(lower) = frontier.as_option().copied() else {
                // If the frontier has advanced to the empty antichain,
                // we'll never emit any rows from any part.
                return FilterResult::Discard;
            };

            if lower > upper {
                // The frontier timestamp is larger than the until of the dataflow:
                // anything from this part will necessarily be filtered out.
                return FilterResult::Discard;
            }

            let time_range =
                ResultSpec::value_between(Datum::MzTimestamp(lower), Datum::MzTimestamp(upper));
            if let Some(plan) = &filter_plan {
                let metrics = &metrics.pushdown.part_stats;
                let stats = RelationPartStats::new(&filter_name, metrics, &read_desc, stats);
                filter_result(&read_desc, time_range, stats, plan)
            } else {
                FilterResult::Keep
            }
        },
        listen_sleep,
        start_signal,
        error_handler,
    );
    let rows = decode_and_mfp(cfg, &fetched, &name, until, map_filter_project);
    (rows, token)
}

fn filter_result(
    relation_desc: &RelationDesc,
    time_range: ResultSpec,
    stats: RelationPartStats,
    plan: &MfpPlan,
) -> FilterResult {
    let arena = RowArena::new();
    let mut ranges = ColumnSpecs::new(relation_desc.typ(), &arena);
    ranges.push_unmaterializable(UnmaterializableFunc::MzNow, time_range);

    let may_error = stats.err_count().map_or(true, |count| count > 0);

    // N.B. We may have pushed down column "demands" into Persist, so this
    // relation desc may have a different set of columns than the stats.
    for (pos, (idx, _, _)) in relation_desc.iter_all().enumerate() {
        let result_spec = stats.col_stats(idx, &arena);
        ranges.push_column(pos, result_spec);
    }
    let result = ranges.mfp_plan_filter(plan).range;
    let may_error = may_error || result.may_fail();
    let may_keep = result.may_contain(Datum::True);
    let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
    if relation_desc.len() == 0 && !may_error && !may_skip {
        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
            return FilterResult::Keep;
        };
        key.append(&SourceData(Ok(Row::default())));
        let key = key.finish();
        let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
            return FilterResult::Keep;
        };
        val.append(&());
        let val = val.finish();

        FilterResult::ReplaceWith {
            key: Arc::new(key),
            val: Arc::new(val),
        }
    } else if may_error || may_keep {
        FilterResult::Keep
    } else {
        FilterResult::Discard
    }
}

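/// Decodes the blobs fetched by `shard_source`, applies the `map_filter_project`
/// (taking it and leaving an identity MFP behind), and emits the resulting rows,
/// suppressing any updates at times greater than or equal to `until`.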
pub fn decode_and_mfp<G>(
    cfg: PersistConfig,
    fetched: &Stream<G, FetchedBlob<SourceData, (), Timestamp, StorageDiff>>,
    name: &str,
    until: Antichain<Timestamp>,
    mut map_filter_project: Option<&mut MfpPlan>,
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>
where
    G: Scope<Timestamp = (mz_repr::Timestamp, Subtime)>,
{
    let scope = fetched.scope();
    let mut builder = OperatorBuilder::new(
        format!("persist_source::decode_and_mfp({})", name),
        scope.clone(),
    );
    let operator_info = builder.operator_info();

    let mut fetched_input = builder.new_input(fetched, Pipeline);
    let (updates_output, updates_stream) = builder.new_output();
    let mut updates_output = OutputBuilder::from(updates_output);

    // Re-used state for processing and building rows.
    let mut datum_vec = mz_repr::DatumVec::new();
    let mut row_builder = Row::default();

    // Extract the MFP if it exists; leave behind an identity MFP in that case.
    let map_filter_project = map_filter_project.as_mut().map(|mfp| mfp.take());

    builder.build(move |_caps| {
        let name = name.to_owned();
        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activations = scope.activations();
        let activator = Activator::new(operator_info.address, activations);
        // Maintain a list of work to do
        let mut pending_work = std::collections::VecDeque::new();
        let panic_on_audit_failure = STATS_AUDIT_PANIC.handle(&cfg);

        move |_frontier| {
            fetched_input.for_each(|time, data| {
                let capability = time.retain();
                for fetched_blob in data.drain(..) {
                    pending_work.push_back(PendingWork {
                        panic_on_audit_failure: panic_on_audit_failure.get(),
                        capability: capability.clone(),
                        part: PendingPart::Unparsed(fetched_blob),
                    })
                }
            });

            // Get dyncfg values once per schedule to amortize the cost of
            // loading the atomics.
            let yield_fuel = cfg.storage_source_decode_fuel();
            let yield_fn = |_, work| work >= yield_fuel;

            let mut work = 0;
            let start_time = Instant::now();
            let mut output = updates_output.activate();
            while !pending_work.is_empty() && !yield_fn(start_time, work) {
                let done = pending_work.front_mut().unwrap().do_work(
                    &mut work,
                    &name,
                    start_time,
                    yield_fn,
                    &until,
                    map_filter_project.as_ref(),
                    &mut datum_vec,
                    &mut row_builder,
                    &mut output,
                );
                if done {
                    pending_work.pop_front();
                }
            }
            if !pending_work.is_empty() {
                activator.activate();
            }
        }
    });

    updates_stream
}

/// Pending work to read from fetched parts
struct PendingWork {
    /// Whether to panic if a part fails an audit, or to just pass along the audited data.
    panic_on_audit_failure: bool,
    /// The time at which the work should happen.
    capability: Capability<(mz_repr::Timestamp, Subtime)>,
    /// Pending fetched part.
    part: PendingPart,
}

enum PendingPart {
    Unparsed(FetchedBlob<SourceData, (), Timestamp, StorageDiff>),
    Parsed {
        part: ShardSourcePart<SourceData, (), Timestamp, StorageDiff>,
    },
}

impl PendingPart {
    /// Returns the contained `FetchedPart`, first parsing it from a
    /// `FetchedBlob` if necessary.
    fn part_mut(&mut self) -> &mut FetchedPart<SourceData, (), Timestamp, StorageDiff> {
        match self {
            PendingPart::Unparsed(x) => {
                *self = PendingPart::Parsed { part: x.parse() };
                // Won't recurse any further.
                self.part_mut()
            }
            PendingPart::Parsed { part } => &mut part.part,
        }
    }
}

impl PendingWork {
    /// Perform work, reading from the fetched part, decoding, and sending outputs, while
    /// checking `yield_fn` to see whether more fuel is available.
    fn do_work<YFn>(
        &mut self,
        work: &mut usize,
        name: &str,
        start_time: Instant,
        yield_fn: YFn,
        until: &Antichain<Timestamp>,
        map_filter_project: Option<&MfpPlan>,
        datum_vec: &mut DatumVec,
        row_builder: &mut Row,
        output: &mut OutputBuilderSession<
            '_,
            (mz_repr::Timestamp, Subtime),
            ConsolidatingContainerBuilder<
                Vec<(
                    Result<Row, DataflowError>,
                    (mz_repr::Timestamp, Subtime),
                    Diff,
                )>,
            >,
        >,
    ) -> bool
    where
        YFn: Fn(Instant, usize) -> bool,
    {
        let mut session = output.session_with_builder(&self.capability);
        let fetched_part = self.part.part_mut();
        let is_filter_pushdown_audit = fetched_part.is_filter_pushdown_audit();
        let mut row_buf = None;
        while let Some(((key, val), time, diff)) =
            fetched_part.next_with_storage(&mut row_buf, &mut None)
        {
            if until.less_equal(&time) {
                continue;
            }
            match (key, val) {
                (Ok(SourceData(Ok(row))), Ok(())) => {
                    if let Some(mfp) = map_filter_project {
                        // We originally accounted work as the number of outputs, to give downstream
                        // operators a chance to reduce down anything we've emitted. This mfp call
                        // might have a restrictive filter, which would have been counted as no
                        // work. However, in practice, we've seen decode_and_mfp be a source of
                        // interactivity loss during rehydration, so we now also count each mfp
                        // evaluation against our fuel.
                        *work += 1;
                        let arena = mz_repr::RowArena::new();
                        let mut datums_local = datum_vec.borrow_with(&row);
                        for result in mfp.evaluate(
                            &mut datums_local,
                            &arena,
                            time,
                            diff.into(),
                            |time| !until.less_equal(time),
                            row_builder,
                        ) {
                            // Earlier we decided this Part doesn't need to be fetched, but to
                            // audit our logic we fetched it anyway. If the MFP returned data, it
                            // means our earlier decision to not fetch this part was incorrect.
                            if let Some(stats) = &is_filter_pushdown_audit {
                                // NB: The tag added by this scope is used for alerting. The panic
                                // message may be changed arbitrarily, but the tag key and val must
                                // stay the same.
                                sentry::with_scope(
                                    |scope| {
                                        scope
                                            .set_tag("alert_id", "persist_pushdown_audit_violation")
                                    },
                                    || {
                                        error!(
                                            ?stats,
                                            name,
                                            ?mfp,
                                            ?result,
                                            "persist filter pushdown correctness violation!"
                                        );
                                        if self.panic_on_audit_failure {
                                            panic!(
                                                "persist filter pushdown correctness violation! {}",
                                                name
                                            );
                                        }
                                    },
                                );
                            }
                            match result {
                                Ok((row, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Ok(row), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                                Err((err, time, diff)) => {
                                    // Additional `until` filtering due to temporal filters.
                                    if !until.less_equal(&time) {
                                        let mut emit_time = *self.capability.time();
                                        emit_time.0 = time;
                                        session.give((Err(err), emit_time, diff));
                                        *work += 1;
                                    }
                                }
                            }
                        }
                        // At the moment, this is the only case where we can re-use the allocs for
                        // the `SourceData`/`Row` we decoded. This could be improved if this timely
                        // operator used a different container than `Vec<Row>`.
                        drop(datums_local);
                        row_buf.replace(SourceData(Ok(row)));
                    } else {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        // Clone row so we retain our row allocation.
                        session.give((Ok(row.clone()), emit_time, diff.into()));
                        row_buf.replace(SourceData(Ok(row)));
                        *work += 1;
                    }
                }
                (Ok(SourceData(Err(err))), Ok(())) => {
                    let mut emit_time = *self.capability.time();
                    emit_time.0 = time;
                    session.give((Err(err), emit_time, diff.into()));
                    *work += 1;
                }
                // TODO(petrosagg): error handling
                (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
                    panic!("decoding failed")
                }
            }
            if yield_fn(start_time, *work) {
                return false;
            }
        }
        true
    }
}

/// A trait representing a type that can be used in `backpressure`.
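///
/// A minimal sketch of an implementation, mirroring the `Part` helper used by
/// the tests at the bottom of this file:
///
/// ```ignore
/// #[derive(Clone, Debug)]
/// struct Part(usize);
///
/// impl Backpressureable for Part {
///     fn byte_size(&self) -> usize {
///         self.0
///     }
/// }
/// ```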
pub trait Backpressureable: Clone + 'static {
    /// Return the weight of the object, in bytes.
    fn byte_size(&self) -> usize;
}

impl<T: Clone + 'static> Backpressureable for (usize, ExchangeableBatchPart<T>) {
    fn byte_size(&self) -> usize {
        self.1.encoded_size_bytes()
    }
}

/// Flow control configuration.
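///
/// A construction sketch, following the usage in `persist_source` above (the
/// `progress_stream` binding and the byte limit here are illustrative):
///
/// ```ignore
/// let flow_control = FlowControl {
///     progress_stream,
///     max_inflight_bytes: 256 << 20,
///     summary: (Default::default(), Subtime::least_summary()),
///     metrics: None,
/// };
/// ```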
#[derive(Debug)]
pub struct FlowControl<G: Scope> {
    /// Stream providing in-flight frontier updates.
    ///
    /// As implied by its type, this stream never emits data, only progress updates.
    ///
    /// TODO: Replace `Infallible` with `!` once the latter is stabilized.
    pub progress_stream: Stream<G, Infallible>,
    /// Maximum number of in-flight bytes.
    pub max_inflight_bytes: usize,
    /// The minimum range of timestamps (be they granular or not) that must be emitted,
    /// ignoring `max_inflight_bytes` to ensure forward progress is made.
    pub summary: <G::Timestamp as TimelyTimestamp>::Summary,

    /// Optional metrics for the `backpressure` operator to keep up-to-date.
    pub metrics: Option<BackpressureMetrics>,
}

/// Apply flow control to the `data` input, based on the given `FlowControl`.
///
/// The `FlowControl` should have a `progress_stream` that is the pristine, unaltered
/// frontier of the downstream operator we want to backpressure from, a `max_inflight_bytes`,
/// and a `summary`. Note that the `data` input expects the second component of every tuple's
/// timestamp to be 0, and all data to be on the `chosen_worker` worker.
///
/// The `summary` represents the _minimum_ range of timestamps that needs to be emitted before
/// reasoning about `max_inflight_bytes`. In practice this means that we may overshoot
/// `max_inflight_bytes`.
///
/// The implementation of this operator is very subtle. Many inline comments have been added.
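///
/// A call-site sketch, following how `persist_source_core` above wires this
/// operator into the stream of part descriptions:
///
/// ```ignore
/// let (stream, token) = backpressure(
///     &mut scope,
///     &format!("backpressure({source_id})"),
///     &descs,
///     flow_control,
///     chosen_worker,
///     None, // no test probe
/// );
/// ```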
pub fn backpressure<T, G, O>(
    scope: &mut G,
    name: &str,
    data: &Stream<G, O>,
    flow_control: FlowControl<G>,
    chosen_worker: usize,
    // A probe used to inspect this operator during unit-testing
    probe: Option<UnboundedSender<(Antichain<(T, Subtime)>, usize, usize)>>,
) -> (Stream<G, O>, PressOnDropButton)
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder,
    G: Scope<Timestamp = (T, Subtime)>,
    O: Backpressureable + std::fmt::Debug,
{
    let worker_index = scope.index();

    let (flow_control_stream, flow_control_max_bytes, metrics) = (
        flow_control.progress_stream,
        flow_control.max_inflight_bytes,
        flow_control.metrics,
    );

    // Both the `flow_control` input and the data input are disconnected from the output. We manually
    // manage the output's frontier using a `CapabilitySet`. Note that we also adjust the
    // `flow_control` progress stream using the `summary` here, using a `feedback` operator in a
    // non-circular fashion.
    let (handle, summaried_flow) = scope.feedback(flow_control.summary.clone());
    flow_control_stream.connect_loop(handle);

    let mut builder = AsyncOperatorBuilder::new(
        format!("persist_source_backpressure({})", name),
        scope.clone(),
    );
    let (data_output, data_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut data_input = builder.new_disconnected_input(data, Pipeline);
    let mut flow_control_input = builder.new_disconnected_input(&summaried_flow, Pipeline);

    // Helper method used to synthesize current and next frontier for ordered times.
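    // A worked example of the counter semantics (values illustrative): with an
    // empty `frontier`, `time = (t, _)`, and `part_number = 3`, this yields the
    // part time `(t, Subtime(3))`, a frontier containing `(t, Subtime(3))`, and
    // a next frontier containing `(t, Subtime(4))`, leaving the counter at 4.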
    fn synthesize_frontiers<T: PartialOrder + Clone>(
        mut frontier: Antichain<(T, Subtime)>,
        mut time: (T, Subtime),
        part_number: &mut u64,
    ) -> (
        (T, Subtime),
        Antichain<(T, Subtime)>,
        Antichain<(T, Subtime)>,
    ) {
        let mut next_frontier = frontier.clone();
        time.1 = Subtime(*part_number);
        frontier.insert(time.clone());
        *part_number += 1;
        let mut next_time = time.clone();
        next_time.1 = Subtime(*part_number);
        next_frontier.insert(next_time);
        (time, frontier, next_frontier)
    }

    // _Refine_ the data stream by amending the second component of the timestamp with the part
    // number. This also ensures that we order the parts by time.
    let data_input = async_stream::stream!({
        let mut part_number = 0;
        let mut parts: Vec<((T, Subtime), O)> = Vec::new();
        loop {
            match data_input.next().await {
                None => {
                    let empty = Antichain::new();
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.drain(..) {
                        let (part_time, frontier, next_frontier) = synthesize_frontiers(
                            empty.clone(),
                            part_time.clone(),
                            &mut part_number,
                        );
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    break;
                }
                Some(Event::Data(time, data)) => {
                    for d in data {
                        parts.push((time.clone(), d));
                    }
                }
                Some(Event::Progress(prog)) => {
                    parts.sort_by_key(|val| val.0.clone());
                    for (part_time, d) in parts.extract_if(.., |p| !prog.less_equal(&p.0)) {
                        let (part_time, frontier, next_frontier) =
                            synthesize_frontiers(prog.clone(), part_time.clone(), &mut part_number);
                        yield Either::Right((part_time, d, frontier, next_frontier))
                    }
                    yield Either::Left(prog)
                }
            }
        }
    });
    let shutdown_button = builder.build(move |caps| async move {
        // The output capability.
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // The frontier of our output. This matches the `CapabilitySet` above.
        let mut output_frontier = Antichain::from_elem(TimelyTimestamp::minimum());
        // The frontier of the `flow_control` input.
        let mut flow_control_frontier = Antichain::from_elem(TimelyTimestamp::minimum());

        // Parts we have emitted, but have not yet retired (based on the `flow_control` edge).
        let mut inflight_parts = Vec::new();
        // Parts we have not yet emitted, but do participate in the `input_frontier`.
        let mut pending_parts = std::collections::VecDeque::new();

        // Only one worker is responsible for distributing parts
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }
        tokio::pin!(data_input);
        'emitting_parts: loop {
            // At the beginning of our main loop, we determine the total size of
            // inflight parts.
            let inflight_bytes: usize = inflight_parts.iter().map(|(_, size)| size).sum();

            // There are 2 main cases where we can continue to emit parts:
            // - The total emitted bytes is less than `flow_control_max_bytes`.
            // - The output frontier is not beyond the `flow_control_frontier`
            //
            // SUBTLE: in the latter case, we may arbitrarily go into the backpressure `else`
            // block, as we wait for progress tracking to keep the `flow_control` frontier
            // up-to-date. This is tested in unit-tests.
            if inflight_bytes < flow_control_max_bytes
                || !PartialOrder::less_equal(&flow_control_frontier, &output_frontier)
            {
                let (time, part, next_frontier) =
                    if let Some((time, part, next_frontier)) = pending_parts.pop_front() {
                        (time, part, next_frontier)
                    } else {
                        match data_input.next().await {
                            Some(Either::Right((time, part, frontier, next_frontier))) => {
                                // Downgrade the output frontier to this part's time. This is
                                // useful to "close" timestamps from previous parts, even if we
                                // don't yet emit this part. Note that this is safe because
                                // `data_input` ensures time-ordering.
                                output_frontier = frontier;
                                cap_set.downgrade(output_frontier.iter());

                                // If the most recent value's time is _beyond_ the
                                // `flow_control` frontier (which takes into account the `summary`), we
                                // have emitted an entire `summary` worth of data, and can store this
                                // value for later.
                                if inflight_bytes >= flow_control_max_bytes
                                    && !PartialOrder::less_than(
                                        &output_frontier,
                                        &flow_control_frontier,
                                    )
                                {
                                    pending_parts.push_back((time, part, next_frontier));
                                    continue 'emitting_parts;
                                }
                                (time, part, next_frontier)
                            }
                            Some(Either::Left(prog)) => {
                                output_frontier = prog;
                                cap_set.downgrade(output_frontier.iter());
                                continue 'emitting_parts;
                            }
                            None => {
                                if pending_parts.is_empty() {
                                    break 'emitting_parts;
                                } else {
                                    continue 'emitting_parts;
                                }
                            }
                        }
                    };

                let byte_size = part.byte_size();
                // Store the value with the _frontier_ the `flow_control_input` must reach
                // to retire it. Note that if this `results_in` is `None`, then we
                // are at `T::MAX`, and give up on flow_control entirely.
                //
                // SUBTLE: If we stop storing these parts, we will likely never check the
                // `flow_control_input` ever again. This won't pile up data as that input
                // only has frontier updates. There may be spurious activations from it though.
                //
                // Also note that we don't attempt to handle overflowing the `u64` part counter.
                if let Some(emission_ts) = flow_control.summary.results_in(&time) {
                    inflight_parts.push((emission_ts, byte_size));
                }

                // Emit the data at the given time, and update the frontier and capabilities
                // to just beyond the part.
                data_output.give(&cap_set.delayed(&time), part);

                if let Some(metrics) = &metrics {
                    metrics.emitted_bytes.inc_by(u64::cast_from(byte_size))
                }

                output_frontier = next_frontier;
                cap_set.downgrade(output_frontier.iter())
            } else {
                if let Some(metrics) = &metrics {
                    metrics
                        .last_backpressured_bytes
                        .set(u64::cast_from(inflight_bytes))
                }
                let parts_count = inflight_parts.len();
                // We've exhausted our budget, so listen for updates to the flow_control
                // input's frontier until we free up new budget. If we don't interact
                // with this side of the if statement, because the stream has no data, we
                // don't cause unbounded buffering in timely.
                let new_flow_control_frontier = match flow_control_input.next().await {
                    Some(Event::Progress(frontier)) => frontier,
                    Some(Event::Data(_, _)) => {
                        unreachable!("flow_control_input should not contain data")
                    }
                    None => Antichain::new(),
                };

                // Update the `flow_control_frontier` if it has advanced.
                flow_control_frontier.clone_from(&new_flow_control_frontier);

                // Retire parts that are processed downstream.
                let retired_parts = inflight_parts
                    .extract_if(.., |(ts, _size)| !flow_control_frontier.less_equal(ts));
                let (retired_size, retired_count): (usize, usize) = retired_parts
                    .fold((0, 0), |(accum_size, accum_count), (_ts, size)| {
                        (accum_size + size, accum_count + 1)
                    });
                trace!(
                    "returning {} parts with {} bytes, frontier: {:?}",
                    retired_count, retired_size, flow_control_frontier,
                );

                if let Some(metrics) = &metrics {
                    metrics.retired_bytes.inc_by(u64::cast_from(retired_size))
                }

                // Optionally emit some information for tests to examine.
                if let Some(probe) = probe.as_ref() {
                    let _ = probe.send((new_flow_control_frontier, parts_count, retired_count));
                }
            }
        }
    });
    (data_stream, shutdown_button.press_on_drop())
}

#[cfg(test)]
mod tests {
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::operators::{Enter, Probe};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::sync::oneshot;

    use super::*;

    #[mz_ore::test]
    fn test_backpressure_non_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(102)), (100, Part(1))],
            100,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we have emitted
                // the entire timestamp.
                AssertOutputFrontier((50, Subtime(2))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 2,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((100, Subtime(3))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(100)),
                (52, Part(1000)),
            ],
            50,
            (1, Subtime(0)),
            vec![
                // Assert we backpressure only after we emitted enough bytes
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(3),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Assert we make later progress once processing
                // the parts.
                AssertOutputFrontier((52, Subtime(4))),
            ],
            true,
        );

        backpressure_runner(
            vec![
                (50, Part(98)),
                (50, Part(1)),
                (51, Part(10)),
                (52, Part(100)),
                // Additional parts at the same timestamp
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(10)),
                (52, Part(100)),
                // A later part with a later ts.
                (100, Part(100)),
            ],
            100,
            (1, Subtime(0)),
            vec![
                AssertOutputFrontier((51, Subtime(3))),
                // Assert we backpressure after we have emitted enough bytes.
                // We assert twice here because we get updates as
                // `flow_control` progresses from `(0, 0)`->`(0, 1)`-> a real frontier.
                AssertBackpressured {
                    frontier: (1, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (51, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                ProcessXParts(1),
                // Our output frontier doesn't move, as the downstream frontier hasn't moved past
                // 50.
                AssertOutputFrontier((51, Subtime(3))),
                // After we process all of `50`, we can start emitting data at `52`, but only until
                // we exhaust our budget. We don't need to emit all of `52` because we have emitted
                // all of `51`.
                ProcessXParts(1),
                AssertOutputFrontier((52, Subtime(4))),
                AssertBackpressured {
                    frontier: (52, Subtime(0)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
                // After processing `50` and `51`, the minimum time is `52`, so we ensure that,
                // regardless of byte count, we emit the entire time (but do NOT emit the part at
                // time `100`).
                ProcessXParts(1),
                // Clear the previous `51` part, and start filling up `inflight_parts` with other
                // parts at `52`
                // This is an intermediate state.
                AssertBackpressured {
                    frontier: (53, Subtime(0)),
                    inflight_parts: 2,
                    retired_parts: 1,
                },
                // After we process all of `52`, we can continue to the next time.
                ProcessXParts(5),
                AssertBackpressured {
                    frontier: (101, Subtime(0)),
                    inflight_parts: 5,
                    retired_parts: 5,
                },
                AssertOutputFrontier((100, Subtime(9))),
            ],
            true,
        );
    }

    #[mz_ore::test]
    fn test_backpressure_granular() {
        use Step::*;
        backpressure_runner(
            vec![(50, Part(101)), (50, Part(101))],
            100,
            (0, Subtime(1)),
            vec![
                // Advance our frontier to outputting a single part.
                AssertOutputFrontier((50, Subtime(1))),
                // Receive backpressure updates until our frontier is up-to-date but
                // not beyond the parts (while considering the summary).
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 1,
                    retired_parts: 0,
                },
                // Process that part.
                ProcessXParts(1),
                // Assert that we clear the backpressure status
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 1,
                    retired_parts: 1,
                },
                // Ensure we make progress to the next part.
                AssertOutputFrontier((50, Subtime(2))),
            ],
            false,
        );

        backpressure_runner(
            vec![
                (50, Part(10)),
                (50, Part(10)),
                (51, Part(35)),
                (52, Part(100)),
            ],
            50,
            (0, Subtime(1)),
            vec![
                // we can emit 3 parts before we hit the backpressure limit
                AssertOutputFrontier((51, Subtime(3))),
                AssertBackpressured {
                    frontier: (0, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                AssertBackpressured {
                    frontier: (50, Subtime(1)),
                    inflight_parts: 3,
                    retired_parts: 0,
                },
                // Retire the single part.
                ProcessXParts(1),
                AssertBackpressured {
                    frontier: (50, Subtime(2)),
                    inflight_parts: 3,
                    retired_parts: 1,
                },
                // Ensure we make progress, and then
                // can retire the next 2 parts.
                AssertOutputFrontier((52, Subtime(4))),
                ProcessXParts(2),
                AssertBackpressured {
                    frontier: (52, Subtime(4)),
                    inflight_parts: 3,
                    retired_parts: 2,
                },
            ],
            false,
        );
    }

    type Time = (u64, Subtime);
    #[derive(Clone, Debug)]
    struct Part(usize);
    impl Backpressureable for Part {
        fn byte_size(&self) -> usize {
            self.0
        }
    }

    /// Actions taken by `backpressure_runner`.
    enum Step {
        /// Assert that the output frontier of the `backpressure` operator has AT LEAST made it
        /// this far. This is a single time because the timestamps in these tests are totally
        /// ordered, so the frontier contains at most one element.
        AssertOutputFrontier(Time),
1214        /// Assert that we have entered the backpressure flow in the `backpressure` operator. This
1215        /// allows us to assert what feedback frontier we got to, and how many inflight parts we
1216        /// retired.
1217        AssertBackpressured {
1218            frontier: Time,
1219            inflight_parts: usize,
1220            retired_parts: usize,
1221        },
1222        /// Process X parts in the downstream operator. This affects the feedback frontier.
1223        ProcessXParts(usize),
1224    }
1225
1226    /// A function that runs the `steps` to ensure that `backpressure` works as expected.
1227    fn backpressure_runner(
1228        // The input data to the `backpressure` operator
1229        input: Vec<(u64, Part)>,
1230        // The maximum inflight bytes the `backpressure` operator allows through.
1231        max_inflight_bytes: usize,
1232        // The feedback summary used by the `backpressure` operator.
1233        summary: Time,
1234        // List of steps to run through.
1235        steps: Vec<Step>,
1236        // Whether or not to consume records in the non-granular scope. This is useful when the
1237        // `summary` is something like `(1, 0)`.
1238        non_granular_consumer: bool,
1239    ) {
        timely::execute::execute_directly(move |worker| {
            let (backpressure_probe, consumer_tx, mut backpressure_status_rx, finalizer_tx, _token) =
                // Set up the top-level non-granular scope.
                worker.dataflow::<u64, _, _>(|scope| {
                    let (non_granular_feedback_handle, non_granular_feedback) =
                        if non_granular_consumer {
                            let (h, f) = scope.feedback(Default::default());
                            (Some(h), Some(f))
                        } else {
                            (None, None)
                        };
                    let (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        token,
                        backpressured,
                        finalizer_tx,
                    ) = scope.scoped::<(u64, Subtime), _, _>("hybrid", |scope| {
                        let (input, finalizer_tx) =
                            iterator_operator(scope.clone(), input.into_iter());

                        let (flow_control, granular_feedback_handle) = if non_granular_consumer {
                            (
                                FlowControl {
                                    progress_stream: non_granular_feedback.unwrap().enter(scope),
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                None,
                            )
                        } else {
                            let (granular_feedback_handle, granular_feedback) =
                                scope.feedback(Default::default());
                            (
                                FlowControl {
                                    progress_stream: granular_feedback,
                                    max_inflight_bytes,
                                    summary,
                                    metrics: None,
                                },
                                Some(granular_feedback_handle),
                            )
                        };

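                        // The `backpressure` operator reports each flow-control event
                        // on this channel as a (feedback frontier, inflight parts,
                        // retired parts) triple, which `AssertBackpressured` steps
                        // read below.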
                        let (backpressure_status_tx, backpressure_status_rx) = unbounded_channel();

                        let (backpressured, token) = backpressure(
                            scope,
                            "test",
                            &input,
                            flow_control,
                            0,
                            Some(backpressure_status_tx),
                        );

                        // If we want to granularly consume the output, we set up the consumer here.
                        let tx = if !non_granular_consumer {
                            Some(consumer_operator(
                                scope.clone(),
                                &backpressured,
                                granular_feedback_handle.unwrap(),
                            ))
                        } else {
                            None
                        };

                        (
                            backpressured.probe(),
                            tx,
                            backpressure_status_rx,
                            token,
                            backpressured.leave(),
                            finalizer_tx,
                        )
                    });

                    // If we want to non-granularly consume the output, we set up the consumer here.
                    let consumer_tx = if non_granular_consumer {
                        consumer_operator(
                            scope.clone(),
                            &backpressured,
                            non_granular_feedback_handle.unwrap(),
                        )
                    } else {
                        consumer_tx.unwrap()
                    };

                    (
                        backpressure_probe,
                        consumer_tx,
                        backpressure_status_rx,
                        finalizer_tx,
                        token,
                    )
                });

            use Step::*;
            for step in steps {
                match step {
                    AssertOutputFrontier(time) => {
                        eprintln!("checking advance to {time:?}");
                        backpressure_probe.with_frontier(|front| {
                            eprintln!("current backpressure output frontier: {front:?}");
                        });
                        while backpressure_probe.less_than(&time) {
                            worker.step();
                            backpressure_probe.with_frontier(|front| {
                                eprintln!("current backpressure output frontier: {front:?}");
                            });
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        }
                    }
                    ProcessXParts(parts) => {
                        eprintln!("processing {parts:?} parts");
                        for _ in 0..parts {
                            consumer_tx.send(()).unwrap();
                        }
                    }
                    AssertBackpressured {
                        frontier,
                        inflight_parts,
                        retired_parts,
                    } => {
                        let frontier = Antichain::from_elem(frontier);
                        eprintln!(
                            "asserting backpressured at {frontier:?}, with {inflight_parts:?} inflight parts \
                            and {retired_parts:?} retired"
                        );
                        let (new_frontier, new_count, new_retired_count) = loop {
                            if let Ok(val) = backpressure_status_rx.try_recv() {
                                break val;
                            }
                            worker.step();
                            std::thread::sleep(std::time::Duration::from_millis(25));
                        };
                        assert_eq!(
                            (frontier, inflight_parts, retired_parts),
                            (new_frontier, new_count, new_retired_count)
                        );
                    }
                }
            }
            // Signal the input operator to drop its capability, sending the input to the
            // empty frontier.
            let _ = finalizer_tx.send(());
        });
    }
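
    // A minimal sketch of an invocation (inputs, sizes, and expected counts are
    // illustrative, not drawn from a real test case):
    //
    //     backpressure_runner(
    //         vec![(50, Part(101)), (51, Part(102))],
    //         100,
    //         (0, Subtime(1)),
    //         vec![
    //             AssertOutputFrontier((50, Subtime(1))),
    //             AssertBackpressured {
    //                 frontier: (51, Subtime(0)),
    //                 inflight_parts: 1,
    //                 retired_parts: 0,
    //             },
    //             ProcessXParts(1),
    //         ],
    //         false,
    //     );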

    /// An operator that emits `Part`s at the specified timestamps. Does not
    /// drop its capability until it receives a signal via the `Sender` it returns.
    fn iterator_operator<
        G: Scope<Timestamp = (u64, Subtime)>,
        I: Iterator<Item = (u64, Part)> + 'static,
    >(
        scope: G,
        mut input: I,
    ) -> (Stream<G, Part>, oneshot::Sender<()>) {
        let (finalizer_tx, finalizer_rx) = oneshot::channel();
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope);
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<Part>>>();

        iterator.build(|mut caps| async move {
            let mut capability = Some(caps.pop().unwrap());
            let mut last = None;
            while let Some((time, part)) = input.next() {
                let time = (time, Subtime(0));
                last = Some(time);
                output_handle.give(&capability.as_ref().unwrap().delayed(&time), part);
            }
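            // Downgrade (rather than drop) the capability to one past the last
            // emitted time, so downstream frontiers can advance beyond all emitted
            // data while the input stays open.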
            if let Some(last) = last {
                capability
                    .as_mut()
                    .unwrap()
                    .downgrade(&(last.0 + 1, last.1));
            }

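            // Hold the frontier here until the test signals completion, then drop
            // the capability so the output advances to the empty frontier.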
            let _ = finalizer_rx.await;
            capability.take();
        });

        (output, finalizer_tx)
    }

    /// An operator that consumes its input ONLY when given a signal to do so
    /// via the `UnboundedSender` it returns. Each `send` corresponds to one `Data`
    /// event being processed. Also connects the `feedback` handle to its output.
    fn consumer_operator<G: Scope, O: Backpressureable + std::fmt::Debug>(
        scope: G,
        input: &Stream<G, O>,
        feedback: timely::dataflow::operators::feedback::Handle<G, Vec<std::convert::Infallible>>,
    ) -> UnboundedSender<()> {
        let (tx, mut rx) = unbounded_channel::<()>();
        let mut consumer = AsyncOperatorBuilder::new("consumer".to_string(), scope);
        let (output_handle, output) =
            consumer.new_output::<CapacityContainerBuilder<Vec<std::convert::Infallible>>>();
        let mut input = consumer.new_input_for(input, Pipeline, &output_handle);

        consumer.build(|_caps| async move {
            while let Some(()) = rx.recv().await {
                // Skip progress events and consume exactly one `Data` event
                // (unless the input is exhausted).
                while let Some(Event::Progress(_)) = input.next().await {}
            }
        });
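        // The output carries no data (its item type is `Infallible`): connecting it
        // to the feedback handle only propagates this operator's progress, which
        // `backpressure` observes as its flow-control frontier.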
        output.connect_loop(feedback);

        tx
    }
}