Skip to main content

mz_compute/
render.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders a plan into a timely/differential dataflow computation.
11//!
12//! ## Error handling
13//!
14//! Timely and differential have no idioms for computations that can error. The
15//! philosophy is, reasonably, to define the semantics of the computation such
16//! that errors are unnecessary: e.g., by using wrap-around semantics for
17//! integer overflow.
18//!
19//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
20//! in myriad cases. The classic example is a division by zero, but invalid
21//! input for casts, overflowing integer operations, and dozens of other
22//! functions need the ability to produce errors at runtime.
23//!
24//! At the moment, only *scalar* expression evaluation can fail, so only
25//! operators that evaluate scalar expressions can fail. At the time of writing,
26//! that includes map, filter, reduce, and join operators. Constants are a bit
27//! of a special case: they can be either a constant vector of rows *or* a
28//! constant, singular error.
29//!
30//! The approach taken is to build two parallel trees of computation: one for
31//! the rows that have been successfully evaluated (the "oks tree"), and one for
32//! the errors that have been generated (the "errs tree"). For example:
33//!
34//! ```text
35//!    oks1  errs1       oks2  errs2
36//!      |     |           |     |
37//!      |     |           |     |
38//!   project  |           |     |
39//!      |     |           |     |
40//!      |     |           |     |
41//!     map    |           |     |
42//!      |\    |           |     |
43//!      | \   |           |     |
44//!      |  \  |           |     |
45//!      |   \ |           |     |
46//!      |    \|           |     |
47//!   project  +           +     +
48//!      |     |          /     /
49//!      |     |         /     /
50//!    join ------------+     /
51//!      |     |             /
52//!      |     | +----------+
53//!      |     |/
54//!     oks   errs
55//! ```
56//!
57//! The project operation cannot fail, so errors from errs1 are propagated
58//! directly. Map operators are fallible and so can inject additional errors
59//! into the stream. Join operators combine the errors from each of their
60//! inputs.
61//!
62//! The semantics of the error stream are minimal. From the perspective of SQL,
63//! a dataflow is considered to be in an error state if there is at least one
64//! element in the final errs collection. The error value returned to the user
65//! is selected arbitrarily; SQL only makes provisions to return one error to
66//! the user at a time. There are plans to make the err collection accessible to
67//! end users, so they can see all errors at once.
68//!
69//! To make errors transient, simply ensure that the operator can retract any
70//! produced errors when corrected data arrives. To make errors permanent, write
71//! the operator such that it never retracts the errors it produced. Future work
72//! will likely want to introduce some sort of sort order for errors, so that
73//! permanent errors are returned to the user ahead of transient errors—probably
74//! by introducing a new error type a la:
75//!
76//! ```no_run
77//! # struct EvalError;
78//! # struct SourceError;
79//! enum DataflowError {
80//!     Transient(EvalError),
81//!     Permanent(SourceError),
82//! }
83//! ```
84//!
85//! If the error stream is empty, the oks stream must be correct. If the error
86//! stream is non-empty, then there are no semantics for the oks stream. This is
87//! sufficient to support SQL in its current form, but is likely to be
88//! unsatisfactory long term. We suspect that we can continue to imbue the oks
89//! stream with semantics if we are very careful in describing what data should
90//! and should not be produced upon encountering an error. Roughly speaking, the
91//! oks stream could represent the correct result of the computation where all
92//! rows that caused an error have been pruned from the stream. There are
93//! strange and confusing questions here around foreign keys, though: what if
94//! the optimizer proves that a particular key must exist in a collection, but
95//! the key gets pruned away because its row participated in a scalar expression
96//! evaluation that errored?
97//!
98//! In the meantime, it is probably wise for operators to keep the oks stream
99//! roughly "as correct as possible" even when errors are present in the errs
100//! stream. This reduces the amount of recomputation that must be performed
101//! if/when the errors are retracted.
102
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::Variable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125    SUBSCRIBE_SNAPSHOT_OPTIMIZATION,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_timely_util::operator::{CollectionExt, StreamExt};
138use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
139use mz_timely_util::scope_label::ScopeExt;
140use timely::PartialOrder;
141use timely::container::CapacityContainerBuilder;
142use timely::dataflow::channels::pact::Pipeline;
143use timely::dataflow::operators::vec::ToStream;
144use timely::dataflow::operators::vec::{BranchWhen, Filter};
145use timely::dataflow::operators::{Capability, Operator, Probe, probe};
146use timely::dataflow::{Scope, Stream, StreamVec};
147use timely::order::{Product, TotalOrder};
148use timely::progress::timestamp::Refines;
149use timely::progress::{Antichain, Timestamp};
150use timely::scheduling::ActivateOnDrop;
151use timely::worker::Worker as TimelyWorker;
152
153use crate::arrangement::manager::TraceBundle;
154use crate::compute_state::ComputeState;
155use crate::extensions::arrange::{KeyCollection, MzArrange};
156use crate::extensions::reduce::MzReduce;
157use crate::extensions::temporal_bucket::TemporalBucketing;
158use crate::logging::compute::{
159    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
160};
161use crate::render::context::{ArrangementFlavor, Context};
162use crate::render::errors::DataflowErrorSer;
163use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
164use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
165
166pub mod context;
167pub(crate) mod errors;
168mod flat_map;
169mod join;
170mod reduce;
171pub mod sinks;
172mod threshold;
173mod top_k;
174
175pub use context::CollectionBundle;
176pub use join::LinearJoinSpec;
177
178/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
179///
180/// This method imports sources from provided assets, and then builds the remaining
181/// dataflow using "compute-local" assets like shared arrangements, and producing
182/// both arrangements and sinks.
///
/// Plans containing mutually recursive bindings are rendered inside an
/// iterative scope with `PointStamp<u64>` inner timestamps (needed for the
/// extra iteration coordinate); all other plans are rendered in a plain
/// region of the dataflow scope.
///
/// `until` bounds the times for which the dataflow must produce correct
/// output; `dataflow_expiration` is handed to the rendering `Context` (its
/// frontier-capping effect is applied where exports are created).
183pub fn build_compute_dataflow(
184    timely_worker: &mut TimelyWorker,
185    compute_state: &mut ComputeState,
186    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
187    start_signal: StartSignal,
188    until: Antichain<mz_repr::Timestamp>,
189    dataflow_expiration: Antichain<mz_repr::Timestamp>,
190) {
191    // Mutually recursive view definitions require special handling.
192    let recursive = dataflow
193        .objects_to_build
194        .iter()
195        .any(|object| object.plan.is_recursive());
196
197    // Determine indexes to export, and their dependencies.
198    let indexes = dataflow
199        .index_exports
200        .iter()
201        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
202        .collect::<Vec<_>>();
203
204    // Determine sinks to export, and their dependencies.
205    let sinks = dataflow
206        .sink_exports
207        .iter()
208        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
209        .collect::<Vec<_>>();
210
    // Read rendering-relevant tuning flags from the worker's dynamic configuration.
211    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
212    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
213    let subscribe_snapshot_optimization =
214        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);
215
216    let name = format!("Dataflow: {}", &dataflow.debug_name);
217    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
218    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);
219
220    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
221        let scope = scope.with_label();
222
223        // The scope.clone() occurs to allow import in the region.
224        // We build a region here to establish a pattern of a scope inside the dataflow,
225        // so that other similar uses (e.g. with iterative scopes) do not require weird
226        // alternate type signatures.
227        let mut imported_sources = Vec::new();
228        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
229        let output_probe = MzProbeHandle::default();
230
231        scope.clone().region_named(&input_name, |region| {
232            // Import declared sources into the rendering context.
233            for (source_id, import) in dataflow.source_imports.iter() {
234                region.region_named(&format!("Source({:?})", source_id), |inner| {
235                    let mut read_schema = None;
236                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
237                        // If enabled, we read from Persist with a `RelationDesc` that
238                        // omits unneeded columns.
239                        if apply_demands {
240                            let demands = ops.demand();
241                            let new_desc = import
242                                .desc
243                                .storage_metadata
244                                .relation_desc
245                                .apply_demand(&demands);
246                            let new_arity = demands.len();
                            // Column references in `ops` are in terms of the old
                            // (full) schema; remap them onto the demanded columns.
247                            let remap: BTreeMap<_, _> = demands
248                                .into_iter()
249                                .enumerate()
250                                .map(|(new, old)| (old, new))
251                                .collect();
252                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
253                            read_schema = Some(new_desc);
254                        }
255
256                        mz_expr::MfpPlan::create_from(ops)
257                            .expect("Linear operators should always be valid")
258                    });
259
                    // The snapshot can only be skipped if the export does not need it
                    // and the optimization is enabled; `Exclude` is counted in metrics.
260                    let snapshot_mode = if import.with_snapshot || !subscribe_snapshot_optimization
261                    {
262                        SnapshotMode::Include
263                    } else {
264                        compute_state.metrics.inc_subscribe_snapshot_optimization();
265                        SnapshotMode::Exclude
266                    };
267                    let suppress_early_progress_as_of = dataflow.as_of.clone();
268
269                    // Note: For correctness, we require that sources only emit times advanced by
270                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
271                    let (mut ok_stream, err_stream, token) =
272                        persist_source::persist_source::<DataflowErrorSer>(
273                            inner,
274                            *source_id,
275                            Arc::clone(&compute_state.persist_clients),
276                            &compute_state.txns_ctx,
277                            import.desc.storage_metadata.clone(),
278                            read_schema,
279                            dataflow.as_of.clone(),
280                            snapshot_mode,
281                            until.clone(),
282                            mfp.as_mut(),
283                            compute_state.dataflow_max_inflight_bytes(),
284                            start_signal.clone().into_send_future(),
285                            ErrorHandler::Halt("compute_import"),
286                        );
287
288                    // If `mfp` is non-identity, we need to apply what remains.
289                    // For the moment, assert that it is either trivial or `None`.
290                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));
291
292                    // To avoid a memory spike during arrangement hydration (database-issues#6368), need to
293                    // ensure that the first frontier we report into the dataflow is beyond the
294                    // `as_of`.
295                    if let Some(as_of) = suppress_early_progress_as_of {
296                        ok_stream = suppress_early_progress(ok_stream, as_of);
297                    }
298
299                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
300                        // Apply logical backpressure to the source.
301                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
302                            .get(&compute_state.worker_config);
                        // Slack is configured as a duration; convert to the
                        // millisecond timestamp units `limit_progress` expects.
303                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
304                            .get(&compute_state.worker_config)
305                            .as_millis()
306                            .try_into()
307                            .expect("must fit");
308
309                        let stream = ok_stream.limit_progress(
310                            output_probe.clone(),
311                            slack,
312                            limit,
313                            import.upper.clone(),
314                            name.clone(),
315                        );
316                        ok_stream = stream;
317                    }
318
319                    // Attach a probe reporting the input frontier.
320                    let input_probe =
321                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
322                    ok_stream = ok_stream.probe_with(&input_probe);
323
                    // Hoist both streams out of the per-source region and the input
                    // region, so they can be re-entered by whichever build scope
                    // (iterative or region) is used below.
