Skip to main content

mz_compute/
render.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders a plan into a timely/differential dataflow computation.
11//!
12//! ## Error handling
13//!
14//! Timely and differential have no idioms for computations that can error. The
15//! philosophy is, reasonably, to define the semantics of the computation such
16//! that errors are unnecessary: e.g., by using wrap-around semantics for
17//! integer overflow.
18//!
19//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
20//! in myriad cases. The classic example is a division by zero, but invalid
21//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
23//!
24//! At the moment, only *scalar* expression evaluation can fail, so only
25//! operators that evaluate scalar expressions can fail. At the time of writing,
26//! that includes map, filter, reduce, and join operators. Constants are a bit
27//! of a special case: they can be either a constant vector of rows *or* a
28//! constant, singular error.
29//!
30//! The approach taken is to build two parallel trees of computation: one for
31//! the rows that have been successfully evaluated (the "oks tree"), and one for
32//! the errors that have been generated (the "errs tree"). For example:
33//!
34//! ```text
35//!    oks1  errs1       oks2  errs2
36//!      |     |           |     |
37//!      |     |           |     |
38//!   project  |           |     |
39//!      |     |           |     |
40//!      |     |           |     |
41//!     map    |           |     |
42//!      |\    |           |     |
43//!      | \   |           |     |
44//!      |  \  |           |     |
45//!      |   \ |           |     |
46//!      |    \|           |     |
47//!   project  +           +     +
48//!      |     |          /     /
49//!      |     |         /     /
50//!    join ------------+     /
51//!      |     |             /
52//!      |     | +----------+
53//!      |     |/
54//!     oks   errs
55//! ```
56//!
57//! The project operation cannot fail, so errors from errs1 are propagated
58//! directly. Map operators are fallible and so can inject additional errors
59//! into the stream. Join operators combine the errors from each of their
60//! inputs.
61//!
62//! The semantics of the error stream are minimal. From the perspective of SQL,
63//! a dataflow is considered to be in an error state if there is at least one
64//! element in the final errs collection. The error value returned to the user
65//! is selected arbitrarily; SQL only makes provisions to return one error to
66//! the user at a time. There are plans to make the err collection accessible to
67//! end users, so they can see all errors at once.
68//!
69//! To make errors transient, simply ensure that the operator can retract any
70//! produced errors when corrected data arrives. To make errors permanent, write
71//! the operator such that it never retracts the errors it produced. Future work
//! will likely want to introduce an ordering on errors, so that
73//! permanent errors are returned to the user ahead of transient errors—probably
74//! by introducing a new error type a la:
75//!
76//! ```no_run
77//! # struct EvalError;
78//! # struct SourceError;
79//! enum DataflowError {
80//!     Transient(EvalError),
81//!     Permanent(SourceError),
82//! }
83//! ```
84//!
85//! If the error stream is empty, the oks stream must be correct. If the error
86//! stream is non-empty, then there are no semantics for the oks stream. This is
87//! sufficient to support SQL in its current form, but is likely to be
88//! unsatisfactory long term. We suspect that we can continue to imbue the oks
89//! stream with semantics if we are very careful in describing what data should
90//! and should not be produced upon encountering an error. Roughly speaking, the
91//! oks stream could represent the correct result of the computation where all
92//! rows that caused an error have been pruned from the stream. There are
93//! strange and confusing questions here around foreign keys, though: what if
94//! the optimizer proves that a particular key must exist in a collection, but
95//! the key gets pruned away because its row participated in a scalar expression
96//! evaluation that errored?
97//!
98//! In the meantime, it is probably wise for operators to keep the oks stream
99//! roughly "as correct as possible" even when errors are present in the errs
100//! stream. This reduces the amount of recomputation that must be performed
101//! if/when the errors are retracted.
102
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::arrange::ShutdownButton;
117use differential_dataflow::operators::iterate::Variable;
118use differential_dataflow::trace::{BatchReader, TraceReader};
119use differential_dataflow::{AsCollection, Data, VecCollection};
120use futures::FutureExt;
121use futures::channel::oneshot;
122use itertools::Itertools;
123use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
124use mz_compute_types::dyncfgs::{
125    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
126    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
127    ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
128};
129use mz_compute_types::plan::render_plan::{
130    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
131};
132use mz_compute_types::plan::{ArrangementStrategy, LirId};
133use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
134use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
135use mz_repr::explain::DummyHumanizer;
136use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
137use mz_storage_operators::persist_source;
138use mz_storage_types::controller::CollectionMetadata;
139use mz_timely_util::columnation::ColumnationChunker;
140use mz_timely_util::operator::{CollectionExt, StreamExt};
141use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
142use mz_timely_util::scope_label::ScopeExt;
143use timely::PartialOrder;
144use timely::container::CapacityContainerBuilder;
145use timely::dataflow::channels::pact::Pipeline;
146use timely::dataflow::operators::vec::ToStream;
147use timely::dataflow::operators::vec::{BranchWhen, Filter};
148use timely::dataflow::operators::{Capability, Operator, Probe, probe};
149use timely::dataflow::{Scope, Stream, StreamVec};
150use timely::order::{Product, TotalOrder};
151use timely::progress::timestamp::Refines;
152use timely::progress::{Antichain, Timestamp};
153use timely::scheduling::ActivateOnDrop;
154use timely::worker::Worker as TimelyWorker;
155
156use crate::arrangement::manager::TraceBundle;
157use crate::compute_state::ComputeState;
158use crate::extensions::arrange::{KeyCollection, MzArrange};
159use crate::extensions::reduce::MzReduce;
160use crate::extensions::temporal_bucket::TemporalBucketing;
161use crate::logging::compute::{
162    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
163};
164use crate::render::context::{ArrangementFlavor, Context};
165use crate::render::errors::DataflowErrorSer;
166use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
167use mz_row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
168
169pub mod context;
170pub(crate) mod errors;
171mod flat_map;
172mod join;
173mod reduce;
174pub mod sinks;
175mod threshold;
176mod top_k;
177
178pub use context::CollectionBundle;
179pub use join::LinearJoinSpec;
180
181/// Guard that presses a differential [`ShutdownButton`] when dropped.
182///
183/// Dropping this guard releases the imported trace's capabilities.
184struct PressOnDrop<T>(ShutdownButton<T>);
185
186impl<T> Drop for PressOnDrop<T> {
187    fn drop(&mut self) {
188        self.0.press();
189    }
190}
191
/// Assemble the "compute" side of a dataflow, i.e. everything but the sources.
///
/// Everything is built inside a single timely dataflow named `"Dataflow: {debug_name}"`,
/// in two phases:
///
/// 1. An input region (`"InputRegion: {debug_name}"`) imports every declared source
///    through `persist_source`. Each import yields an `(oks, errs)` collection pair and
///    a token, which is recorded under the source's id.
/// 2. A build region renders the rest of the dataflow using "compute-local" assets such
///    as shared arrangements. It imports the declared indexes, builds every object in
///    `objects_to_build`, and then exports the declared indexes and sinks.
///
///    If any object's plan is recursive, this phase runs in an iterative scope with
///    `PointStamp<u64>` timestamps. Otherwise it runs in a plain region named
///    `"BuildRegion: {debug_name}"`.
///
/// `start_signal` is cloned into every source import, index import and sink export.
/// `until` is passed to every persist source and to the rendering context.
/// `dataflow_expiration` is passed to the rendering context only.
///
/// # Panics
///
/// Panics if a persist source leaves a non-identity MFP unapplied, or if a built
/// object's scope address has no root id. Callees (e.g. index import) may panic as well.
pub fn build_compute_dataflow(
    timely_worker: &mut TimelyWorker,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Mutually recursive view definitions require special handling: if any object is
    // recursive, the build phase below uses an iterative scope instead of a region.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Determine indexes to export, and their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Determine sinks to export, and their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    // Read these settings once. Every source and index import below then uses the same
    // values.
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        //
        // State shared between the two phases:
        // - `imported_sources` collects `(Id, (oks, errs))` for the build phase.
        // - `tokens` maps source and index ids to the tokens returned by their imports.
        // - `output_probe` feeds logical backpressure and is attached by the exports.
        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            // Import declared sources into the rendering context.
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    // Set to the narrowed relation description when column demands are
                    // applied. `None` means Persist is read with the full schema.
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        // If enabled, we read from Persist with a `RelationDesc` that
                        // omits unneeded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            // Map each demanded (old) column index to its position in the
                            // narrowed schema, then rewrite `ops` against that schema.
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // Skip the snapshot only when the import does not need it and the
                    // optimization is enabled. Each skipped snapshot is counted in metrics.
                    let snapshot_mode = if import.with_snapshot || !subscribe_snapshot_optimization
                    {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    let suppress_early_progress_as_of = dataflow.as_of.clone();

                    // Note: For correctness, we require that sources only emit times advanced by
                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
                    let (mut ok_stream, err_stream, token) =
                        persist_source::persist_source::<DataflowErrorSer>(
                            inner,
                            *source_id,
                            Arc::clone(&compute_state.persist_clients),
                            &compute_state.txns_ctx,
                            import.desc.storage_metadata.clone(),
                            read_schema,
                            dataflow.as_of.clone(),
                            snapshot_mode,
                            until.clone(),
                            mfp.as_mut(),
                            compute_state.dataflow_max_inflight_bytes(),
                            start_signal.clone().into_send_future(),
                            ErrorHandler::Halt("compute_import"),
                        );

                    // If `mfp` is non-identity, we need to apply what remains.
                    // For the moment, assert that it is either trivial or `None`.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // To avoid a memory spike during arrangement hydration (database-issues#6368), need to
                    // ensure that the first frontier we report into the dataflow is beyond the
                    // `as_of`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        // Apply logical backpressure to the source. Progress is limited
                        // relative to `output_probe`, i.e. the frontier of this dataflow's
                        // outputs.
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Lift both collections out of the per-source region and the input
                    // region, back to the dataflow's top-level scope.
                    let (oks, errs) = (
                        ok_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                        err_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        // If there exists a recursive expression, we'll need to use a non-region scope,
        // in order to support additional timestamp coordinates for iteration.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring each imported source into the iterative scope.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    // Same snapshot decision as for sources above.
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects, each in its own named region.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    // recursive plans _must_ have bodies in a let
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;

                    // Log the first element of the bundle's scope address alongside the
                    // object's global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes. The iterative variant is also handed the outer
                // `scope`.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        scope,
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring each imported source into the build region.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    // Same snapshot decision as for sources above.
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects, each in its own named region.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;
                    // Log the first element of the bundle's scope address alongside the
                    // object's global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        }
    });
}
553
554// This implementation block allows child timestamps to vary from parent timestamps,
555// but requires the parent timestamp to be `repr::Timestamp`.
556impl<'g, T> Context<'g, T>
557where
558    T: Refines<mz_repr::Timestamp> + RenderTimestamp,
559{
560    /// Import the collection from the arrangement, discarding batches from the snapshot.
561    /// (This does not guarantee that no records from the snapshot are included; the assumption is
562    /// that we'll filter those out later if necessary.)
563    fn import_filtered_index_collection<
564        'outer,
565        Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
566        V: Data,
567    >(
568        &self,
569        arranged: Arranged<'outer, Tr>,
570        start_signal: StartSignal,
571        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
572    ) -> VecCollection<'g, T, V, Tr::Diff>
573    where
574        // This is implied by the fact that the outer timestamp = mz_repr::Timestamp, but it's essential
575        // for our batch-level filtering to be safe, so we document it here regardless.
576        mz_repr::Timestamp: TotalOrder,
577    {
578        let oks = arranged.stream.with_start_signal(start_signal).filter({
579            let as_of = self.as_of_frontier.clone();
580            move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
581        });
582        Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
583    }
584
585    pub(crate) fn import_index<'outer>(
586        &mut self,
587        outer: Scope<'outer, mz_repr::Timestamp>,
588        compute_state: &mut ComputeState,
589        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
590        input_probe: probe::Handle<mz_repr::Timestamp>,
591        idx_id: GlobalId,
592        idx: &IndexDesc,
593        typ: &ReprRelationType,
594        snapshot_mode: SnapshotMode,
595        start_signal: StartSignal,
596    ) {
597        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
598            assert!(
599                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
600                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
601            );
602
603            let token = traces.to_drop().clone();
604
605            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
606                outer,
607                &format!("Index({}, {:?})", idx.on_id, idx.key),
608                self.as_of_frontier.clone(),
609                self.until.clone(),
610            );
611
612            oks.stream = oks.stream.probe_with(&input_probe);
613
614            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
615                outer,
616                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
617                self.as_of_frontier.clone(),
618                self.until.clone(),
619            );
620
621            let bundle = match snapshot_mode {
622                SnapshotMode::Include => {
623                    let ok_arranged = oks
624                        .enter(self.scope)
625                        .with_start_signal(start_signal.clone());
626                    let err_arranged = err_arranged
627                        .enter(self.scope)
628                        .with_start_signal(start_signal);
629                    CollectionBundle::from_expressions(
630                        idx.key.clone(),
631                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
632                    )
633                }
634                SnapshotMode::Exclude => {
635                    // When we import an index without a snapshot, we have two balancing considerations:
636                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
637                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
638                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
639                    //   (joins, reduces, etc.) also require the snapshot data.
640                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
641                    let oks = {
642                        let mut datums = DatumVec::new();
643                        let (permutation, _thinning) =
644                            permutation_for_arrangement(&idx.key, typ.arity());
645                        self.import_filtered_index_collection(
646                            oks,
647                            start_signal.clone(),
648                            move |k: DatumSeq, v: DatumSeq| {
649                                let mut datums_borrow = datums.borrow();
650                                datums_borrow.extend(k);
651                                datums_borrow.extend(v);
652                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
653                            },
654                        )
655                    };
656                    let errs = self.import_filtered_index_collection(
657                        err_arranged,
658                        start_signal,
659                        |e, _| e.clone(),
660                    );
661                    CollectionBundle::from_collections(oks, errs)
662                }
663            };
664            self.update_id(Id::Global(idx.on_id), bundle);
665            tokens.insert(
666                idx_id,
667                Rc::new((PressOnDrop(ok_button), PressOnDrop(err_button), token)),
668            );
669        } else {
670            panic!(
671                "import of index {} failed while building dataflow {}",
672                idx_id, self.dataflow_id
673            );
674        }
675    }
676}
677
678// This implementation block requires the scopes have the same timestamp as the trace manager.
679// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
680impl<'g> Context<'g, mz_repr::Timestamp> {
681    pub(crate) fn export_index(
682        &self,
683        compute_state: &mut ComputeState,
684        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
685        dependency_ids: BTreeSet<GlobalId>,
686        idx_id: GlobalId,
687        idx: &IndexDesc,
688        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
689    ) {
690        // put together tokens that belong to the export
691        let mut needed_tokens = Vec::new();
692        for dep_id in dependency_ids {
693            if let Some(token) = tokens.get(&dep_id) {
694                needed_tokens.push(Rc::clone(token));
695            }
696        }
697        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
698            panic!(
699                "Arrangement alarmingly absent! id: {:?}",
700                Id::Global(idx_id)
701            )
702        });
703
704        match bundle.arrangement(&idx.key) {
705            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
706                // Ensure that the frontier does not advance past the expiration time, if set.
707                // Otherwise, we might write down incorrect data.
708                if let Some(&expiration) = self.dataflow_expiration.as_option() {
709                    oks.stream = oks.stream.expire_stream_at(
710                        &format!("{}_export_index_oks", self.debug_name),
711                        expiration,
712                    );
713                    errs.stream = errs.stream.expire_stream_at(
714                        &format!("{}_export_index_errs", self.debug_name),
715                        expiration,
716                    );
717                }
718
719                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
720
721                // Attach logging of dataflow errors.
722                if let Some(logger) = compute_state.compute_logger.clone() {
723                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
724                }
725
726                compute_state.traces.set(
727                    idx_id,
728                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
729                );
730            }
731            Some(ArrangementFlavor::Trace(gid, _, _)) => {
732                // Duplicate of existing arrangement with id `gid`, so
733                // just create another handle to that arrangement.
734                let trace = compute_state.traces.get(&gid).unwrap().clone();
735                compute_state.traces.set(idx_id, trace);
736            }
737            None => {
738                println!("collection available: {:?}", bundle.collection.is_none());
739                println!(
740                    "keys available: {:?}",
741                    bundle.arranged.keys().collect::<Vec<_>>()
742                );
743                panic!(
744                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
745                    Id::Global(idx_id),
746                    &idx.key
747                );
748            }
749        };
750    }
751}
752
753// This implementation block requires the scopes have the same timestamp as the trace manager.
754// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
755impl<'g, T> Context<'g, T>
756where
757    T: RenderTimestamp,
758{
759    pub(crate) fn export_index_iterative<'outer>(
760        &self,
761        outer: Scope<'outer, mz_repr::Timestamp>,
762        compute_state: &mut ComputeState,
763        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
764        dependency_ids: BTreeSet<GlobalId>,
765        idx_id: GlobalId,
766        idx: &IndexDesc,
767        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
768    ) {
769        // put together tokens that belong to the export
770        let mut needed_tokens = Vec::new();
771        for dep_id in dependency_ids {
772            if let Some(token) = tokens.get(&dep_id) {
773                needed_tokens.push(Rc::clone(token));
774            }
775        }
776        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
777            panic!(
778                "Arrangement alarmingly absent! id: {:?}",
779                Id::Global(idx_id)
780            )
781        });
782
783        match bundle.arrangement(&idx.key) {
784            Some(ArrangementFlavor::Local(oks, errs)) => {
785                // TODO: The following as_collection/leave/arrange sequence could be optimized.
786                //   * Combine as_collection and leave into a single function.
787                //   * Use columnar to extract columns from the batches to implement leave.
788                let mut oks = oks
789                    .as_collection(|k, v| (k.to_row(), v.to_row()))
790                    .leave(outer)
791                    .mz_arrange::<
792                        ColumnationChunker<_>,
793                        RowRowBatcher<_, _>,
794                        RowRowBuilder<_, _>,
795                        _,
796                    >(
797                        "Arrange export iterative",
798                    );
799
800                let mut errs = errs
801                    .as_collection(|k, v| (k.clone(), v.clone()))
802                    .leave(outer)
803                    .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
804                        "Arrange export iterative err",
805                    );
806
807                // Ensure that the frontier does not advance past the expiration time, if set.
808                // Otherwise, we might write down incorrect data.
809                if let Some(&expiration) = self.dataflow_expiration.as_option() {
810                    oks.stream = oks.stream.expire_stream_at(
811                        &format!("{}_export_index_iterative_oks", self.debug_name),
812                        expiration,
813                    );
814                    errs.stream = errs.stream.expire_stream_at(
815                        &format!("{}_export_index_iterative_err", self.debug_name),
816                        expiration,
817                    );
818                }
819
820                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
821
822                // Attach logging of dataflow errors.
823                if let Some(logger) = compute_state.compute_logger.clone() {
824                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
825                }
826
827                compute_state.traces.set(
828                    idx_id,
829                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
830                );
831            }
832            Some(ArrangementFlavor::Trace(gid, _, _)) => {
833                // Duplicate of existing arrangement with id `gid`, so
834                // just create another handle to that arrangement.
835                let trace = compute_state.traces.get(&gid).unwrap().clone();
836                compute_state.traces.set(idx_id, trace);
837            }
838            None => {
839                println!("collection available: {:?}", bundle.collection.is_none());
840                println!(
841                    "keys available: {:?}",
842                    bundle.arranged.keys().collect::<Vec<_>>()
843                );
844                panic!(
845                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
846                    Id::Global(idx_id),
847                    &idx.key
848                );
849            }
850        };
851    }
852}
853
/// Information about bindings, tracked in `render_recursive_plan` and
/// `render_plan`, to be passed to `render_letfree_plan`.
///
/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
/// Only the last operator (in topological order) of a let-free plan is decorated.
enum BindingInfo {
    /// The body of a plan. `render_plan` sets `in_let` when at least one `let` binding
    /// precedes the body; in that case the body's last operator is labeled `Returning ...`,
    /// otherwise it is left undecorated.
    Body { in_let: bool },
    /// A non-recursive binding of `id`. `last` is true for the final `let` of its bind stage;
    /// that binding is labeled `With {id} = ...`, the others `{id} = ...`.
    Let { id: LocalId, last: bool },
    /// A recursive binding of `id`. `last` is true for the final recursive binding of its
    /// bind stage; that binding is labeled `With Recursive {id} = ...`, the others `{id} = ...`.
    LetRec { id: LocalId, last: bool },
}
864
impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// Bind stages are rendered in order. Within a stage, the `let` bindings are rendered
    /// first, each in its own `Binding(..)` region. Then, for the stage's [`RecBind`]s:
    /// - a pair of `Variable`s (oks and errs) is created per binding, with summary
    ///   `feedback_summary::<u64>(level + 1, 1)`, and bound under the binding's id;
    /// - each binding's value is rendered by recursing at `level + 1`;
    /// - each variable is set to the (consolidated, optionally iteration-limited) result;
    /// - all rec bindings are extracted with `leave_dynamic(level + 1)`.
    ///
    /// Once all stages have been rendered, the let-free body is rendered with
    /// `render_letfree_plan`, decorated according to `binding`.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            // The last `let` of this stage gets the `With ...` decoration.
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            // Remember the ids so the bindings can be extracted from the iteration below.
            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `Variable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                // The last rec binding of this stage gets the `With Recursive ...` decoration.
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough information
                // here to cause that to happen.
                // (This `unwrap` panics if the rendered value has no raw collection.)
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    // Unless results at the limit are simply returned, every swallowed update
                    // becomes a `LetRecLimitExceeded` error.
                    if !limit.return_at_limit {
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowErrorSer::from(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            ))
                        }));
                    }
                }

                // Set err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error each iteration,
                // say if the limit of `oks` has an error. This would result in non-terminating rather
                // than a clean report of the error. The trade-off is that we lose information about
                // multiplicities of errors, but .. this seems to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<
                        ColumnationChunker<_>,
                        ErrBatcher<_, _>,
                        ErrBuilder<_, _>,
                        ErrSpine<_, _>,
                    >(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(oks);
                err_v.set(errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        // All bindings are in place; render the let-free body.
        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1002
1003impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
1004    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
1005    /// results.
1006    ///
1007    /// The return type reflects the uncertainty about the data representation, perhaps
1008    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
1009    ///
1010    /// # Panics
1011    ///
1012    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
1013    /// `render_recursive_plan` instead.
1014    fn render_plan(
1015        &mut self,
1016        object_id: GlobalId,
1017        plan: RenderPlan,
1018    ) -> CollectionBundle<'scope, T> {
1019        let mut in_let = false;
1020        for BindStage { lets, recs } in plan.binds {
1021            assert!(recs.is_empty());
1022
1023            let mut let_iter = lets.into_iter().peekable();
1024            while let Some(LetBind { id, value }) = let_iter.next() {
1025                // if we encounter a single let, the body is in a let
1026                in_let = true;
1027                let bundle =
1028                    self.scope
1029                        .clone()
1030                        .region_named(&format!("Binding({:?})", id), |region| {
1031                            let depends = value.depends();
1032                            let last = let_iter.peek().is_none();
1033                            let binding = BindingInfo::Let { id, last };
1034                            self.enter_region(region, Some(&depends))
1035                                .render_letfree_plan(object_id, value, binding)
1036                                .leave_region(self.scope)
1037                        });
1038                self.insert_id(Id::Local(id), bundle);
1039            }
1040        }
1041
1042        self.scope.clone().region_named("Main Body", |region| {
1043            let depends = plan.body.depends();
1044            self.enter_region(region, Some(&depends))
1045                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1046                .leave_region(self.scope)
1047        })
1048    }
1049
1050    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
1051    fn render_letfree_plan(
1052        &self,
1053        object_id: GlobalId,
1054        plan: LetFreePlan,
1055        binding: BindingInfo,
1056    ) -> CollectionBundle<'scope, T> {
1057        let (mut nodes, root_id, topological_order) = plan.destruct();
1058
1059        // Rendered collections by their `LirId`.
1060        let mut collections = BTreeMap::new();
1061
1062        // Mappings to send along.
1063        // To save overhead, we'll only compute mappings when we need to,
1064        // which means things get gated behind options. Unfortunately, that means we
1065        // have several `Option<...>` types that are _all_ `Some` or `None` together,
1066        // but there's no convenient way to express the invariant.
1067        let should_compute_lir_metadata = self.compute_logger.is_some();
1068        let mut lir_mapping_metadata = if should_compute_lir_metadata {
1069            Some(Vec::with_capacity(nodes.len()))
1070        } else {
1071            None
1072        };
1073
1074        let mut topo_iter = topological_order.into_iter().peekable();
1075        while let Some(lir_id) = topo_iter.next() {
1076            let node = nodes.remove(&lir_id).unwrap();
1077
1078            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
1079            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
1080            // in some other structure and have that structure impl ExprHumanizer
1081            let metadata = if should_compute_lir_metadata {
1082                let operator = node.expr.humanize(&DummyHumanizer);
1083
1084                // mark the last operator in topo order with any binding decoration
1085                let operator = if topo_iter.peek().is_none() {
1086                    match &binding {
1087                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
1088                        BindingInfo::Body { in_let: false } => operator,
1089                        BindingInfo::Let { id, last: true } => {
1090                            format!("With {id} = {operator}")
1091                        }
1092                        BindingInfo::Let { id, last: false } => {
1093                            format!("{id} = {operator}")
1094                        }
1095                        BindingInfo::LetRec { id, last: true } => {
1096                            format!("With Recursive {id} = {operator}")
1097                        }
1098                        BindingInfo::LetRec { id, last: false } => {
1099                            format!("{id} = {operator}")
1100                        }
1101                    }
1102                } else {
1103                    operator
1104                };
1105
1106                let operator_id_start = self.scope.worker().peek_identifier();
1107                Some((operator, operator_id_start))
1108            } else {
1109                None
1110            };
1111
1112            let mut bundle = self.render_plan_expr(node.expr, &collections);
1113
1114            if let Some((operator, operator_id_start)) = metadata {
1115                let operator_id_end = self.scope.worker().peek_identifier();
1116                let operator_span = (operator_id_start, operator_id_end);
1117
1118                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
1119                    lir_mapping_metadata.push((
1120                        lir_id,
1121                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
1122                    ))
1123                }
1124            }
1125
1126            self.log_operator_hydration(&mut bundle, lir_id);
1127
1128            collections.insert(lir_id, bundle);
1129        }
1130
1131        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
1132            self.log_lir_mapping(object_id, lir_mapping_metadata);
1133        }
1134
1135        collections
1136            .remove(&root_id)
1137            .expect("LetFreePlan invariant (1)")
1138    }
1139
    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// The expr's inputs are looked up in `collections` by `LirId` (and cloned), and the
    /// operators for the expr are built in `self.scope`. `Constant`, `Get`, `Mfp`, `Negate` and
    /// `Union` are rendered inline. The other variants delegate to dedicated helpers
    /// (`render_flat_map`, `render_join`, `render_delta_join`, `render_reduce`, `render_topk`,
    /// `render_threshold`, `ensure_collections`).
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    /// Also panics if a `Get` refers to an id that is not bound in this context, or if a
    /// `PassArrangements` get asks for arrangements (or a raw collection) the bound bundle lacks.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        // Fetches (a clone of) an already-rendered input, panicking if it is missing.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                // Rows whose advanced time is not before `until` are dropped.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // Errors are placed at the minimum timestamp advanced to `as_of`; unlike rows,
                // they are not filtered against `until`.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowErrorSer::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` are present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        // On `bool`, `a <= b` means "a implies b": if the raw collection is
                        // requested, it must be present.
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read from the arrangement keyed by `key`, applying `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // No arrangement key is supplied; `mfp` is applied to the collection.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                // An identity `mfp_after` is passed as `None`.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(
                    input_key,
                    input,
                    key_val_plan,
                    plan,
                    mfp_option,
                    temporal_bucketing_strategy,
                )
            }
            TopK {
                input,
                top_k_plan,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan, temporal_bucketing_strategy)
            }
            Negate { input } => {
                let input = expect_input(input);
                // Flatten the input; oks are negated, errs pass through unchanged.
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
                temporal_bucketing_strategies,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                // `zip_eq` requires exactly one strategy per input (it panics on a length
                // mismatch).
                for (input, strategy) in inputs.into_iter().zip_eq(temporal_bucketing_strategies) {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    // Apply per-input temporal bucketing. No-op for `Direct`.
                    // Only consolidating Unions carry non-`Direct` strategies;
                    // see the `Union` arm of `lower_mir_expr_stack_safe`.
                    let os = if matches!(strategy, ArrangementStrategy::TemporalBucketing)
                        && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
                    {
                        let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                            .get(&self.config_set)
                            .try_into()
                            .expect("must fit");
                        T::maybe_apply_temporal_bucketing(
                            os.inner,
                            self.as_of_frontier.clone(),
                            summary,
                        )
                    } else {
                        os
                    };
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
                strategy,
            } => {
                let input = expect_input(input);
                // Delegate to the bundle to build the requested forms (`keys`) from `input`.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.as_of_frontier.clone(),
                    self.until.clone(),
                    &self.config_set,
                    strategy,
                )
            }
        }
    }
1385
1386    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1387        if let Some(logger) = &self.compute_logger {
1388            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1389                dataflow_index,
1390                global_id,
1391            }));
1392        }
1393    }
1394
1395    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1396        if let Some(logger) = &self.compute_logger {
1397            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1398        }
1399    }
1400
1401    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
1402        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
1403        // which we should attach the logging operator.
1404        //
1405        // We could attach to each collection and track the lower bound of output frontiers.
1406        // However, that would be of limited use because we expect all collections to hydrate at
1407        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1408        // frontier of the unarranged collection advance, it will perform all work necessary to
1409        // also advance its own frontier. We don't expect significant delays between frontier
1410        // advancements of the unarranged and arranged collections, so attaching the logging
1411        // operator to any one of them should produce accurate results.
1412        //
1413        // If the `CollectionBundle` contains both unarranged and arranged representations it is
1414        // beneficial to attach the logging operator to one of the arranged representation to avoid
1415        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1416        // if we attached the logging operator to it, we would introduce a fork in its output
1417        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1418        // that the output streams of the arrangements don't yet feed into anything else, so
1419        // attaching a (pass-through) logging operator does not introduce a fork.
1420
1421        match bundle.arranged.values_mut().next() {
1422            Some(arrangement) => {
1423                use ArrangementFlavor::*;
1424
1425                match arrangement {
1426                    Local(a, _) => {
1427                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1428                    }
1429                    Trace(_, a, _) => {
1430                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1431                    }
1432                }
1433            }
1434            None => {
1435                let (oks, _) = bundle
1436                    .collection
1437                    .as_mut()
1438                    .expect("CollectionBundle invariant");
1439                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1440                *oks = stream.as_collection();
1441            }
1442        }
1443    }
1444
1445    fn log_operator_hydration_inner<D>(
1446        &self,
1447        stream: Stream<'scope, T, D>,
1448        lir_id: LirId,
1449    ) -> Stream<'scope, T, D>
1450    where
1451        D: timely::Container + Clone + 'static,
1452    {
1453        let Some(logger) = self.compute_logger.clone() else {
1454            return stream.clone(); // hydration logging disabled
1455        };
1456
1457        let export_ids = self.export_ids.clone();
1458
1459        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
1460        //
1461        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
1462        // frontier advances to an outer time that's greater than the `as_of`. Comparing
1463        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
1464        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
1465        // instead.
1466        let mut hydration_frontier = Antichain::new();
1467        for time in self.as_of_frontier.iter() {
1468            if let Some(time) = time.try_step_forward() {
1469                hydration_frontier.insert(Refines::to_inner(time));
1470            }
1471        }
1472
1473        let name = format!("LogOperatorHydration ({lir_id})");
1474        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1475            let mut hydrated = false;
1476
1477            for &export_id in &export_ids {
1478                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1479                    export_id,
1480                    lir_id,
1481                    hydrated,
1482                }));
1483            }
1484
1485            move |(input, frontier), output| {
1486                // Pass through inputs.
1487                input.for_each(|cap, data| {
1488                    output.session(&cap).give_container(data);
1489                });
1490
1491                if hydrated {
1492                    return;
1493                }
1494
1495                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1496                    hydrated = true;
1497
1498                    for &export_id in &export_ids {
1499                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1500                            export_id,
1501                            lir_id,
1502                            hydrated,
1503                        }));
1504                    }
1505                }
1506            }
1507        })
1508    }
1509}
1510
1511#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
1512/// A timestamp type that can be used for operations within MZ's dataflow layer.
1513pub trait RenderTimestamp: MzTimestamp + Default + Refines<mz_repr::Timestamp> {
1514    /// The system timestamp component of the timestamp.
1515    ///
1516    /// This is useful for manipulating the system time, as when delaying
1517    /// updates for subsequent cancellation, as with monotonic reduction.
1518    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
1519    /// Effects a system delay in terms of the timestamp summary.
1520    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1521    /// The event timestamp component of the timestamp.
1522    fn event_time(&self) -> mz_repr::Timestamp;
1523    /// The event timestamp component of the timestamp, as a mutable reference.
1524    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
1525    /// Effects an event delay in terms of the timestamp summary.
1526    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1527    /// Steps the timestamp back so that logical compaction to the output will
1528    /// not conflate `self` with any historical times.
1529    fn step_back(&self) -> Self;
1530}
1531
1532/// Apply temporal bucketing to a stream when the timestamp type supports it.
1533///
1534/// Sibling to [`RenderTimestamp`]: bucketing is an arrangement-time concern, not a
1535/// general property of a render timestamp, so the dispatch lives in its own trait.
1536/// Total-ordered timestamps perform real bucketing; partially-ordered timestamps
1537/// (e.g. `Product<…>` in iterative scopes) implement this as a no-op.
1538pub trait MaybeBucketByTime: Timestamp {
1539    fn maybe_apply_temporal_bucketing<'scope, D>(
1540        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1541        as_of: Antichain<mz_repr::Timestamp>,
1542        summary: mz_repr::Timestamp,
1543    ) -> VecCollection<'scope, Self, D, Diff>
1544    where
1545        D: differential_dataflow::ExchangeData
1546            + crate::typedefs::MzData
1547            + differential_dataflow::Hashable;
1548}
1549
1550impl RenderTimestamp for mz_repr::Timestamp {
1551    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1552        self
1553    }
1554    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1555        delay
1556    }
1557    fn event_time(&self) -> mz_repr::Timestamp {
1558        *self
1559    }
1560    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1561        self
1562    }
1563    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1564        delay
1565    }
1566    fn step_back(&self) -> Self {
1567        self.saturating_sub(1)
1568    }
1569}
1570
1571impl MaybeBucketByTime for mz_repr::Timestamp {
1572    fn maybe_apply_temporal_bucketing<'scope, D>(
1573        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1574        as_of: Antichain<mz_repr::Timestamp>,
1575        summary: mz_repr::Timestamp,
1576    ) -> VecCollection<'scope, Self, D, Diff>
1577    where
1578        D: differential_dataflow::ExchangeData
1579            + crate::typedefs::MzData
1580            + differential_dataflow::Hashable,
1581    {
1582        stream
1583            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
1584            .as_collection()
1585    }
1586}
1587
1588impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1589    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1590        &mut self.outer
1591    }
1592    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1593        Product::new(delay, Default::default())
1594    }
1595    fn event_time(&self) -> mz_repr::Timestamp {
1596        self.outer
1597    }
1598    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1599        &mut self.outer
1600    }
1601    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1602        Product::new(delay, Default::default())
1603    }
1604    fn step_back(&self) -> Self {
1605        // It is necessary to step back both coordinates of a product,
1606        // and when one is a `PointStamp` that also means all coordinates
1607        // of the pointstamp.
1608        let inner = self.inner.clone();
1609        let mut vec = inner.into_inner();
1610        for item in vec.iter_mut() {
1611            *item = item.saturating_sub(1);
1612        }
1613        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1614    }
1615}
1616
1617impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
1618    fn maybe_apply_temporal_bucketing<'scope, D>(
1619        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1620        _as_of: Antichain<mz_repr::Timestamp>,
1621        _summary: mz_repr::Timestamp,
1622    ) -> VecCollection<'scope, Self, D, Diff>
1623    where
1624        D: differential_dataflow::ExchangeData
1625            + crate::typedefs::MzData
1626            + differential_dataflow::Hashable,
1627    {
1628        // TODO: Implement bucketing on outer timestamp for iterative scopes.
1629        stream.as_collection()
1630    }
1631}
1632
/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token; dropping that token causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
///
/// Clones share the same underlying future and point at the same token, so all clones observe
/// the signal firing together.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    ///
    /// Only a weak reference is held so that a signal never keeps its own token alive.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1652
1653impl StartSignal {
1654    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1655    /// dropped.
1656    pub fn new() -> (Self, Rc<dyn Any>) {
1657        let (tx, rx) = oneshot::channel::<Infallible>();
1658        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1659        let signal = Self {
1660            fut: rx.shared(),
1661            token_ref: Rc::downgrade(&token),
1662        };
1663        (signal, token)
1664    }
1665
1666    pub fn has_fired(&self) -> bool {
1667        self.token_ref.strong_count() == 0
1668    }
1669
1670    /// Returns a Send-safe future that completes when the signal fires.
1671    ///
1672    /// Unlike `StartSignal` itself, the returned future does not retain a reference to the token,
1673    /// so it cannot be used for `drop_on_fire` or `has_fired` checks.
1674    pub fn into_send_future(self) -> impl Future<Output = ()> + Send {
1675        use futures::FutureExt;
1676        self.fut.map(|_| ())
1677    }
1678
1679    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1680        if let Some(token) = self.token_ref.upgrade() {
1681            let mut token = token.borrow_mut();
1682            let inner = std::mem::replace(&mut *token, Box::new(()));
1683            *token = Box::new((inner, to_drop));
1684        }
1685    }
1686}
1687
1688impl Future for StartSignal {
1689    type Output = ();
1690
1691    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1692        self.fut.poll_unpin(cx).map(|_| ())
1693    }
1694}
1695
1696/// Extension trait to attach a `StartSignal` to operator outputs.
1697pub(crate) trait WithStartSignal {
1698    /// Delays data and progress updates until the start signal has fired.
1699    ///
1700    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
1701    /// depending on the amount and shape of its inputs.
1702    fn with_start_signal(self, signal: StartSignal) -> Self;
1703}
1704
1705impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
1706where
1707    Tr: TraceReader<Time: RenderTimestamp> + Clone,
1708{
1709    fn with_start_signal(self, signal: StartSignal) -> Self {
1710        Arranged {
1711            stream: self.stream.with_start_signal(signal),
1712            trace: self.trace,
1713        }
1714    }
1715}
1716
1717impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1718where
1719    D: timely::Container + Clone + 'static,
1720{
1721    fn with_start_signal(self, signal: StartSignal) -> Self {
1722        let activations = self.scope().activations();
1723        self.unary(Pipeline, "StartSignal", |_cap, info| {
1724            let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1725            signal.drop_on_fire(token);
1726
1727            let mut stash = Vec::new();
1728
1729            move |input, output| {
1730                // Stash incoming updates as long as the start signal has not fired.
1731                if !signal.has_fired() {
1732                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1733                    return;
1734                }
1735
1736                // Release any data we might still have stashed.
1737                for (cap, mut data) in std::mem::take(&mut stash) {
1738                    output.session(&cap).give_container(&mut data);
1739                }
1740
1741                // Pass through all remaining input data.
1742                input.for_each(|cap, data| {
1743                    output.session(&cap).give_container(data);
1744                });
1745            }
1746        })
1747    }
1748}
1749
1750/// Suppress progress messages for times before the given `as_of`.
1751///
1752/// This operator exists specifically to work around a memory spike we'd otherwise see when
1753/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
1754/// operator observes a frontier advancement without data it inserts an empty batch into the spine.
1755/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
1756/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
1757/// the snapshot batch.
1758///
1759/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
1760/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
1761/// beyond the `as_of`, so the snapshot data has already been collected.
1762///
1763/// To ensure this, this operator needs to take two measures:
1764///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
1765///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
1766///
1767/// The second measure requires elaboration: If we wouldn't reclock snapshot updates, they might
1768/// still be upstream of `arrange_core` operators when those get to know about us dropping the
1769/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
1770/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
1771fn suppress_early_progress<'scope, T: Timestamp, D>(
1772    stream: Stream<'scope, T, D>,
1773    as_of: Antichain<T>,
1774) -> Stream<'scope, T, D>
1775where
1776    D: Data + timely::Container,
1777{
1778    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1779        let mut early_cap = Some(default_cap);
1780
1781        move |(input, frontier), output| {
1782            input.for_each_time(|data_cap, data| {
1783                if as_of.less_than(data_cap.time()) {
1784                    let mut session = output.session(&data_cap);
1785                    for data in data {
1786                        session.give_container(data);
1787                    }
1788                } else {
1789                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1790                    let mut session = output.session(&cap);
1791                    for data in data {
1792                        session.give_container(data);
1793                    }
1794                }
1795            });
1796
1797            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1798                early_cap.take();
1799            }
1800        }
1801    })
1802}
1803
1804/// Extension trait for [`Stream`] to selectively limit progress.
1805trait LimitProgress<T: Timestamp> {
1806    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
1807    /// the implementation to observe times in data, and release capabilities based on the probe's
1808    /// frontier, after applying `slack` to round up timestamps.
1809    ///
1810    /// The implementation of this operator is subtle to avoid regressions in the rest of the
1811    /// system. Specifically joins hold back compaction on the other side of the join, so we need to
1812    /// make sure we release capabilities as soon as possible. This is why we only limit progress
1813    /// for times before the `upper`, which is the time until which the source can distinguish
1814    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
1815    /// our capability.
1816    ///
1817    /// This isn't perfect, and can result in regressions if on of the inputs lags behind. We could
1818    /// consider using the join of the uppers, i.e, use lower bound upper of all available inputs.
1819    ///
1820    /// Once the input frontier reaches `[]`, the implementation must release any capability to
1821    /// allow downstream operators to release resources.
1822    ///
1823    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
1824    /// avoid unbounded memory usage.
1825    ///
1826    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
1827    ///   any timestamp rounding happens (c.f., `REFRESH EVERY` materialized views).
1828    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
1829    /// * `name` is a human-readable name for the operator.
1830    /// * `limit` is the maximum number of pending times to keep around.
1831    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
1832    ///   retain a capability.
1833    fn limit_progress(
1834        self,
1835        handle: MzProbeHandle<T>,
1836        slack_ms: u64,
1837        limit: Option<usize>,
1838        upper: Antichain<T>,
1839        name: String,
1840    ) -> Self;
1841}
1842
1843// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
1844// number.
1845impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
1846    for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
1847where
1848    D: Clone + 'static,
1849    R: Clone + 'static,
1850{
1851    fn limit_progress(
1852        self,
1853        handle: MzProbeHandle<mz_repr::Timestamp>,
1854        slack_ms: u64,
1855        limit: Option<usize>,
1856        upper: Antichain<mz_repr::Timestamp>,
1857        name: String,
1858    ) -> Self {
1859        let scope = self.scope();
1860        let stream =
1861            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1862                // Times that we've observed on our input.
1863                let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
1864                // Capability for the lower bound of `pending_times`, if any.
1865                let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;
1866
1867                let activator = scope.activator_for(info.address);
1868                handle.activate(activator.clone());
1869
1870                move |(input, frontier), output| {
1871                    input.for_each(|cap, data| {
1872                        for time in data
1873                            .iter()
1874                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1875                        {
1876                            // `slack_ms == 0` means no rounding; otherwise round up to the next
1877                            // multiple of `slack_ms`. Avoids a divide-by-zero panic when the
1878                            // operator is configured without slack.
1879                            let rounded_time = if slack_ms == 0 {
1880                                time
1881                            } else {
1882                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms)
1883                            };
1884                            if !upper.less_than(&rounded_time.into()) {
1885                                pending_times.insert(rounded_time.into());
1886                            }
1887                        }
1888                        output.session(&cap).give_container(data);
1889                        if retained_cap.as_ref().is_none_or(|c| {
1890                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1891                        }) {
1892                            retained_cap = Some(cap.retain(0));
1893                        }
1894                    });
1895
1896                    handle.with_frontier(|f| {
1897                        while pending_times
1898                            .first()
1899                            .map_or(false, |retained_time| !f.less_than(&retained_time))
1900                        {
1901                            let _ = pending_times.pop_first();
1902                        }
1903                    });
1904
1905                    while limit.map_or(false, |limit| pending_times.len() > limit) {
1906                        let _ = pending_times.pop_first();
1907                    }
1908
1909                    match (retained_cap.as_mut(), pending_times.first()) {
1910                        (Some(cap), Some(first)) => cap.downgrade(first),
1911                        (_, None) => retained_cap = None,
1912                        _ => {}
1913                    }
1914
1915                    if frontier.is_empty() {
1916                        retained_cap = None;
1917                        pending_times.clear();
1918                    }
1919
1920                    if !pending_times.is_empty() {
1921                        tracing::debug!(
1922                            name,
1923                            info.global_id,
1924                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1925                            frontier = ?frontier.frontier().get(0),
1926                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1927                            ?upper,
1928                            "pending times",
1929                        );
1930                    }
1931                }
1932            });
1933        stream
1934    }
1935}
1936
1937/// A formatter for an iterator of timestamps that displays the first element, and subsequently
1938/// the difference between timestamps.
1939struct PendingTimesDisplay<T>(T);
1940
1941impl<T> std::fmt::Display for PendingTimesDisplay<T>
1942where
1943    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1944{
1945    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1946        let mut iter = self.0.clone().into_iter();
1947        write!(f, "[")?;
1948        if let Some(first) = iter.next() {
1949            write!(f, "{}", first)?;
1950            let mut last = u64::from(first);
1951            for time in iter {
1952                write!(f, ", +{}", u64::from(time) - last)?;
1953                last = u64::from(time);
1954            }
1955        }
1956        write!(f, "]")?;
1957        Ok(())
1958    }
1959}
1960
/// Merges a pair of datum iterators into a single row, or splits a datum iterator into two
/// rows, based on the arity of the first component.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    /// Number of datums that belong to the first component of a pair.
    split_arity: usize,
}
1967
1968impl Pairer {
1969    /// Creates a pairer with knowledge of the arity of first component in the pair.
1970    fn new(split_arity: usize) -> Self {
1971        Self { split_arity }
1972    }
1973
1974    /// Merges a pair of datum iterators creating a `Row` instance.
1975    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1976    where
1977        I1: IntoIterator<Item = Datum<'a>>,
1978        I2: IntoIterator<Item = Datum<'a>>,
1979    {
1980        SharedRow::pack(first.into_iter().chain(second))
1981    }
1982
1983    /// Splits a datum iterator into a pair of `Row` instances.
1984    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1985        let mut datum_iter = datum_iter.into_iter();
1986        let mut row_builder = SharedRow::get();
1987        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1988        let second = row_builder.pack_using(datum_iter);
1989        (first, second)
1990    }
1991}