Skip to main content

mz_compute/
render.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders a plan into a timely/differential dataflow computation.
11//!
12//! ## Error handling
13//!
14//! Timely and differential have no idioms for computations that can error. The
15//! philosophy is, reasonably, to define the semantics of the computation such
16//! that errors are unnecessary: e.g., by using wrap-around semantics for
17//! integer overflow.
18//!
19//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
20//! in myriad cases. The classic example is a division by zero, but invalid
21//! input for casts, overflowing integer operations, and dozens of other
22//! functions need the ability to produce errors at runtime.
23//!
24//! At the moment, only *scalar* expression evaluation can fail, so only
25//! operators that evaluate scalar expressions can fail. At the time of writing,
26//! that includes map, filter, reduce, and join operators. Constants are a bit
27//! of a special case: they can be either a constant vector of rows *or* a
28//! constant, singular error.
29//!
30//! The approach taken is to build two parallel trees of computation: one for
31//! the rows that have been successfully evaluated (the "oks tree"), and one for
32//! the errors that have been generated (the "errs tree"). For example:
33//!
34//! ```text
35//!    oks1  errs1       oks2  errs2
36//!      |     |           |     |
37//!      |     |           |     |
38//!   project  |           |     |
39//!      |     |           |     |
40//!      |     |           |     |
41//!     map    |           |     |
42//!      |\    |           |     |
43//!      | \   |           |     |
44//!      |  \  |           |     |
45//!      |   \ |           |     |
46//!      |    \|           |     |
47//!   project  +           +     +
48//!      |     |          /     /
49//!      |     |         /     /
50//!    join ------------+     /
51//!      |     |             /
52//!      |     | +----------+
53//!      |     |/
54//!     oks   errs
55//! ```
56//!
57//! The project operation cannot fail, so errors from errs1 are propagated
58//! directly. Map operators are fallible and so can inject additional errors
59//! into the stream. Join operators combine the errors from each of their
60//! inputs.
61//!
62//! The semantics of the error stream are minimal. From the perspective of SQL,
63//! a dataflow is considered to be in an error state if there is at least one
64//! element in the final errs collection. The error value returned to the user
65//! is selected arbitrarily; SQL only makes provisions to return one error to
66//! the user at a time. There are plans to make the err collection accessible to
67//! end users, so they can see all errors at once.
68//!
69//! To make errors transient, simply ensure that the operator can retract any
70//! produced errors when corrected data arrives. To make errors permanent, write
71//! the operator such that it never retracts the errors it produced. Future work
72//! will likely want to introduce some kind of ordering for errors, so that
73//! permanent errors are returned to the user ahead of transient errors—probably
74//! by introducing a new error type a la:
75//!
76//! ```no_run
77//! # struct EvalError;
78//! # struct SourceError;
79//! enum DataflowError {
80//!     Transient(EvalError),
81//!     Permanent(SourceError),
82//! }
83//! ```
84//!
85//! If the error stream is empty, the oks stream must be correct. If the error
86//! stream is non-empty, then there are no semantics for the oks stream. This is
87//! sufficient to support SQL in its current form, but is likely to be
88//! unsatisfactory long term. We suspect that we can continue to imbue the oks
89//! stream with semantics if we are very careful in describing what data should
90//! and should not be produced upon encountering an error. Roughly speaking, the
91//! oks stream could represent the correct result of the computation where all
92//! rows that caused an error have been pruned from the stream. There are
93//! strange and confusing questions here around foreign keys, though: what if
94//! the optimizer proves that a particular key must exist in a collection, but
95//! the key gets pruned away because its row participated in a scalar expression
96//! evaluation that errored?
97//!
98//! In the meantime, it is probably wise for operators to keep the oks stream
99//! roughly "as correct as possible" even when errors are present in the errs
100//! stream. This reduces the amount of recomputation that must be performed
101//! if/when the errors are retracted.
102
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::arrange::ShutdownButton;
117use differential_dataflow::operators::iterate::Variable;
118use differential_dataflow::trace::{BatchReader, TraceReader};
119use differential_dataflow::{AsCollection, Data, VecCollection};
120use futures::FutureExt;
121use futures::channel::oneshot;
122use itertools::Itertools;
123use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
124use mz_compute_types::dyncfgs::{
125    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
126    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
127    ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
128};
129use mz_compute_types::plan::render_plan::{
130    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
131};
132use mz_compute_types::plan::{ArrangementStrategy, LirId};
133use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
134use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
135use mz_repr::explain::DummyHumanizer;
136use mz_repr::fixed_length::ExtendDatums;
137use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, RowArena, SharedRow};
138use mz_storage_operators::persist_source;
139use mz_storage_types::controller::CollectionMetadata;
140use mz_timely_util::columnation::ColumnationChunker;
141use mz_timely_util::operator::{CollectionExt, StreamExt};
142use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
143use mz_timely_util::scope_label::ScopeExt;
144use timely::PartialOrder;
145use timely::container::CapacityContainerBuilder;
146use timely::dataflow::channels::pact::Pipeline;
147use timely::dataflow::operators::vec::ToStream;
148use timely::dataflow::operators::vec::{BranchWhen, Filter};
149use timely::dataflow::operators::{Capability, Operator, Probe, probe};
150use timely::dataflow::{Scope, Stream, StreamVec};
151use timely::order::{Product, TotalOrder};
152use timely::progress::timestamp::Refines;
153use timely::progress::{Antichain, Timestamp};
154use timely::scheduling::ActivateOnDrop;
155use timely::worker::Worker as TimelyWorker;
156
157use crate::arrangement::manager::TraceBundle;
158use crate::compute_state::ComputeState;
159use crate::extensions::arrange::{KeyCollection, MzArrange};
160use crate::extensions::reduce::MzReduce;
161use crate::extensions::temporal_bucket::TemporalBucketing;
162use crate::logging::compute::{
163    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
164};
165use crate::render::context::{ArrangementFlavor, Context};
166use crate::render::errors::DataflowErrorSer;
167use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
168use mz_row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
169
170pub mod context;
171pub(crate) mod errors;
172mod flat_map;
173mod join;
174mod reduce;
175pub mod sinks;
176mod threshold;
177mod top_k;
178
179pub use context::CollectionBundle;
180pub use join::LinearJoinSpec;
181
182/// Guard that presses a differential [`ShutdownButton`] when dropped.
183///
184/// Dropping this guard releases the imported trace's capabilities.
185struct PressOnDrop<T>(ShutdownButton<T>);
186
187impl<T> Drop for PressOnDrop<T> {
188    fn drop(&mut self) {
189        self.0.press();
190    }
191}
192
/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, and producing
/// both arrangements and sinks.
///
/// Source imports are read through `persist_source` inside an input region. The declared
/// objects are then rendered either in an iterative `PointStamp<u64>` scope (when any
/// object's plan is recursive) or in a plain region. Finally, the declared index exports
/// and sink exports are wired up.
///
/// * `start_signal` is cloned into every persist source, index import, and sink export.
/// * `until` is passed to `persist_source` and to the rendering `Context`.
/// * `dataflow_expiration` is passed to the rendering `Context`.
pub fn build_compute_dataflow(
    timely_worker: &mut TimelyWorker,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Mutually recursive view definitions require special handling.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Determine indexes to export, and their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Determine sinks to export, and their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    // Read worker-level configuration once, up front, rather than inside the loops below.
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        //
        // `imported_sources` collects `(id, (oks, errs))` for every source import; they are
        // entered into the build scope further below.
        let mut imported_sources = Vec::new();
        // Tokens keyed by imported source/index id. They are handed to the index and sink
        // exports, which keep the tokens of their dependencies.
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        // Shared by the logical-backpressure operator on each source and by all exports.
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            // Import declared sources into the rendering context.
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    // Stays `None` unless column demands are applied below. In that case it
                    // holds the narrowed `RelationDesc` that is passed to `persist_source`.
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        // If enabled, we read from Persist with a `RelationDesc` that
                        // omits unneeded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            // Maps each demanded (old) column index to its position in the
                            // narrowed schema, so `ops` can be rewritten against it.
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // The snapshot is excluded only when the import does not ask for it and
                    // the optimization is enabled; each such use is counted in the metrics.
                    let snapshot_mode = if import.with_snapshot || !subscribe_snapshot_optimization
                    {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    let suppress_early_progress_as_of = dataflow.as_of.clone();

                    // Note: For correctness, we require that sources only emit times advanced by
                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
                    //
                    // `persist_source` receives `mfp.as_mut()`; the assertion below requires
                    // whatever remains of it afterwards to be the identity.
                    let (mut ok_stream, err_stream, token) =
                        persist_source::persist_source::<DataflowErrorSer>(
                            inner,
                            *source_id,
                            Arc::clone(&compute_state.persist_clients),
                            &compute_state.txns_ctx,
                            import.desc.storage_metadata.clone(),
                            read_schema,
                            dataflow.as_of.clone(),
                            snapshot_mode,
                            until.clone(),
                            mfp.as_mut(),
                            compute_state.dataflow_max_inflight_bytes(),
                            start_signal.clone().into_send_future(),
                            ErrorHandler::Halt("compute_import"),
                        );

                    // If `mfp` is non-identity, we need to apply what remains.
                    // For the moment, assert that it is either trivial or `None`.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // To avoid a memory spike during arrangement hydration (database-issues#6368), need to
                    // ensure that the first frontier we report into the dataflow is beyond the
                    // `as_of`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        // Apply logical backpressure to the source.
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Lift both streams out of the per-source and input regions into the
                    // dataflow's top-level scope.
                    let (oks, errs) = (
                        ok_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                        err_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        // If there exists a recursive expression, we'll need to use a non-region scope,
        // in order to support additional timestamp coordinates for iteration.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context. The traces are imported
                // into the outer `scope` and then entered into the iterative scope.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects, each in its own named region, and bind the result
                // to the object's global id.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    // recursive plans _must_ have bodies in a let
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;

                    // Record the first element of the bundle's scope address (the dataflow
                    // root id) against this object's global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        scope,
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        } else {
            // Non-recursive case: same steps as above, but inside a plain region whose
            // timestamp matches the outer scope's.
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        }
    });
}
554
555// This implementation block allows child timestamps to vary from parent timestamps,
556// but requires the parent timestamp to be `repr::Timestamp`.
557impl<'g, T> Context<'g, T>
558where
559    T: Refines<mz_repr::Timestamp> + RenderTimestamp,
560{
561    /// Import the collection from the arrangement, discarding batches from the snapshot.
562    /// (This does not guarantee that no records from the snapshot are included; the assumption is
563    /// that we'll filter those out later if necessary.)
564    fn import_filtered_index_collection<
565        'outer,
566        Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
567        V: Data,
568    >(
569        &self,
570        arranged: Arranged<'outer, Tr>,
571        start_signal: StartSignal,
572        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
573    ) -> VecCollection<'g, T, V, Tr::Diff>
574    where
575        // This is implied by the fact that the outer timestamp = mz_repr::Timestamp, but it's essential
576        // for our batch-level filtering to be safe, so we document it here regardless.
577        mz_repr::Timestamp: TotalOrder,
578    {
579        let oks = arranged.stream.with_start_signal(start_signal).filter({
580            let as_of = self.as_of_frontier.clone();
581            move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
582        });
583        Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
584    }
585
586    pub(crate) fn import_index<'outer>(
587        &mut self,
588        outer: Scope<'outer, mz_repr::Timestamp>,
589        compute_state: &mut ComputeState,
590        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
591        input_probe: probe::Handle<mz_repr::Timestamp>,
592        idx_id: GlobalId,
593        idx: &IndexDesc,
594        typ: &ReprRelationType,
595        snapshot_mode: SnapshotMode,
596        start_signal: StartSignal,
597    ) {
598        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
599            assert!(
600                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
601                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
602            );
603
604            let token = traces.to_drop().clone();
605
606            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
607                outer,
608                &format!("Index({}, {:?})", idx.on_id, idx.key),
609                self.as_of_frontier.clone(),
610                self.until.clone(),
611            );
612
613            oks.stream = oks.stream.probe_with(&input_probe);
614
615            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
616                outer,
617                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
618                self.as_of_frontier.clone(),
619                self.until.clone(),
620            );
621
622            let bundle = match snapshot_mode {
623                SnapshotMode::Include => {
624                    let ok_arranged = oks
625                        .enter(self.scope)
626                        .with_start_signal(start_signal.clone());
627                    let err_arranged = err_arranged
628                        .enter(self.scope)
629                        .with_start_signal(start_signal);
630                    CollectionBundle::from_expressions(
631                        idx.key.clone(),
632                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
633                    )
634                }
635                SnapshotMode::Exclude => {
636                    // When we import an index without a snapshot, we have two balancing considerations:
637                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
638                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
639                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
640                    //   (joins, reduces, etc.) also require the snapshot data.
641                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
642                    let oks = {
643                        let mut datums = DatumVec::new();
644                        let (permutation, _thinning) =
645                            permutation_for_arrangement(&idx.key, typ.arity());
646                        self.import_filtered_index_collection(
647                            oks,
648                            start_signal.clone(),
649                            move |k: DatumSeq, v: DatumSeq| {
650                                let temp_storage = RowArena::new();
651                                let mut datums_borrow = datums.borrow();
652                                k.extend_datums(&temp_storage, &mut datums_borrow, None);
653                                v.extend_datums(&temp_storage, &mut datums_borrow, None);
654                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
655                            },
656                        )
657                    };
658                    let errs = self.import_filtered_index_collection(
659                        err_arranged,
660                        start_signal,
661                        |e, _| e.clone(),
662                    );
663                    CollectionBundle::from_collections(oks, errs)
664                }
665            };
666            self.update_id(Id::Global(idx.on_id), bundle);
667            tokens.insert(
668                idx_id,
669                Rc::new((PressOnDrop(ok_button), PressOnDrop(err_button), token)),
670            );
671        } else {
672            panic!(
673                "import of index {} failed while building dataflow {}",
674                idx_id, self.dataflow_id
675            );
676        }
677    }
678}
679
680// This implementation block requires the scopes have the same timestamp as the trace manager.
681// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
682impl<'g> Context<'g, mz_repr::Timestamp> {
683    pub(crate) fn export_index(
684        &self,
685        compute_state: &mut ComputeState,
686        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
687        dependency_ids: BTreeSet<GlobalId>,
688        idx_id: GlobalId,
689        idx: &IndexDesc,
690        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
691    ) {
692        // put together tokens that belong to the export
693        let mut needed_tokens = Vec::new();
694        for dep_id in dependency_ids {
695            if let Some(token) = tokens.get(&dep_id) {
696                needed_tokens.push(Rc::clone(token));
697            }
698        }
699        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
700            panic!(
701                "Arrangement alarmingly absent! id: {:?}",
702                Id::Global(idx_id)
703            )
704        });
705
706        match bundle.arrangement(&idx.key) {
707            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
708                // Ensure that the frontier does not advance past the expiration time, if set.
709                // Otherwise, we might write down incorrect data.
710                if let Some(&expiration) = self.dataflow_expiration.as_option() {
711                    oks.stream = oks.stream.expire_stream_at(
712                        &format!("{}_export_index_oks", self.debug_name),
713                        expiration,
714                    );
715                    errs.stream = errs.stream.expire_stream_at(
716                        &format!("{}_export_index_errs", self.debug_name),
717                        expiration,
718                    );
719                }
720
721                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
722
723                // Attach logging of dataflow errors.
724                if let Some(logger) = compute_state.compute_logger.clone() {
725                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
726                }
727
728                compute_state.traces.set(
729                    idx_id,
730                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
731                );
732            }
733            Some(ArrangementFlavor::Trace(gid, _, _)) => {
734                // Duplicate of existing arrangement with id `gid`, so
735                // just create another handle to that arrangement.
736                let trace = compute_state.traces.get(&gid).unwrap().clone();
737                compute_state.traces.set(idx_id, trace);
738            }
739            None => {
740                println!("collection available: {:?}", bundle.collection.is_none());
741                println!(
742                    "keys available: {:?}",
743                    bundle.arranged.keys().collect::<Vec<_>>()
744                );
745                panic!(
746                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
747                    Id::Global(idx_id),
748                    &idx.key
749                );
750            }
751        };
752    }
753}
754
// This implementation block handles scopes whose timestamp `T` may differ from the trace
// manager's (e.g., iterative scopes): collections are left to an outer `mz_repr::Timestamp`
// scope and re-arranged there before being deposited in the trace manager.
757impl<'g, T> Context<'g, T>
758where
759    T: RenderTimestamp,
760{
761    pub(crate) fn export_index_iterative<'outer>(
762        &self,
763        outer: Scope<'outer, mz_repr::Timestamp>,
764        compute_state: &mut ComputeState,
765        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
766        dependency_ids: BTreeSet<GlobalId>,
767        idx_id: GlobalId,
768        idx: &IndexDesc,
769        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
770    ) {
771        // put together tokens that belong to the export
772        let mut needed_tokens = Vec::new();
773        for dep_id in dependency_ids {
774            if let Some(token) = tokens.get(&dep_id) {
775                needed_tokens.push(Rc::clone(token));
776            }
777        }
778        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
779            panic!(
780                "Arrangement alarmingly absent! id: {:?}",
781                Id::Global(idx_id)
782            )
783        });
784
785        match bundle.arrangement(&idx.key) {
786            Some(ArrangementFlavor::Local(oks, errs)) => {
787                // TODO: The following as_collection/leave/arrange sequence could be optimized.
788                //   * Combine as_collection and leave into a single function.
789                //   * Use columnar to extract columns from the batches to implement leave.
790                let mut oks = oks
791                    .as_collection(|k, v| (k.to_row(), v.to_row()))
792                    .leave(outer)
793                    .mz_arrange::<
794                        ColumnationChunker<_>,
795                        RowRowBatcher<_, _>,
796                        RowRowBuilder<_, _>,
797                        _,
798                    >(
799                        "Arrange export iterative",
800                    );
801
802                let mut errs = errs
803                    .as_collection(|k, v| (k.clone(), v.clone()))
804                    .leave(outer)
805                    .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
806                        "Arrange export iterative err",
807                    );
808
809                // Ensure that the frontier does not advance past the expiration time, if set.
810                // Otherwise, we might write down incorrect data.
811                if let Some(&expiration) = self.dataflow_expiration.as_option() {
812                    oks.stream = oks.stream.expire_stream_at(
813                        &format!("{}_export_index_iterative_oks", self.debug_name),
814                        expiration,
815                    );
816                    errs.stream = errs.stream.expire_stream_at(
817                        &format!("{}_export_index_iterative_err", self.debug_name),
818                        expiration,
819                    );
820                }
821
822                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
823
824                // Attach logging of dataflow errors.
825                if let Some(logger) = compute_state.compute_logger.clone() {
826                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
827                }
828
829                compute_state.traces.set(
830                    idx_id,
831                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
832                );
833            }
834            Some(ArrangementFlavor::Trace(gid, _, _)) => {
835                // Duplicate of existing arrangement with id `gid`, so
836                // just create another handle to that arrangement.
837                let trace = compute_state.traces.get(&gid).unwrap().clone();
838                compute_state.traces.set(idx_id, trace);
839            }
840            None => {
841                println!("collection available: {:?}", bundle.collection.is_none());
842                println!(
843                    "keys available: {:?}",
844                    bundle.arranged.keys().collect::<Vec<_>>()
845                );
846                panic!(
847                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
848                    Id::Global(idx_id),
849                    &idx.key
850                );
851            }
852        };
853    }
854}
855
856/// Information about bindings, tracked in `render_recursive_plan` and
857/// `render_plan`, to be passed to `render_letfree_plan`.
858///
859/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
860/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
861enum BindingInfo {
862    Body { in_let: bool },
863    Let { id: LocalId, last: bool },
864    LetRec { id: LocalId, last: bool },
865}
866
867impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
868    /// Renders a plan to a differential dataflow, producing the collection of results.
869    ///
870    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
871    /// in the context of `level` pre-existing iteration coordinates.
872    ///
873    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
874    /// and establishing the appropriate recursive dependencies among the bound variables.
875    /// Once all [`RecBind`]s have been rendered it calls in to `render_plan` which will error if
876    /// further [`RecBind`]s are found.
877    ///
878    /// The method requires that all variables conclude with a physical representation that
879    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
880    fn render_recursive_plan(
881        &mut self,
882        object_id: GlobalId,
883        level: usize,
884        plan: RenderPlan,
885        binding: BindingInfo,
886    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
887        for BindStage { lets, recs } in plan.binds {
888            // Render the let bindings in order.
889            let mut let_iter = lets.into_iter().peekable();
890            while let Some(LetBind { id, value }) = let_iter.next() {
891                let bundle =
892                    self.scope
893                        .clone()
894                        .region_named(&format!("Binding({:?})", id), |region| {
895                            let depends = value.depends();
896                            let last = let_iter.peek().is_none();
897                            let binding = BindingInfo::Let { id, last };
898                            self.enter_region(region, Some(&depends))
899                                .render_letfree_plan(object_id, value, binding)
900                                .leave_region(self.scope)
901                        });
902                self.insert_id(Id::Local(id), bundle);
903            }
904
905            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();
906
907            // Define variables for rec bindings.
908            // It is important that we only use the `Variable` until the object is bound.
909            // At that point, all subsequent uses should have access to the object itself.
910            let mut variables = BTreeMap::new();
911            for id in rec_ids.iter() {
912                use differential_dataflow::dynamic::feedback_summary;
913                let inner = feedback_summary::<u64>(level + 1, 1);
914                let (oks_v, oks_collection) =
915                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
916                let (err_v, err_collection) =
917                    Variable::new(self.scope, Product::new(Default::default(), inner));
918
919                self.insert_id(
920                    Id::Local(*id),
921                    CollectionBundle::from_collections(oks_collection, err_collection),
922                );
923                variables.insert(Id::Local(*id), (oks_v, err_v));
924            }
925            // Now render each of the rec bindings.
926            let mut rec_iter = recs.into_iter().peekable();
927            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
928                let last = rec_iter.peek().is_none();
929                let binding = BindingInfo::LetRec { id, last };
930                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
931                // We need to ensure that the raw collection exists, but do not have enough information
932                // here to cause that to happen.
933                let (oks, mut err) = bundle.collection.clone().unwrap();
934                self.insert_id(Id::Local(id), bundle);
935                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();
936
937                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
938                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
939                    oks,
940                    "LetRecConsolidation",
941                );
942
943                if let Some(limit) = limit {
944                    // We swallow the results of the `max_iter`th iteration, because
945                    // these results would go into the `max_iter + 1`th iteration.
946                    let (in_limit, over_limit) =
947                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
948                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
949                            let iteration_index = *ps.get(level).unwrap_or(&0);
950                            // The pointstamp starts counting from 0, so we need to add 1.
951                            iteration_index + 1 >= limit.max_iters.into()
952                        });
953                    oks = VecCollection::new(in_limit);
954                    if !limit.return_at_limit {
955                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
956                            DataflowErrorSer::from(EvalError::LetRecLimitExceeded(
957                                format!("{}", limit.max_iters.get()).into(),
958                            ))
959                        }));
960                    }
961                }
962
963                // Set err variable to the distinct elements of `err`.
964                // Distinctness is important, as we otherwise might add the same error each iteration,
965                // say if the limit of `oks` has an error. This would result in non-terminating rather
966                // than a clean report of the error. The trade-off is that we lose information about
967                // multiplicities of errors, but .. this seems to be the better call.
968                let err: KeyCollection<_, _, _> = err.into();
969                let errs = err
970                    .mz_arrange::<
971                        ColumnationChunker<_>,
972                        ErrBatcher<_, _>,
973                        ErrBuilder<_, _>,
974                        ErrSpine<_, _>,
975                    >(
976                        "Arrange recursive err",
977                    )
978                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
979                        "Distinct recursive err",
980                        move |_k, _s, t| t.push(((), Diff::ONE)),
981                    )
982                    .as_collection(|k, _| k.clone());
983
984                oks_v.set(oks);
985                err_v.set(errs);
986            }
987            // Now extract each of the rec bindings into the outer scope.
988            for id in rec_ids.into_iter() {
989                let bundle = self.remove_id(Id::Local(id)).unwrap();
990                let (oks, err) = bundle.collection.unwrap();
991                self.insert_id(
992                    Id::Local(id),
993                    CollectionBundle::from_collections(
994                        oks.leave_dynamic(level + 1),
995                        err.leave_dynamic(level + 1),
996                    ),
997                );
998            }
999        }
1000
1001        self.render_letfree_plan(object_id, plan.body, binding)
1002    }
1003}
1004
1005impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
1006    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
1007    /// results.
1008    ///
1009    /// The return type reflects the uncertainty about the data representation, perhaps
1010    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
1011    ///
1012    /// # Panics
1013    ///
1014    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
1015    /// `render_recursive_plan` instead.
1016    fn render_plan(
1017        &mut self,
1018        object_id: GlobalId,
1019        plan: RenderPlan,
1020    ) -> CollectionBundle<'scope, T> {
1021        let mut in_let = false;
1022        for BindStage { lets, recs } in plan.binds {
1023            assert!(recs.is_empty());
1024
1025            let mut let_iter = lets.into_iter().peekable();
1026            while let Some(LetBind { id, value }) = let_iter.next() {
1027                // if we encounter a single let, the body is in a let
1028                in_let = true;
1029                let bundle =
1030                    self.scope
1031                        .clone()
1032                        .region_named(&format!("Binding({:?})", id), |region| {
1033                            let depends = value.depends();
1034                            let last = let_iter.peek().is_none();
1035                            let binding = BindingInfo::Let { id, last };
1036                            self.enter_region(region, Some(&depends))
1037                                .render_letfree_plan(object_id, value, binding)
1038                                .leave_region(self.scope)
1039                        });
1040                self.insert_id(Id::Local(id), bundle);
1041            }
1042        }
1043
1044        self.scope.clone().region_named("Main Body", |region| {
1045            let depends = plan.body.depends();
1046            self.enter_region(region, Some(&depends))
1047                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1048                .leave_region(self.scope)
1049        })
1050    }
1051
1052    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
1053    fn render_letfree_plan(
1054        &self,
1055        object_id: GlobalId,
1056        plan: LetFreePlan,
1057        binding: BindingInfo,
1058    ) -> CollectionBundle<'scope, T> {
1059        let (mut nodes, root_id, topological_order) = plan.destruct();
1060
1061        // Rendered collections by their `LirId`.
1062        let mut collections = BTreeMap::new();
1063
1064        // Mappings to send along.
1065        // To save overhead, we'll only compute mappings when we need to,
1066        // which means things get gated behind options. Unfortunately, that means we
1067        // have several `Option<...>` types that are _all_ `Some` or `None` together,
1068        // but there's no convenient way to express the invariant.
1069        let should_compute_lir_metadata = self.compute_logger.is_some();
1070        let mut lir_mapping_metadata = if should_compute_lir_metadata {
1071            Some(Vec::with_capacity(nodes.len()))
1072        } else {
1073            None
1074        };
1075
1076        let mut topo_iter = topological_order.into_iter().peekable();
1077        while let Some(lir_id) = topo_iter.next() {
1078            let node = nodes.remove(&lir_id).unwrap();
1079
1080            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
1081            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
1082            // in some other structure and have that structure impl ExprHumanizer
1083            let metadata = if should_compute_lir_metadata {
1084                let operator = node.expr.humanize(&DummyHumanizer);
1085
1086                // mark the last operator in topo order with any binding decoration
1087                let operator = if topo_iter.peek().is_none() {
1088                    match &binding {
1089                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
1090                        BindingInfo::Body { in_let: false } => operator,
1091                        BindingInfo::Let { id, last: true } => {
1092                            format!("With {id} = {operator}")
1093                        }
1094                        BindingInfo::Let { id, last: false } => {
1095                            format!("{id} = {operator}")
1096                        }
1097                        BindingInfo::LetRec { id, last: true } => {
1098                            format!("With Recursive {id} = {operator}")
1099                        }
1100                        BindingInfo::LetRec { id, last: false } => {
1101                            format!("{id} = {operator}")
1102                        }
1103                    }
1104                } else {
1105                    operator
1106                };
1107
1108                let operator_id_start = self.scope.worker().peek_identifier();
1109                Some((operator, operator_id_start))
1110            } else {
1111                None
1112            };
1113
1114            let mut bundle = self.render_plan_expr(node.expr, &collections);
1115
1116            if let Some((operator, operator_id_start)) = metadata {
1117                let operator_id_end = self.scope.worker().peek_identifier();
1118                let operator_span = (operator_id_start, operator_id_end);
1119
1120                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
1121                    lir_mapping_metadata.push((
1122                        lir_id,
1123                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
1124                    ))
1125                }
1126            }
1127
1128            self.log_operator_hydration(&mut bundle, lir_id);
1129
1130            collections.insert(lir_id, bundle);
1131        }
1132
1133        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
1134            self.log_lir_mapping(object_id, lir_mapping_metadata);
1135        }
1136
1137        collections
1138            .remove(&root_id)
1139            .expect("LetFreePlan invariant (1)")
1140    }
1141
    /// Renders a single [`render_plan::Expr`], producing the collection of results.
    ///
    /// Inputs are referenced by `LirId` and looked up (and cloned) from `collections`, which
    /// holds the bundles of previously rendered nodes. Several variants delegate to a
    /// dedicated `render_*` method (`FlatMap`, `Join`, `Reduce`, `TopK`, `Threshold`); the
    /// others are built here from `CollectionBundle` helpers.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    /// Also panics if a `Get` refers to an id that is not bound in this context, or if a
    /// `Get` with `PassArrangements` requests arrangements (or a raw collection) that the
    /// bound bundle does not have.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        // Fetches (a clone of) the bundle of an already-rendered input node.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                // Rows whose advanced time is not before `until` are dropped.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // The constant error (if any) is placed at the minimum timestamp advanced to
                // `as_of`. Unlike rows, it is not filtered against `until`.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowErrorSer::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` are present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        // `<=` on `bool`s encodes implication: if a raw collection is
                        // requested (`keys.raw`), the bound bundle must have one.
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Evaluate `mfp` while reading through `(key, row)`; presumably the
                        // arrangement key and an optional key value to look up — confirm.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Evaluate `mfp` without naming an arrangement to read from.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Linear and delta join plans have separate renderers.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                // An identity `mfp_after` is passed as `None`.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(
                    input_key,
                    input,
                    key_val_plan,
                    plan,
                    mfp_option,
                    temporal_bucketing_strategy,
                )
            }
            TopK {
                input,
                top_k_plan,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan, temporal_bucketing_strategy)
            }
            Negate { input } => {
                let input = expect_input(input);
                // Only the oks are negated; errors pass through unchanged.
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
                temporal_bucketing_strategies,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                // `zip_eq` pairs each input with its bucketing strategy; the two lists are
                // expected to have the same length.
                for (input, strategy) in inputs.into_iter().zip_eq(temporal_bucketing_strategies) {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    // Apply per-input temporal bucketing. No-op for `Direct`.
                    // Only consolidating Unions carry non-`Direct` strategies;
                    // see the `Union` arm of `lower_mir_expr_stack_safe`.
                    let os = if matches!(strategy, ArrangementStrategy::TemporalBucketing)
                        && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
                    {
                        let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                            .get(&self.config_set)
                            .try_into()
                            .expect("must fit");
                        T::maybe_apply_temporal_bucketing(
                            os.inner,
                            self.as_of_frontier.clone(),
                            summary,
                        )
                    } else {
                        os
                    };
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                // Errors are concatenated but never consolidated here.
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
                strategy,
            } => {
                let input = expect_input(input);
                // Delegates to `ensure_collections` to provide the requested forms `keys`.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.as_of_frontier.clone(),
                    self.until.clone(),
                    &self.config_set,
                    strategy,
                )
            }
        }
    }