324                    let (oks, errs) = (
325                        ok_stream
326                            .as_collection()
327                            .leave_region(region)
328                            .leave_region(scope),
329                        err_stream
330                            .as_collection()
331                            .leave_region(region)
332                            .leave_region(scope),
333                    );
334
335                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));
336
337                    // Associate returned tokens with the source identifier.
338                    tokens.insert(*source_id, Rc::new(token));
339                });
340            }
341        });
342
343        // If there exists a recursive expression, we'll need to use a non-region scope,
344        // in order to support additional timestamp coordinates for iteration.
345        if recursive {
346            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
347                let mut context = Context::for_dataflow_in(
348                    &dataflow,
349                    region.clone(),
350                    compute_state,
351                    until,
352                    dataflow_expiration,
353                );
354
355                for (id, (oks, errs)) in imported_sources.into_iter() {
356                    let bundle = crate::render::CollectionBundle::from_collections(
357                        oks.enter(region),
358                        errs.enter(region),
359                    );
360                    // Associate collection bundle with the source identifier.
361                    context.insert_id(id, bundle);
362                }
363
364                // Import declared indexes into the rendering context.
365                for (idx_id, idx) in &dataflow.index_imports {
366                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
367                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
368                        SnapshotMode::Include
369                    } else {
370                        compute_state.metrics.inc_subscribe_snapshot_optimization();
371                        SnapshotMode::Exclude
372                    };
373                    context.import_index(
374                        scope,
375                        compute_state,
376                        &mut tokens,
377                        input_probe,
378                        *idx_id,
379                        &idx.desc,
380                        &idx.typ,
381                        snapshot_mode,
382                        start_signal.clone(),
383                    );
384                }
385
386                // Build declared objects.
387                for object in dataflow.objects_to_build {
388                    let bundle = context.scope.clone().region_named(
389                        &format!("BuildingObject({:?})", object.id),
390                        |region| {
391                            let depends = object.plan.depends();
392                            let in_let = object.plan.is_recursive();
393                            context
394                                .enter_region(region, Some(&depends))
395                                .render_recursive_plan(
396                                    object.id,
397                                    0,
398                                    object.plan,
399                                    // recursive plans _must_ have bodies in a let
400                                    BindingInfo::Body { in_let },
401                                )
402                                .leave_region(context.scope)
403                        },
404                    );
405                    let global_id = object.id;
406
                    // Log the mapping from the timely dataflow's root address to
                    // this global id, for introspection.
407                    context.log_dataflow_global_id(
408                        *bundle
409                            .scope()
410                            .addr()
411                            .first()
412                            .expect("Dataflow root id must exist"),
413                        global_id,
414                    );
415                    context.insert_id(Id::Global(object.id), bundle);
416                }
417
418                // Export declared indexes.
419                for (idx_id, dependencies, idx) in indexes {
420                    context.export_index_iterative(
421                        scope,
422                        compute_state,
423                        &tokens,
424                        dependencies,
425                        idx_id,
426                        &idx,
427                        &output_probe,
428                    );
429                }
430
431                // Export declared sinks.
432                for (sink_id, dependencies, sink) in sinks {
433                    context.export_sink(
434                        compute_state,
435                        &tokens,
436                        dependencies,
437                        sink_id,
438                        &sink,
439                        start_signal.clone(),
440                        &output_probe,
441                        scope,
442                    );
443                }
444            });
445        } else {
            // Non-recursive case: same shape as above, but rendered in a plain
            // region (no iteration coordinate) and exported via `export_index`
            // rather than `export_index_iterative`.
446            scope.clone().region_named(&build_name, |region| {
447                let mut context = Context::for_dataflow_in(
448                    &dataflow,
449                    region.clone(),
450                    compute_state,
451                    until,
452                    dataflow_expiration,
453                );
454
455                for (id, (oks, errs)) in imported_sources.into_iter() {
456                    let bundle = crate::render::CollectionBundle::from_collections(
457                        oks.enter_region(region),
458                        errs.enter_region(region),
459                    );
460                    // Associate collection bundle with the source identifier.
461                    context.insert_id(id, bundle);
462                }
463
464                // Import declared indexes into the rendering context.
465                for (idx_id, idx) in &dataflow.index_imports {
466                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
467                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
468                        SnapshotMode::Include
469                    } else {
470                        compute_state.metrics.inc_subscribe_snapshot_optimization();
471                        SnapshotMode::Exclude
472                    };
473                    context.import_index(
474                        scope,
475                        compute_state,
476                        &mut tokens,
477                        input_probe,
478                        *idx_id,
479                        &idx.desc,
480                        &idx.typ,
481                        snapshot_mode,
482                        start_signal.clone(),
483                    );
484                }
485
486                // Build declared objects.
487                for object in dataflow.objects_to_build {
488                    let bundle = context.scope.clone().region_named(
489                        &format!("BuildingObject({:?})", object.id),
490                        |region| {
491                            let depends = object.plan.depends();
492                            context
493                                .enter_region(region, Some(&depends))
494                                .render_plan(object.id, object.plan)
495                                .leave_region(context.scope)
496                        },
497                    );
498                    let global_id = object.id;
                    // Log the mapping from the timely dataflow's root address to
                    // this global id, for introspection.
499                    context.log_dataflow_global_id(
500                        *bundle
501                            .scope()
502                            .addr()
503                            .first()
504                            .expect("Dataflow root id must exist"),
505                        global_id,
506                    );
507                    context.insert_id(Id::Global(object.id), bundle);
508                }
509
510                // Export declared indexes.
511                for (idx_id, dependencies, idx) in indexes {
512                    context.export_index(
513                        compute_state,
514                        &tokens,
515                        dependencies,
516                        idx_id,
517                        &idx,
518                        &output_probe,
519                    );
520                }
521
522                // Export declared sinks.
523                for (sink_id, dependencies, sink) in sinks {
524                    context.export_sink(
525                        compute_state,
526                        &tokens,
527                        dependencies,
528                        sink_id,
529                        &sink,
530                        start_signal.clone(),
531                        &output_probe,
532                        scope,
533                    );
534                }
535            });
536        }
537    });
538}
539
540// This implementation block allows child timestamps to vary from parent timestamps,
541// but requires the parent timestamp to be `repr::Timestamp`.
542impl<'g, T> Context<'g, T>
543where
544    T: Refines<mz_repr::Timestamp> + RenderTimestamp,
545{
546    /// Import the collection from the arrangement, discarding batches from the snapshot.
547    /// (This does not guarantee that no records from the snapshot are included; the assumption is
548    /// that we'll filter those out later if necessary.)
    ///
    /// `logic` maps each (key, value) pair of the surviving batches to an
    /// output value; the result is brought into this context's scope.
549    fn import_filtered_index_collection<
550        'outer,
551        Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
552        V: Data,
553    >(
554        &self,
555        arranged: Arranged<'outer, Tr>,
556        start_signal: StartSignal,
557        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
558    ) -> VecCollection<'g, T, V, Tr::Diff>
559    where
560        // This is implied by the fact that the outer timestamp = mz_repr::Timestamp, but it's essential
561        // for our batch-level filtering to be safe, so we document it here regardless.
562        mz_repr::Timestamp: TotalOrder,
563    {
        // Keep only batches whose upper is NOT <= as_of; batches entirely
        // at or before the as_of belong to the snapshot and are dropped.
564        let oks = arranged.stream.with_start_signal(start_signal).filter({
565            let as_of = self.as_of_frontier.clone();
566            move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
567        });
568        Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
569    }

    /// Import the index identified by `idx_id` from the compute state's trace
    /// manager into this rendering context, binding the result to `idx.on_id`.
    ///
    /// With `SnapshotMode::Include` the full ok/err arrangements are imported;
    /// with `SnapshotMode::Exclude` only the post-snapshot collections are
    /// imported (see the inline comments for the rationale).
    ///
    /// Deposits the trace handles' drop buttons and the trace's shutdown token
    /// into `tokens` under `idx_id`, so that dropping the dataflow releases the
    /// imported traces.
    ///
    /// # Panics
    ///
    /// Panics if the index is absent from the trace manager, or if its traces
    /// have been allowed to compact beyond this dataflow's `as_of`.
571    pub(crate) fn import_index<'outer>(
572        &mut self,
573        outer: Scope<'outer, mz_repr::Timestamp>,
574        compute_state: &mut ComputeState,
575        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
576        input_probe: probe::Handle<mz_repr::Timestamp>,
577        idx_id: GlobalId,
578        idx: &IndexDesc,
579        typ: &ReprRelationType,
580        snapshot_mode: SnapshotMode,
581        start_signal: StartSignal,
582    ) {
583        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            // If the trace compacted past our as_of we could read incorrect
            // (consolidated-away) history; fail loudly instead.
584            assert!(
585                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
586                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
587            );
588
            // Token kept alive for as long as this dataflow references the trace.
589            let token = traces.to_drop().clone();
590
591            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
592                outer,
593                &format!("Index({}, {:?})", idx.on_id, idx.key),
594                self.as_of_frontier.clone(),
595                self.until.clone(),
596            );
597
            // Report the index's frontier as an input frontier of this dataflow.