1387
1388    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1389        if let Some(logger) = &self.compute_logger {
1390            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1391                dataflow_index,
1392                global_id,
1393            }));
1394        }
1395    }
1396
1397    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1398        if let Some(logger) = &self.compute_logger {
1399            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1400        }
1401    }
1402
1403    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
1404        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
1405        // which we should attach the logging operator.
1406        //
1407        // We could attach to each collection and track the lower bound of output frontiers.
1408        // However, that would be of limited use because we expect all collections to hydrate at
1409        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1410        // frontier of the unarranged collection advance, it will perform all work necessary to
1411        // also advance its own frontier. We don't expect significant delays between frontier
1412        // advancements of the unarranged and arranged collections, so attaching the logging
1413        // operator to any one of them should produce accurate results.
1414        //
1415        // If the `CollectionBundle` contains both unarranged and arranged representations it is
1416        // beneficial to attach the logging operator to one of the arranged representation to avoid
1417        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1418        // if we attached the logging operator to it, we would introduce a fork in its output
1419        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1420        // that the output streams of the arrangements don't yet feed into anything else, so
1421        // attaching a (pass-through) logging operator does not introduce a fork.
1422
1423        match bundle.arranged.values_mut().next() {
1424            Some(arrangement) => {
1425                use ArrangementFlavor::*;
1426
1427                match arrangement {
1428                    Local(a, _) => {
1429                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1430                    }
1431                    Trace(_, a, _) => {
1432                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1433                    }
1434                }
1435            }
1436            None => {
1437                let (oks, _) = bundle
1438                    .collection
1439                    .as_mut()
1440                    .expect("CollectionBundle invariant");
1441                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1442                *oks = stream.as_collection();
1443            }
1444        }
1445    }
1446
1447    fn log_operator_hydration_inner<D>(
1448        &self,
1449        stream: Stream<'scope, T, D>,
1450        lir_id: LirId,
1451    ) -> Stream<'scope, T, D>
1452    where
1453        D: timely::Container + Clone + 'static,
1454    {
1455        let Some(logger) = self.compute_logger.clone() else {
1456            return stream.clone(); // hydration logging disabled
1457        };
1458
1459        let export_ids = self.export_ids.clone();
1460
1461        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
1462        //
1463        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
1464        // frontier advances to an outer time that's greater than the `as_of`. Comparing
1465        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
1466        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
1467        // instead.
1468        let mut hydration_frontier = Antichain::new();
1469        for time in self.as_of_frontier.iter() {
1470            if let Some(time) = time.try_step_forward() {
1471                hydration_frontier.insert(Refines::to_inner(time));
1472            }
1473        }
1474
1475        let name = format!("LogOperatorHydration ({lir_id})");
1476        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1477            let mut hydrated = false;
1478
1479            for &export_id in &export_ids {
1480                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1481                    export_id,
1482                    lir_id,
1483                    hydrated,
1484                }));
1485            }
1486
1487            move |(input, frontier), output| {
1488                // Pass through inputs.
1489                input.for_each(|cap, data| {
1490                    output.session(&cap).give_container(data);
1491                });
1492
1493                if hydrated {
1494                    return;
1495                }
1496
1497                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1498                    hydrated = true;
1499
1500                    for &export_id in &export_ids {
1501                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1502                            export_id,
1503                            lir_id,
1504                            hydrated,
1505                        }));
1506                    }
1507                }
1508            }
1509        })
1510    }
1511}
1512
1513#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
1514/// A timestamp type that can be used for operations within MZ's dataflow layer.
1515pub trait RenderTimestamp: MzTimestamp + Default + Refines<mz_repr::Timestamp> {
1516    /// The system timestamp component of the timestamp.
1517    ///
1518    /// This is useful for manipulating the system time, as when delaying
1519    /// updates for subsequent cancellation, as with monotonic reduction.
1520    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
1521    /// Effects a system delay in terms of the timestamp summary.
1522    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1523    /// The event timestamp component of the timestamp.
1524    fn event_time(&self) -> mz_repr::Timestamp;
1525    /// The event timestamp component of the timestamp, as a mutable reference.
1526    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
1527    /// Effects an event delay in terms of the timestamp summary.
1528    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1529    /// Steps the timestamp back so that logical compaction to the output will
1530    /// not conflate `self` with any historical times.
1531    fn step_back(&self) -> Self;
1532}
1533
1534/// Apply temporal bucketing to a stream when the timestamp type supports it.
1535///
1536/// Sibling to [`RenderTimestamp`]: bucketing is an arrangement-time concern, not a
1537/// general property of a render timestamp, so the dispatch lives in its own trait.
1538/// Total-ordered timestamps perform real bucketing; partially-ordered timestamps
1539/// (e.g. `Product<…>` in iterative scopes) implement this as a no-op.
1540pub trait MaybeBucketByTime: Timestamp {
1541    fn maybe_apply_temporal_bucketing<'scope, D>(
1542        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1543        as_of: Antichain<mz_repr::Timestamp>,
1544        summary: mz_repr::Timestamp,
1545    ) -> VecCollection<'scope, Self, D, Diff>
1546    where
1547        D: differential_dataflow::ExchangeData
1548            + crate::typedefs::MzData
1549            + differential_dataflow::Hashable;
1550}
1551
1552impl RenderTimestamp for mz_repr::Timestamp {
1553    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1554        self
1555    }
1556    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1557        delay
1558    }
1559    fn event_time(&self) -> mz_repr::Timestamp {
1560        *self
1561    }
1562    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1563        self
1564    }
1565    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1566        delay
1567    }
1568    fn step_back(&self) -> Self {
1569        self.saturating_sub(1)
1570    }
1571}
1572
1573impl MaybeBucketByTime for mz_repr::Timestamp {
1574    fn maybe_apply_temporal_bucketing<'scope, D>(
1575        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1576        as_of: Antichain<mz_repr::Timestamp>,
1577        summary: mz_repr::Timestamp,
1578    ) -> VecCollection<'scope, Self, D, Diff>
1579    where
1580        D: differential_dataflow::ExchangeData
1581            + crate::typedefs::MzData
1582            + differential_dataflow::Hashable,
1583    {
1584        stream
1585            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
1586            .as_collection()
1587    }
1588}
1589
1590impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1591    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1592        &mut self.outer
1593    }
1594    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1595        Product::new(delay, Default::default())
1596    }
1597    fn event_time(&self) -> mz_repr::Timestamp {
1598        self.outer
1599    }
1600    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1601        &mut self.outer
1602    }
1603    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1604        Product::new(delay, Default::default())
1605    }
1606    fn step_back(&self) -> Self {
1607        // It is necessary to step back both coordinates of a product,
1608        // and when one is a `PointStamp` that also means all coordinates
1609        // of the pointstamp.
1610        let inner = self.inner.clone();
1611        let mut vec = inner.into_inner();
1612        for item in vec.iter_mut() {
1613            *item = item.saturating_sub(1);
1614        }
1615        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1616    }
1617}
1618
1619impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
1620    fn maybe_apply_temporal_bucketing<'scope, D>(
1621        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1622        _as_of: Antichain<mz_repr::Timestamp>,
1623        _summary: mz_repr::Timestamp,
1624    ) -> VecCollection<'scope, Self, D, Diff>
1625    where
1626        D: differential_dataflow::ExchangeData
1627            + crate::typedefs::MzData
1628            + differential_dataflow::Hashable,
1629    {
1630        // TODO: Implement bucketing on outer timestamp for iterative scopes.
1631        stream.as_collection()
1632    }
1633}
1634
1635/// A signal that can be awaited by operators to suspend them prior to startup.
1636///
1637/// Creating a signal also yields a token, dropping of which causes the signal to fire.
1638///
1639/// `StartSignal` is designed to be usable by both async and sync Timely operators.
1640///
1641///  * Async operators can simply `await` it.
1642///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
1643///    and then check `StartSignal::has_fired()` on each activation.
1644#[derive(Clone)]
1645pub(crate) struct StartSignal {
1646    /// A future that completes when the signal fires.
1647    ///
1648    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
1649    /// signal is activated by dropping the corresponding `Sender`.
1650    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
1651    /// A weak reference to the token, to register drop-on-fire values.
1652    token_ref: Weak<RefCell<Box<dyn Any>>>,
1653}
1654
1655impl StartSignal {
1656    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1657    /// dropped.
1658    pub fn new() -> (Self, Rc<dyn Any>) {
1659        let (tx, rx) = oneshot::channel::<Infallible>();
1660        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1661        let signal = Self {
1662            fut: rx.shared(),
1663            token_ref: Rc::downgrade(&token),
1664        };
1665        (signal, token)
1666    }
1667
1668    pub fn has_fired(&self) -> bool {
1669        self.token_ref.strong_count() == 0
1670    }
1671
1672    /// Returns a Send-safe future that completes when the signal fires.
1673    ///
1674    /// Unlike `StartSignal` itself, the returned future does not retain a reference to the token,
1675    /// so it cannot be used for `drop_on_fire` or `has_fired` checks.
1676    pub fn into_send_future(self) -> impl Future<Output = ()> + Send {
1677        use futures::FutureExt;
1678        self.fut.map(|_| ())
1679    }
1680
1681    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1682        if let Some(token) = self.token_ref.upgrade() {
1683            let mut token = token.borrow_mut();
1684            let inner = std::mem::replace(&mut *token, Box::new(()));
1685            *token = Box::new((inner, to_drop));
1686        }
1687    }
1688}
1689
1690impl Future for StartSignal {
1691    type Output = ();
1692
1693    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1694        self.fut.poll_unpin(cx).map(|_| ())
1695    }
1696}
1697
1698/// Extension trait to attach a `StartSignal` to operator outputs.
1699pub(crate) trait WithStartSignal {
1700    /// Delays data and progress updates until the start signal has fired.
1701    ///
1702    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
1703    /// depending on the amount and shape of its inputs.
1704    fn with_start_signal(self, signal: StartSignal) -> Self;
1705}
1706
1707impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
1708where
1709    Tr: TraceReader<Time: RenderTimestamp> + Clone,
1710{
1711    fn with_start_signal(self, signal: StartSignal) -> Self {
1712        Arranged {
1713            stream: self.stream.with_start_signal(signal),
1714            trace: self.trace,
1715        }
1716    }
1717}
1718
1719impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1720where
1721    D: timely::Container + Clone + 'static,
1722{
1723    fn with_start_signal(self, signal: StartSignal) -> Self {
1724        let activations = self.scope().activations();
1725        self.unary(Pipeline, "StartSignal", |_cap, info| {
1726            let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1727            signal.drop_on_fire(token);
1728
1729            let mut stash = Vec::new();
1730
1731            move |input, output| {
1732                // Stash incoming updates as long as the start signal has not fired.
1733                if !signal.has_fired() {
1734                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1735                    return;
1736                }
1737
1738                // Release any data we might still have stashed.
1739                for (cap, mut data) in std::mem::take(&mut stash) {
1740                    output.session(&cap).give_container(&mut data);
1741                }
1742
1743                // Pass through all remaining input data.
1744                input.for_each(|cap, data| {
1745                    output.session(&cap).give_container(data);
1746                });
1747            }
1748        })
1749    }
1750}
1751
1752/// Suppress progress messages for times before the given `as_of`.
1753///
1754/// This operator exists specifically to work around a memory spike we'd otherwise see when
1755/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
1756/// operator observes a frontier advancement without data it inserts an empty batch into the spine.
1757/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
1758/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
1759/// the snapshot batch.
1760///
1761/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
1762/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
1763/// beyond the `as_of`, so the snapshot data has already been collected.
1764///
1765/// To ensure this, this operator needs to take two measures:
1766///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
1767///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
1768///
1769/// The second measure requires elaboration: If we wouldn't reclock snapshot updates, they might
1770/// still be upstream of `arrange_core` operators when those get to know about us dropping the
1771/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
1772/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
1773fn suppress_early_progress<'scope, T: Timestamp, D>(
1774    stream: Stream<'scope, T, D>,
1775    as_of: Antichain<T>,
1776) -> Stream<'scope, T, D>
1777where
1778    D: Data + timely::Container,
1779{
1780    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1781        let mut early_cap = Some(default_cap);
1782
1783        move |(input, frontier), output| {
1784            input.for_each_time(|data_cap, data| {
1785                if as_of.less_than(data_cap.time()) {
1786                    let mut session = output.session(&data_cap);
1787                    for data in data {
1788                        session.give_container(data);
1789                    }
1790                } else {
1791                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1792                    let mut session = output.session(&cap);
1793                    for data in data {
1794                        session.give_container(data);
1795                    }
1796                }
1797            });
1798
1799            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1800                early_cap.take();
1801            }
1802        }
1803    })
1804}
1805
1806/// Extension trait for [`Stream`] to selectively limit progress.
1807trait LimitProgress<T: Timestamp> {
1808    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
1809    /// the implementation to observe times in data, and release capabilities based on the probe's
1810    /// frontier, after applying `slack` to round up timestamps.
1811    ///
1812    /// The implementation of this operator is subtle to avoid regressions in the rest of the
1813    /// system. Specifically joins hold back compaction on the other side of the join, so we need to
1814    /// make sure we release capabilities as soon as possible. This is why we only limit progress
1815    /// for times before the `upper`, which is the time until which the source can distinguish
1816    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
1817    /// our capability.
1818    ///
1819    /// This isn't perfect, and can result in regressions if on of the inputs lags behind. We could
1820    /// consider using the join of the uppers, i.e, use lower bound upper of all available inputs.
1821    ///
1822    /// Once the input frontier reaches `[]`, the implementation must release any capability to
1823    /// allow downstream operators to release resources.
1824    ///
1825    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
1826    /// avoid unbounded memory usage.
1827    ///
1828    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
1829    ///   any timestamp rounding happens (c.f., `REFRESH EVERY` materialized views).
1830    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
1831    /// * `name` is a human-readable name for the operator.
1832    /// * `limit` is the maximum number of pending times to keep around.
1833    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
1834    ///   retain a capability.
1835    fn limit_progress(
1836        self,
1837        handle: MzProbeHandle<T>,
1838        slack_ms: u64,
1839        limit: Option<usize>,
1840        upper: Antichain<T>,
1841        name: String,
1842    ) -> Self;
1843}
1844
1845// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
1846// number.
1847impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
1848    for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
1849where
1850    D: Clone + 'static,
1851    R: Clone + 'static,
1852{
1853    fn limit_progress(
1854        self,
1855        handle: MzProbeHandle<mz_repr::Timestamp>,
1856        slack_ms: u64,
1857        limit: Option<usize>,
1858        upper: Antichain<mz_repr::Timestamp>,
1859        name: String,
1860    ) -> Self {
1861        let scope = self.scope();
1862        let stream =
1863            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1864                // Times that we've observed on our input.
1865                let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
1866                // Capability for the lower bound of `pending_times`, if any.
1867                let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;
1868
1869                let activator = scope.activator_for(info.address);
1870                handle.activate(activator.clone());
1871
1872                move |(input, frontier), output| {
1873                    input.for_each(|cap, data| {
1874                        for time in data
1875                            .iter()
1876                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1877                        {
1878                            // `slack_ms == 0` means no rounding; otherwise round up to the next
1879                            // multiple of `slack_ms`. Avoids a divide-by-zero panic when the
1880                            // operator is configured without slack.
1881                            let rounded_time = if slack_ms == 0 {
1882                                time
1883                            } else {
1884                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms)
1885                            };
1886                            if !upper.less_than(&rounded_time.into()) {
1887                                pending_times.insert(rounded_time.into());
1888                            }
1889                        }
1890                        output.session(&cap).give_container(data);
1891                        if retained_cap.as_ref().is_none_or(|c| {
1892                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1893                        }) {
1894                            retained_cap = Some(cap.retain(0));
1895                        }
1896                    });
1897
1898                    handle.with_frontier(|f| {
1899                        while pending_times
1900                            .first()
1901                            .map_or(false, |retained_time| !f.less_than(&retained_time))
1902                        {
1903                            let _ = pending_times.pop_first();
1904                        }
1905                    });
1906
1907                    while limit.map_or(false, |limit| pending_times.len() > limit) {
1908                        let _ = pending_times.pop_first();
1909                    }
1910
1911                    match (retained_cap.as_mut(), pending_times.first()) {
1912                        (Some(cap), Some(first)) => cap.downgrade(first),
1913                        (_, None) => retained_cap = None,
1914                        _ => {}
1915                    }
1916
1917                    if frontier.is_empty() {
1918                        retained_cap = None;
1919                        pending_times.clear();
1920                    }
1921
1922                    if !pending_times.is_empty() {
1923                        tracing::debug!(
1924                            name,
1925                            info.global_id,
1926                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1927                            frontier = ?frontier.frontier().get(0),
1928                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1929                            ?upper,
1930                            "pending times",
1931                        );
1932                    }
1933                }
1934            });
1935        stream
1936    }
1937}
1938
1939/// A formatter for an iterator of timestamps that displays the first element, and subsequently
1940/// the difference between timestamps.
1941struct PendingTimesDisplay<T>(T);
1942
1943impl<T> std::fmt::Display for PendingTimesDisplay<T>
1944where
1945    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1946{
1947    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1948        let mut iter = self.0.clone().into_iter();
1949        write!(f, "[")?;
1950        if let Some(first) = iter.next() {
1951            write!(f, "{}", first)?;
1952            let mut last = u64::from(first);
1953            for time in iter {
1954                write!(f, ", +{}", u64::from(time) - last)?;
1955                last = u64::from(time);
1956            }
1957        }
1958        write!(f, "]")?;
1959        Ok(())
1960    }
1961}
1962
1963/// Helper to merge pairs of datum iterators into a row or split a datum iterator
1964/// into two rows, given the arity of the first component.
1965#[derive(Clone, Copy, Debug)]
1966struct Pairer {
1967    split_arity: usize,
1968}
1969
1970impl Pairer {
1971    /// Creates a pairer with knowledge of the arity of first component in the pair.
1972    fn new(split_arity: usize) -> Self {
1973        Self { split_arity }
1974    }
1975
1976    /// Merges a pair of datum iterators creating a `Row` instance.
1977    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1978    where
1979        I1: IntoIterator<Item = Datum<'a>>,
1980        I2: IntoIterator<Item = Datum<'a>>,
1981    {
1982        SharedRow::pack(first.into_iter().chain(second))
1983    }
1984
1985    /// Splits a datum iterator into a pair of `Row` instances.
1986    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1987        let mut datum_iter = datum_iter.into_iter();
1988        let mut row_builder = SharedRow::get();
1989        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1990        let second = row_builder.pack_using(datum_iter);
1991        (first, second)
1992    }
1993}