598            oks.stream = oks.stream.probe_with(&input_probe);
599
600            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
601                outer,
602                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
603                self.as_of_frontier.clone(),
604                self.until.clone(),
605            );
606
607            let bundle = match snapshot_mode {
608                SnapshotMode::Include => {
609                    let ok_arranged = oks
610                        .enter(self.scope)
611                        .with_start_signal(start_signal.clone());
612                    let err_arranged = err_arranged
613                        .enter(self.scope)
614                        .with_start_signal(start_signal);
615                    CollectionBundle::from_expressions(
616                        idx.key.clone(),
617                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
618                    )
619                }
620                SnapshotMode::Exclude => {
621                    // When we import an index without a snapshot, we have two balancing considerations:
622                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
623                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
624                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
625                    //   (joins, reduces, etc.) also require the snapshot data.
626                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
627                    let oks = {
628                        let mut datums = DatumVec::new();
                        // The arrangement stores rows split/permuted by the index
                        // key; undo that permutation to recover the original row.
629                        let (permutation, _thinning) =
630                            permutation_for_arrangement(&idx.key, typ.arity());
631                        self.import_filtered_index_collection(
632                            oks,
633                            start_signal.clone(),
634                            move |k: DatumSeq, v: DatumSeq| {
635                                let mut datums_borrow = datums.borrow();
636                                datums_borrow.extend(k);
637                                datums_borrow.extend(v);
638                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
639                            },
640                        )
641                    };
642                    let errs = self.import_filtered_index_collection(
643                        err_arranged,
644                        start_signal,
645                        |e, _| e.clone(),
646                    );
647                    CollectionBundle::from_collections(oks, errs)
648                }
649            };
650            self.update_id(Id::Global(idx.on_id), bundle);
651            tokens.insert(
652                idx_id,
653                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
654            );
655        } else {
656            panic!(
657                "import of index {} failed while building dataflow {}",
658                idx_id, self.dataflow_id
659            );
660        }
661    }
662}
663
664// This implementation block requires the scopes have the same timestamp as the trace manager.
665// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
666impl<'g> Context<'g, mz_repr::Timestamp> {
667    pub(crate) fn export_index(
668        &self,
669        compute_state: &mut ComputeState,
670        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
671        dependency_ids: BTreeSet<GlobalId>,
672        idx_id: GlobalId,
673        idx: &IndexDesc,
674        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
675    ) {
676        // put together tokens that belong to the export
677        let mut needed_tokens = Vec::new();
678        for dep_id in dependency_ids {
679            if let Some(token) = tokens.get(&dep_id) {
680                needed_tokens.push(Rc::clone(token));
681            }
682        }
683        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
684            panic!(
685                "Arrangement alarmingly absent! id: {:?}",
686                Id::Global(idx_id)
687            )
688        });
689
690        match bundle.arrangement(&idx.key) {
691            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
692                // Ensure that the frontier does not advance past the expiration time, if set.
693                // Otherwise, we might write down incorrect data.
694                if let Some(&expiration) = self.dataflow_expiration.as_option() {
695                    oks.stream = oks.stream.expire_stream_at(
696                        &format!("{}_export_index_oks", self.debug_name),
697                        expiration,
698                    );
699                    errs.stream = errs.stream.expire_stream_at(
700                        &format!("{}_export_index_errs", self.debug_name),
701                        expiration,
702                    );
703                }
704
705                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
706
707                // Attach logging of dataflow errors.
708                if let Some(logger) = compute_state.compute_logger.clone() {
709                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
710                }
711
712                compute_state.traces.set(
713                    idx_id,
714                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
715                );
716            }
717            Some(ArrangementFlavor::Trace(gid, _, _)) => {
718                // Duplicate of existing arrangement with id `gid`, so
719                // just create another handle to that arrangement.
720                let trace = compute_state.traces.get(&gid).unwrap().clone();
721                compute_state.traces.set(idx_id, trace);
722            }
723            None => {
724                println!("collection available: {:?}", bundle.collection.is_none());
725                println!(
726                    "keys available: {:?}",
727                    bundle.arranged.keys().collect::<Vec<_>>()
728                );
729                panic!(
730                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
731                    Id::Global(idx_id),
732                    &idx.key
733                );
734            }
735        };
736    }
737}
738
739// This implementation block requires the scopes have the same timestamp as the trace manager.
740// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
741impl<'g, T> Context<'g, T>
742where
743    T: RenderTimestamp,
744{
745    pub(crate) fn export_index_iterative<'outer>(
746        &self,
747        outer: Scope<'outer, mz_repr::Timestamp>,
748        compute_state: &mut ComputeState,
749        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
750        dependency_ids: BTreeSet<GlobalId>,
751        idx_id: GlobalId,
752        idx: &IndexDesc,
753        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
754    ) {
755        // put together tokens that belong to the export
756        let mut needed_tokens = Vec::new();
757        for dep_id in dependency_ids {
758            if let Some(token) = tokens.get(&dep_id) {
759                needed_tokens.push(Rc::clone(token));
760            }
761        }
762        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
763            panic!(
764                "Arrangement alarmingly absent! id: {:?}",
765                Id::Global(idx_id)
766            )
767        });
768
769        match bundle.arrangement(&idx.key) {
770            Some(ArrangementFlavor::Local(oks, errs)) => {
771                // TODO: The following as_collection/leave/arrange sequence could be optimized.
772                //   * Combine as_collection and leave into a single function.
773                //   * Use columnar to extract columns from the batches to implement leave.
774                let mut oks = oks
775                    .as_collection(|k, v| (k.to_row(), v.to_row()))
776                    .leave(outer)
777                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
778                        "Arrange export iterative",
779                    );
780
781                let mut errs = errs
782                    .as_collection(|k, v| (k.clone(), v.clone()))
783                    .leave(outer)
784                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
785                        "Arrange export iterative err",
786                    );
787
788                // Ensure that the frontier does not advance past the expiration time, if set.
789                // Otherwise, we might write down incorrect data.
790                if let Some(&expiration) = self.dataflow_expiration.as_option() {
791                    oks.stream = oks.stream.expire_stream_at(
792                        &format!("{}_export_index_iterative_oks", self.debug_name),
793                        expiration,
794                    );
795                    errs.stream = errs.stream.expire_stream_at(
796                        &format!("{}_export_index_iterative_err", self.debug_name),
797                        expiration,
798                    );
799                }
800
801                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
802
803                // Attach logging of dataflow errors.
804                if let Some(logger) = compute_state.compute_logger.clone() {
805                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
806                }
807
808                compute_state.traces.set(
809                    idx_id,
810                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
811                );
812            }
813            Some(ArrangementFlavor::Trace(gid, _, _)) => {
814                // Duplicate of existing arrangement with id `gid`, so
815                // just create another handle to that arrangement.
816                let trace = compute_state.traces.get(&gid).unwrap().clone();
817                compute_state.traces.set(idx_id, trace);
818            }
819            None => {
820                println!("collection available: {:?}", bundle.collection.is_none());
821                println!(
822                    "keys available: {:?}",
823                    bundle.arranged.keys().collect::<Vec<_>>()
824                );
825                panic!(
826                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
827                    Id::Global(idx_id),
828                    &idx.key
829                );
830            }
831        };
832    }
833}
834
/// Information about bindings, tracked in `render_recursive_plan` and
/// `render_plan`, to be passed to `render_letfree_plan`.
///
/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
enum BindingInfo {
    /// The body of a plan. `in_let` records whether any `Let` bindings
    /// preceded the body, in which case it is rendered as `Returning ...`.
    Body { in_let: bool },
    /// A non-recursive `Let` binding of `id`. `last` marks the final binding
    /// of its stage, which is rendered with a leading `With`.
    Let { id: LocalId, last: bool },
    /// A recursive `LetRec` binding of `id`. `last` marks the final binding
    /// of its stage, which is rendered with a leading `With Recursive`.
    LetRec { id: LocalId, last: bool },
}
845
impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it renders the remaining let-free body
    /// via `render_letfree_plan`.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            // Each binding gets its own named region so its operators are
            // grouped in the rendered dataflow graph.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            // `last` decorates the final binding of the stage
                            // in the `mz_lir_mapping` output.
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `Variable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Feedback summary for the pointstamp at nesting depth `level + 1`.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                // Make the (as yet unbound) variable visible to subsequent bindings.
                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                // Rec binding values may themselves contain `RecBind`s, so recurse
                // with one additional iteration coordinate.
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough information
                // here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        // Reaching the limit without `return_at_limit` reports an error.
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowErrorSer::from(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            ))
                        }));
                    }
                }

                // Set err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error each iteration,
                // say if the limit of `oks` has an error. This would result in non-terminating rather
                // than a clean report of the error. The trade-off is that we lose information about
                // multiplicities of errors, but .. this seems to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Close the feedback loops by binding the variables to their definitions.
                oks_v.set(oks);
                err_v.set(errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
978
979impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
980    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
981    /// results.
982    ///
983    /// The return type reflects the uncertainty about the data representation, perhaps
984    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
985    ///
986    /// # Panics
987    ///
988    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
989    /// `render_recursive_plan` instead.
990    fn render_plan(
991        &mut self,
992        object_id: GlobalId,
993        plan: RenderPlan,
994    ) -> CollectionBundle<'scope, T> {
995        let mut in_let = false;
996        for BindStage { lets, recs } in plan.binds {
997            assert!(recs.is_empty());
998
999            let mut let_iter = lets.into_iter().peekable();
1000            while let Some(LetBind { id, value }) = let_iter.next() {
1001                // if we encounter a single let, the body is in a let
1002                in_let = true;
1003                let bundle =
1004                    self.scope
1005                        .clone()
1006                        .region_named(&format!("Binding({:?})", id), |region| {
1007                            let depends = value.depends();
1008                            let last = let_iter.peek().is_none();
1009                            let binding = BindingInfo::Let { id, last };
1010                            self.enter_region(region, Some(&depends))
1011                                .render_letfree_plan(object_id, value, binding)
1012                                .leave_region(self.scope)
1013                        });
1014                self.insert_id(Id::Local(id), bundle);
1015            }
1016        }
1017
1018        self.scope.clone().region_named("Main Body", |region| {
1019            let depends = plan.body.depends();
1020            self.enter_region(region, Some(&depends))
1021                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1022                .leave_region(self.scope)
1023        })
1024    }
1025
    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    ///
    /// Nodes are rendered in topological order, so each node's inputs are
    /// already present in `collections` when the node itself is rendered.
    /// When a compute logger is present, this also records an LIR mapping
    /// associating each `LirId` with a human-readable operator description
    /// and the span of timely operator identifiers it was rendered into.
    fn render_letfree_plan(
        &self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, T> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
            // in some other structure and have that structure impl ExprHumanizer
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // mark the last operator in topo order with any binding decoration
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Snapshot the next operator identifier before rendering, so the
                // identifiers allocated by this node form a known span.
                let operator_id_start = self.scope.worker().peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                // Identifiers in [operator_id_start, operator_id_end) were
                // allocated while rendering this node.
                let operator_id_end = self.scope.worker().peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        // The root node's bundle is the result of the whole plan.
        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1115
    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        // Looks up a previously rendered input bundle; panics if missing.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        // Drop updates at or beyond `until`; they are never needed.
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // Errors are reported at the earliest time not before `as_of`.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowErrorSer::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` are present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        // If a raw collection is requested, one must be present.
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Apply `mfp` to the arrangement keyed by `key`, seeking `row`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Apply `mfp` to the raw collection.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only thread the post-reduce MFP through when it does something.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                // Negate the rows; errors pass through unnegated.
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate the ok and error streams of all inputs.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
                strategy,
            } => {
                let input = expect_input(input);
                // Ensure the requested arranged/raw forms exist on the bundle.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.as_of_frontier.clone(),
                    self.until.clone(),
                    &self.config_set,
                    strategy,
                )
            }
        }
    }
1330
1331    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1332        if let Some(logger) = &self.compute_logger {
1333            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1334                dataflow_index,
1335                global_id,
1336            }));
1337        }
1338    }
1339
1340    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1341        if let Some(logger) = &self.compute_logger {
1342            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1343        }
1344    }
1345
1346    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
1347        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
1348        // which we should attach the logging operator.
1349        //
1350        // We could attach to each collection and track the lower bound of output frontiers.
1351        // However, that would be of limited use because we expect all collections to hydrate at
1352        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1353        // frontier of the unarranged collection advance, it will perform all work necessary to
1354        // also advance its own frontier. We don't expect significant delays between frontier
1355        // advancements of the unarranged and arranged collections, so attaching the logging
1356        // operator to any one of them should produce accurate results.
1357        //
1358        // If the `CollectionBundle` contains both unarranged and arranged representations it is
1359        // beneficial to attach the logging operator to one of the arranged representation to avoid
1360        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1361        // if we attached the logging operator to it, we would introduce a fork in its output
1362        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1363        // that the output streams of the arrangements don't yet feed into anything else, so
1364        // attaching a (pass-through) logging operator does not introduce a fork.
1365
1366        match bundle.arranged.values_mut().next() {
1367            Some(arrangement) => {
1368                use ArrangementFlavor::*;
1369
1370                match arrangement {
1371                    Local(a, _) => {
1372                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1373                    }
1374                    Trace(_, a, _) => {
1375                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1376                    }
1377                }
1378            }
1379            None => {
1380                let (oks, _) = bundle
1381                    .collection
1382                    .as_mut()
1383                    .expect("CollectionBundle invariant");
1384                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1385                *oks = stream.as_collection();
1386            }
1387        }
1388    }
1389
1390    fn log_operator_hydration_inner<D>(
1391        &self,
1392        stream: Stream<'scope, T, D>,
1393        lir_id: LirId,
1394    ) -> Stream<'scope, T, D>
1395    where
1396        D: timely::Container + Clone + 'static,
1397    {
1398        let Some(logger) = self.compute_logger.clone() else {
1399            return stream.clone(); // hydration logging disabled
1400        };
1401
1402        let export_ids = self.export_ids.clone();
1403
1404        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
1405        //
1406        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
1407        // frontier advances to an outer time that's greater than the `as_of`. Comparing
1408        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
1409        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
1410        // instead.
1411        let mut hydration_frontier = Antichain::new();
1412        for time in self.as_of_frontier.iter() {
1413            if let Some(time) = time.try_step_forward() {
1414                hydration_frontier.insert(Refines::to_inner(time));
1415            }
1416        }
1417
1418        let name = format!("LogOperatorHydration ({lir_id})");
1419        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1420            let mut hydrated = false;
1421
1422            for &export_id in &export_ids {
1423                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1424                    export_id,
1425                    lir_id,
1426                    hydrated,
1427                }));
1428            }
1429
1430            move |(input, frontier), output| {
1431                // Pass through inputs.
1432                input.for_each(|cap, data| {
1433                    output.session(&cap).give_container(data);
1434                });
1435
1436                if hydrated {
1437                    return;
1438                }
1439
1440                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1441                    hydrated = true;
1442
1443                    for &export_id in &export_ids {
1444                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1445                            export_id,
1446                            lir_id,
1447                            hydrated,
1448                        }));
1449                    }
1450                }
1451            }
1452        })
1453    }
1454}
1455
#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    ///
    /// Implementations step back every coordinate of the timestamp (see the
    /// `Product` impl, which also steps back each inner `PointStamp` coordinate).
    fn step_back(&self) -> Self;
}
1476
/// Apply temporal bucketing to a stream when the timestamp type supports it.
///
/// Sibling to [`RenderTimestamp`]: bucketing is an arrangement-time concern, not a
/// general property of a render timestamp, so the dispatch lives in its own trait.
/// Total-ordered timestamps perform real bucketing; partially-ordered timestamps
/// (e.g. `Product<…>` in iterative scopes) implement this as a no-op.
pub trait MaybeBucketByTime: Timestamp {
    /// Converts `stream` to a collection, applying temporal bucketing where the
    /// implementation supports it. `as_of` and `summary` parameterize the bucketing;
    /// no-op implementations (currently the `Product` impl) ignore them.
    fn maybe_apply_temporal_bucketing<'scope>(
        stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
        as_of: Antichain<mz_repr::Timestamp>,
        summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, Row, Diff>;
}
1490
impl RenderTimestamp for mz_repr::Timestamp {
    // For the plain (non-iterative) timestamp, the system and event time components
    // coincide: both are the timestamp itself.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturating: stepping back the minimum timestamp yields the minimum again.
        self.saturating_sub(1)
    }
}
1511
impl MaybeBucketByTime for mz_repr::Timestamp {
    // The totally-ordered timestamp supports real bucketing, delegating to `bucket`.
    fn maybe_apply_temporal_bucketing<'scope>(
        stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
        as_of: Antichain<mz_repr::Timestamp>,
        summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, Row, Diff> {
        stream
            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
            .as_collection()
    }
}
1523
1524impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1525    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1526        &mut self.outer
1527    }
1528    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1529        Product::new(delay, Default::default())
1530    }
1531    fn event_time(&self) -> mz_repr::Timestamp {
1532        self.outer
1533    }
1534    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1535        &mut self.outer
1536    }
1537    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1538        Product::new(delay, Default::default())
1539    }
1540    fn step_back(&self) -> Self {
1541        // It is necessary to step back both coordinates of a product,
1542        // and when one is a `PointStamp` that also means all coordinates
1543        // of the pointstamp.
1544        let inner = self.inner.clone();
1545        let mut vec = inner.into_inner();
1546        for item in vec.iter_mut() {
1547            *item = item.saturating_sub(1);
1548        }
1549        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1550    }
1551}
1552
impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
    // Partially-ordered timestamps don't support bucketing yet; this is a no-op that
    // only converts the stream into a collection.
    fn maybe_apply_temporal_bucketing<'scope>(
        stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
        _as_of: Antichain<mz_repr::Timestamp>,
        _summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, Row, Diff> {
        // TODO: Implement bucketing on outer timestamp for iterative scopes.
        stream.as_collection()
    }
}
1563
/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token, dropping of which causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
///
/// Cloning is cheap: clones share the same underlying `Shared` future and token reference.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    ///
    /// Weak so that registered values don't keep the token (and thus the signal) alive.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1583
1584impl StartSignal {
1585    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1586    /// dropped.
1587    pub fn new() -> (Self, Rc<dyn Any>) {
1588        let (tx, rx) = oneshot::channel::<Infallible>();
1589        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1590        let signal = Self {
1591            fut: rx.shared(),
1592            token_ref: Rc::downgrade(&token),
1593        };
1594        (signal, token)
1595    }
1596
1597    pub fn has_fired(&self) -> bool {
1598        self.token_ref.strong_count() == 0
1599    }
1600
1601    /// Returns a Send-safe future that completes when the signal fires.
1602    ///
1603    /// Unlike `StartSignal` itself, the returned future does not retain a reference to the token,
1604    /// so it cannot be used for `drop_on_fire` or `has_fired` checks.
1605    pub fn into_send_future(self) -> impl Future<Output = ()> + Send {
1606        use futures::FutureExt;
1607        self.fut.map(|_| ())
1608    }
1609
1610    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1611        if let Some(token) = self.token_ref.upgrade() {
1612            let mut token = token.borrow_mut();
1613            let inner = std::mem::replace(&mut *token, Box::new(()));
1614            *token = Box::new((inner, to_drop));
1615        }
1616    }
1617}
1618
impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // The shared receiver resolves when the token (holding the `Sender`) is dropped.
        // No value is ever sent (`Infallible`), so we discard the result and just report
        // readiness.
        self.fut.poll_unpin(cx).map(|_| ())
    }
}
1626
/// Extension trait to attach a `StartSignal` to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1635
impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
where
    Tr: TraceReader<Time: RenderTimestamp> + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        // Gate only the batch stream; the trace handle is passed through unchanged.
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}
1647
1648impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1649where
1650    D: timely::Container + Clone + 'static,
1651{
1652    fn with_start_signal(self, signal: StartSignal) -> Self {
1653        let activations = self.scope().activations();
1654        self.unary(Pipeline, "StartSignal", |_cap, info| {
1655            let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1656            signal.drop_on_fire(token);
1657
1658            let mut stash = Vec::new();
1659
1660            move |input, output| {
1661                // Stash incoming updates as long as the start signal has not fired.
1662                if !signal.has_fired() {
1663                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1664                    return;
1665                }
1666
1667                // Release any data we might still have stashed.
1668                for (cap, mut data) in std::mem::take(&mut stash) {
1669                    output.session(&cap).give_container(&mut data);
1670                }
1671
1672                // Pass through all remaining input data.
1673                input.for_each(|cap, data| {
1674                    output.session(&cap).give_container(data);
1675                });
1676            }
1677        })
1678    }
1679}
1680
/// Suppress progress messages for times before the given `as_of`.
///
/// This operator exists specifically to work around a memory spike we'd otherwise see when
/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
/// operator observes a frontier advancement without data it inserts an empty batch into the spine.
/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
/// the snapshot batch.
///
/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
/// beyond the `as_of`, so the snapshot data has already been collected.
///
/// To ensure this, this operator needs to take two measures:
///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
///
/// The second measure requires elaboration: If we wouldn't reclock snapshot updates, they might
/// still be upstream of `arrange_core` operators when those get to know about us dropping the
/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
fn suppress_early_progress<'scope, T: Timestamp, D>(
    stream: Stream<'scope, T, D>,
    as_of: Antichain<T>,
) -> Stream<'scope, T, D>
where
    D: Data + timely::Container,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        // Minimum-time capability, held until the input frontier passes the `as_of`.
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    // Updates beyond the `as_of` pass through at their own time.
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    // Snapshot updates are reclocked to the minimum time (see function docs).
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            // Once the input frontier has advanced beyond the `as_of`, drop the minimum
            // capability so downstream frontiers can advance.
            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}
1734
/// Extension trait for [`Stream`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
    /// the implementation to observe times in data, and release capabilities based on the probe's
    /// frontier, after applying `slack` to round up timestamps.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically joins hold back compaction on the other side of the join, so we need to
    /// make sure we release capabilities as soon as possible. This is why we only limit progress
    /// for times before the `upper`, which is the time until which the source can distinguish
    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
    /// our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
    /// consider using the join of the uppers, i.e., use the lower bound upper of all available
    /// inputs.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (c.f., `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1773
// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
    for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
where
    D: Clone + 'static,
    R: Clone + 'static,
{
    fn limit_progress(
        self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let scope = self.scope();
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;

                // Re-activate this operator whenever the probe's frontier advances, so we get
                // a chance to release pending times.
                let activator = scope.activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // Record each data time (rounded up by `slack_ms`) as pending, unless
                        // it is already beyond `upper`.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            // `slack_ms == 0` means no rounding; otherwise round up to the next
                            // multiple of `slack_ms`. Avoids a divide-by-zero panic when the
                            // operator is configured without slack.
                            let rounded_time = if slack_ms == 0 {
                                time
                            } else {
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms)
                            };
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        // Data itself is passed through unchanged.
                        output.session(&cap).give_container(data);
                        // Adopt this capability if it is not later than the one we hold and
                        // not beyond `upper`.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain(0));
                        }
                    });

                    // Discharge pending times the probe's frontier has already passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the configured bound on pending times to cap memory usage.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade our capability to the earliest remaining pending time, or drop
                    // it entirely once nothing is pending.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // Input complete: release everything so downstream can shut down.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}
1867
/// A formatter for an iterator of timestamps that displays the first element, and subsequently
/// the difference between timestamps.
///
/// For example, times `10, 20, 25` display as `[10, +10, +5]`, keeping debug output compact.
struct PendingTimesDisplay<T>(T);
1871
1872impl<T> std::fmt::Display for PendingTimesDisplay<T>
1873where
1874    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1875{
1876    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1877        let mut iter = self.0.clone().into_iter();
1878        write!(f, "[")?;
1879        if let Some(first) = iter.next() {
1880            write!(f, "{}", first)?;
1881            let mut last = u64::from(first);
1882            for time in iter {
1883                write!(f, ", +{}", u64::from(time) - last)?;
1884                last = u64::from(time);
1885            }
1886        }
1887        write!(f, "]")?;
1888        Ok(())
1889    }
1890}
1891
/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    /// Number of datums belonging to the first component of the pair.
    split_arity: usize,
}
1898
1899impl Pairer {
1900    /// Creates a pairer with knowledge of the arity of first component in the pair.
1901    fn new(split_arity: usize) -> Self {
1902        Self { split_arity }
1903    }
1904
1905    /// Merges a pair of datum iterators creating a `Row` instance.
1906    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1907    where
1908        I1: IntoIterator<Item = Datum<'a>>,
1909        I2: IntoIterator<Item = Datum<'a>>,
1910    {
1911        SharedRow::pack(first.into_iter().chain(second))
1912    }
1913
1914    /// Splits a datum iterator into a pair of `Row` instances.
1915    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1916        let mut datum_iter = datum_iter.into_iter();
1917        let mut row_builder = SharedRow::get();
1918        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1919        let second = row_builder.pack_using(datum_iter);
1920        (first, second)
1921    }
1922}