// mz_compute/render.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders a plan into a timely/differential dataflow computation.
//!
//! ## Error handling
//!
//! Timely and differential have no idioms for computations that can error. The
//! philosophy is, reasonably, to define the semantics of the computation such
//! that errors are unnecessary: e.g., by using wrap-around semantics for
//! integer overflow.
//!
//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
//! in myriad cases. The classic example is a division by zero, but invalid
//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
//!
//! At the moment, only *scalar* expression evaluation can fail, so only
//! operators that evaluate scalar expressions can fail. At the time of writing,
//! that includes map, filter, reduce, and join operators. Constants are a bit
//! of a special case: they can be either a constant vector of rows *or* a
//! constant, singular error.
//!
//! The approach taken is to build two parallel trees of computation: one for
//! the rows that have been successfully evaluated (the "oks tree"), and one for
//! the errors that have been generated (the "errs tree"). For example:
//!
//! ```text
//!    oks1  errs1       oks2  errs2
//!      |     |           |     |
//!      |     |           |     |
//!   project  |           |     |
//!      |     |           |     |
//!      |     |           |     |
//!     map    |           |     |
//!      |\    |           |     |
//!      | \   |           |     |
//!      |  \  |           |     |
//!      |   \ |           |     |
//!      |    \|           |     |
//!   project  +           +     +
//!      |     |          /     /
//!      |     |         /     /
//!    join ------------+     /
//!      |     |             /
//!      |     | +----------+
//!      |     |/
//!     oks   errs
//! ```
//!
//! The project operation cannot fail, so errors from errs1 are propagated
//! directly. Map operators are fallible and so can inject additional errors
//! into the stream. Join operators combine the errors from each of their
//! inputs.
//!
//! The semantics of the error stream are minimal. From the perspective of SQL,
//! a dataflow is considered to be in an error state if there is at least one
//! element in the final errs collection. The error value returned to the user
//! is selected arbitrarily; SQL only makes provisions to return one error to
//! the user at a time. There are plans to make the err collection accessible to
//! end users, so they can see all errors at once.
//!
//! To make errors transient, simply ensure that the operator can retract any
//! produced errors when corrected data arrives. To make errors permanent, write
//! the operator such that it never retracts the errors it produced. Future work
//! will likely want to introduce some sort of sort order for errors, so that
//! permanent errors are returned to the user ahead of transient errors—probably
//! by introducing a new error type a la:
//!
//! ```no_run
//! # struct EvalError;
//! # struct SourceError;
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//! ```
//!
//! If the error stream is empty, the oks stream must be correct. If the error
//! stream is non-empty, then there are no semantics for the oks stream. This is
//! sufficient to support SQL in its current form, but is likely to be
//! unsatisfactory long term. We suspect that we can continue to imbue the oks
//! stream with semantics if we are very careful in describing what data should
//! and should not be produced upon encountering an error. Roughly speaking, the
//! oks stream could represent the correct result of the computation where all
//! rows that caused an error have been pruned from the stream. There are
//! strange and confusing questions here around foreign keys, though: what if
//! the optimizer proves that a particular key must exist in a collection, but
//! the key gets pruned away because its row participated in a scalar expression
//! evaluation that errored?
//!
//! In the meantime, it is probably wise for operators to keep the oks stream
//! roughly "as correct as possible" even when errors are present in the errs
//! stream. This reduces the amount of recomputation that must be performed
//! if/when the errors are retracted.

103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::Variable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125    ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use mz_timely_util::scope_label::ScopeExt;
141use timely::PartialOrder;
142use timely::container::CapacityContainerBuilder;
143use timely::dataflow::channels::pact::Pipeline;
144use timely::dataflow::operators::vec::ToStream;
145use timely::dataflow::operators::vec::{BranchWhen, Filter};
146use timely::dataflow::operators::{Capability, Operator, Probe, probe};
147use timely::dataflow::{Scope, Stream, StreamVec};
148use timely::order::{Product, TotalOrder};
149use timely::progress::timestamp::Refines;
150use timely::progress::{Antichain, Timestamp};
151use timely::scheduling::ActivateOnDrop;
152use timely::worker::Worker as TimelyWorker;
153
154use crate::arrangement::manager::TraceBundle;
155use crate::compute_state::ComputeState;
156use crate::extensions::arrange::{KeyCollection, MzArrange};
157use crate::extensions::reduce::MzReduce;
158use crate::extensions::temporal_bucket::TemporalBucketing;
159use crate::logging::compute::{
160    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
161};
162use crate::render::context::{ArrangementFlavor, Context};
163use crate::render::continual_task::ContinualTaskCtx;
164use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
165use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
166
167pub mod context;
168pub(crate) mod continual_task;
169mod errors;
170mod flat_map;
171mod join;
172mod reduce;
173pub mod sinks;
174mod threshold;
175mod top_k;
176
177pub use context::CollectionBundle;
178pub use join::LinearJoinSpec;
179
180/// Assemble the "compute"  side of a dataflow, i.e. all but the sources.
181///
182/// This method imports sources from provided assets, and then builds the remaining
183/// dataflow using "compute-local" assets like shared arrangements, and producing
184/// both arrangements and sinks.
185pub fn build_compute_dataflow(
186    timely_worker: &mut TimelyWorker,
187    compute_state: &mut ComputeState,
188    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
189    start_signal: StartSignal,
190    until: Antichain<mz_repr::Timestamp>,
191    dataflow_expiration: Antichain<mz_repr::Timestamp>,
192) {
193    // Mutually recursive view definitions require special handling.
194    let recursive = dataflow
195        .objects_to_build
196        .iter()
197        .any(|object| object.plan.is_recursive());
198
199    // Determine indexes to export, and their dependencies.
200    let indexes = dataflow
201        .index_exports
202        .iter()
203        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
204        .collect::<Vec<_>>();
205
206    // Determine sinks to export, and their dependencies.
207    let sinks = dataflow
208        .sink_exports
209        .iter()
210        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
211        .collect::<Vec<_>>();
212
213    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
214    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
215    let subscribe_snapshot_optimization =
216        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);
217
218    let name = format!("Dataflow: {}", &dataflow.debug_name);
219    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
220    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);
221
222    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
223        let scope = scope.with_label();
224
225        // TODO(ct3): This should be a config of the source instead, but at least try
226        // to contain the hacks.
227        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);
228
229        // The scope.clone() occurs to allow import in the region.
230        // We build a region here to establish a pattern of a scope inside the dataflow,
231        // so that other similar uses (e.g. with iterative scopes) do not require weird
232        // alternate type signatures.
233        let mut imported_sources = Vec::new();
234        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
235        let output_probe = MzProbeHandle::default();
236
237        scope.clone().region_named(&input_name, |region| {
238            // Import declared sources into the rendering context.
239            for (source_id, import) in dataflow.source_imports.iter() {
240                region.region_named(&format!("Source({:?})", source_id), |inner| {
241                    let mut read_schema = None;
242                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
243                        // If enabled, we read from Persist with a `RelationDesc` that
244                        // omits uneeded columns.
245                        if apply_demands {
246                            let demands = ops.demand();
247                            let new_desc = import
248                                .desc
249                                .storage_metadata
250                                .relation_desc
251                                .apply_demand(&demands);
252                            let new_arity = demands.len();
253                            let remap: BTreeMap<_, _> = demands
254                                .into_iter()
255                                .enumerate()
256                                .map(|(new, old)| (old, new))
257                                .collect();
258                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
259                            read_schema = Some(new_desc);
260                        }
261
262                        mz_expr::MfpPlan::create_from(ops)
263                            .expect("Linear operators should always be valid")
264                    });
265
266                    let mut snapshot_mode =
267                        if import.with_snapshot || !subscribe_snapshot_optimization {
268                            SnapshotMode::Include
269                        } else {
270                            compute_state.metrics.inc_subscribe_snapshot_optimization();
271                            SnapshotMode::Exclude
272                        };
273                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
274                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
275                    if let Some(x) = ct_source_transformer.as_ref() {
276                        snapshot_mode = x.snapshot_mode();
277                        suppress_early_progress_as_of = suppress_early_progress_as_of
278                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
279                    }
280
281                    // Note: For correctness, we require that sources only emit times advanced by
282                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
283                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
284                        inner,
285                        *source_id,
286                        Arc::clone(&compute_state.persist_clients),
287                        &compute_state.txns_ctx,
288                        import.desc.storage_metadata.clone(),
289                        read_schema,
290                        dataflow.as_of.clone(),
291                        snapshot_mode,
292                        until.clone(),
293                        mfp.as_mut(),
294                        compute_state.dataflow_max_inflight_bytes(),
295                        start_signal.clone(),
296                        ErrorHandler::Halt("compute_import"),
297                    );
298
299                    // If `mfp` is non-identity, we need to apply what remains.
300                    // For the moment, assert that it is either trivial or `None`.
301                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));
302
303                    // To avoid a memory spike during arrangement hydration (database-issues#6368), need to
304                    // ensure that the first frontier we report into the dataflow is beyond the
305                    // `as_of`.
306                    if let Some(as_of) = suppress_early_progress_as_of {
307                        ok_stream = suppress_early_progress(ok_stream, as_of);
308                    }
309
310                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
311                        // Apply logical backpressure to the source.
312                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
313                            .get(&compute_state.worker_config);
314                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
315                            .get(&compute_state.worker_config)
316                            .as_millis()
317                            .try_into()
318                            .expect("must fit");
319
320                        let stream = ok_stream.limit_progress(
321                            output_probe.clone(),
322                            slack,
323                            limit,
324                            import.upper.clone(),
325                            name.clone(),
326                        );
327                        ok_stream = stream;
328                    }
329
330                    // Attach a probe reporting the input frontier.
331                    let input_probe =
332                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
333                    ok_stream = ok_stream.probe_with(&input_probe);
334
335                    // The `suppress_early_progress` operator and the input
336                    // probe both want to work on the untransformed ct input,
337                    // make sure this stays after them.
338                    let (ok_stream, err_stream) = match ct_source_transformer {
339                        None => (ok_stream, err_stream),
340                        Some(ct_source_transformer) => {
341                            let (oks, errs, ct_times) = ct_source_transformer
342                                .transform(ok_stream.as_collection(), err_stream.as_collection());
343                            // TODO(ct3): Ideally this would be encapsulated by
344                            // ContinualTaskCtx, but the types are tricky.
345                            ct_ctx
346                                .ct_times
347                                .push(ct_times.leave_region(region).leave_region(scope));
348                            (oks.inner, errs.inner)
349                        }
350                    };
351
352                    let (oks, errs) = (
353                        ok_stream
354                            .as_collection()
355                            .leave_region(region)
356                            .leave_region(scope),
357                        err_stream
358                            .as_collection()
359                            .leave_region(region)
360                            .leave_region(scope),
361                    );
362
363                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));
364
365                    // Associate returned tokens with the source identifier.
366                    tokens.insert(*source_id, Rc::new(token));
367                });
368            }
369        });
370
371        // If there exists a recursive expression, we'll need to use a non-region scope,
372        // in order to support additional timestamp coordinates for iteration.
373        if recursive {
374            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
375                let mut context = Context::for_dataflow_in(
376                    &dataflow,
377                    region.clone(),
378                    compute_state,
379                    until,
380                    dataflow_expiration,
381                );
382
383                for (id, (oks, errs)) in imported_sources.into_iter() {
384                    let bundle = crate::render::CollectionBundle::from_collections(
385                        oks.enter(region),
386                        errs.enter(region),
387                    );
388                    // Associate collection bundle with the source identifier.
389                    context.insert_id(id, bundle);
390                }
391
392                // Import declared indexes into the rendering context.
393                for (idx_id, idx) in &dataflow.index_imports {
394                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
395                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
396                        SnapshotMode::Include
397                    } else {
398                        compute_state.metrics.inc_subscribe_snapshot_optimization();
399                        SnapshotMode::Exclude
400                    };
401                    context.import_index(
402                        scope,
403                        compute_state,
404                        &mut tokens,
405                        input_probe,
406                        *idx_id,
407                        &idx.desc,
408                        &idx.typ,
409                        snapshot_mode,
410                        start_signal.clone(),
411                    );
412                }
413
414                // Build declared objects.
415                for object in dataflow.objects_to_build {
416                    let bundle = context.scope.clone().region_named(
417                        &format!("BuildingObject({:?})", object.id),
418                        |region| {
419                            let depends = object.plan.depends();
420                            let in_let = object.plan.is_recursive();
421                            context
422                                .enter_region(region, Some(&depends))
423                                .render_recursive_plan(
424                                    object.id,
425                                    0,
426                                    object.plan,
427                                    // recursive plans _must_ have bodies in a let
428                                    BindingInfo::Body { in_let },
429                                )
430                                .leave_region(context.scope)
431                        },
432                    );
433                    let global_id = object.id;
434
435                    context.log_dataflow_global_id(
436                        *bundle
437                            .scope()
438                            .addr()
439                            .first()
440                            .expect("Dataflow root id must exist"),
441                        global_id,
442                    );
443                    context.insert_id(Id::Global(object.id), bundle);
444                }
445
446                // Export declared indexes.
447                for (idx_id, dependencies, idx) in indexes {
448                    context.export_index_iterative(
449                        scope,
450                        compute_state,
451                        &tokens,
452                        dependencies,
453                        idx_id,
454                        &idx,
455                        &output_probe,
456                    );
457                }
458
459                // Export declared sinks.
460                for (sink_id, dependencies, sink) in sinks {
461                    context.export_sink(
462                        compute_state,
463                        &tokens,
464                        dependencies,
465                        sink_id,
466                        &sink,
467                        start_signal.clone(),
468                        ct_ctx.input_times(scope),
469                        &output_probe,
470                        scope,
471                    );
472                }
473            });
474        } else {
475            scope.clone().region_named(&build_name, |region| {
476                let mut context = Context::for_dataflow_in(
477                    &dataflow,
478                    region.clone(),
479                    compute_state,
480                    until,
481                    dataflow_expiration,
482                );
483
484                for (id, (oks, errs)) in imported_sources.into_iter() {
485                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
486                        let as_of = context.as_of_frontier.clone();
487                        let summary = TEMPORAL_BUCKETING_SUMMARY
488                            .get(&compute_state.worker_config)
489                            .try_into()
490                            .expect("must fit");
491                        oks.inner
492                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
493                            .as_collection()
494                    } else {
495                        oks
496                    };
497                    let bundle = crate::render::CollectionBundle::from_collections(
498                        oks.enter_region(region),
499                        errs.enter_region(region),
500                    );
501                    // Associate collection bundle with the source identifier.
502                    context.insert_id(id, bundle);
503                }
504
505                // Import declared indexes into the rendering context.
506                for (idx_id, idx) in &dataflow.index_imports {
507                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
508                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
509                        SnapshotMode::Include
510                    } else {
511                        compute_state.metrics.inc_subscribe_snapshot_optimization();
512                        SnapshotMode::Exclude
513                    };
514                    context.import_index(
515                        scope,
516                        compute_state,
517                        &mut tokens,
518                        input_probe,
519                        *idx_id,
520                        &idx.desc,
521                        &idx.typ,
522                        snapshot_mode,
523                        start_signal.clone(),
524                    );
525                }
526
527                // Build declared objects.
528                for object in dataflow.objects_to_build {
529                    let bundle = context.scope.clone().region_named(
530                        &format!("BuildingObject({:?})", object.id),
531                        |region| {
532                            let depends = object.plan.depends();
533                            context
534                                .enter_region(region, Some(&depends))
535                                .render_plan(object.id, object.plan)
536                                .leave_region(context.scope)
537                        },
538                    );
539                    let global_id = object.id;
540                    context.log_dataflow_global_id(
541                        *bundle
542                            .scope()
543                            .addr()
544                            .first()
545                            .expect("Dataflow root id must exist"),
546                        global_id,
547                    );
548                    context.insert_id(Id::Global(object.id), bundle);
549                }
550
551                // Export declared indexes.
552                for (idx_id, dependencies, idx) in indexes {
553                    context.export_index(
554                        compute_state,
555                        &tokens,
556                        dependencies,
557                        idx_id,
558                        &idx,
559                        &output_probe,
560                    );
561                }
562
563                // Export declared sinks.
564                for (sink_id, dependencies, sink) in sinks {
565                    context.export_sink(
566                        compute_state,
567                        &tokens,
568                        dependencies,
569                        sink_id,
570                        &sink,
571                        start_signal.clone(),
572                        ct_ctx.input_times(scope),
573                        &output_probe,
574                        scope,
575                    );
576                }
577            });
578        }
579    });
580}
581
582// This implementation block allows child timestamps to vary from parent timestamps,
583// but requires the parent timestamp to be `repr::Timestamp`.
584impl<'g, T> Context<'g, T>
585where
586    T: Refines<mz_repr::Timestamp> + RenderTimestamp,
587{
588    /// Import the collection from the arrangement, discarding batches from the snapshot.
589    /// (This does not guarantee that no records from the snapshot are included; the assumption is
590    /// that we'll filter those out later if necessary.)
591    fn import_filtered_index_collection<
592        'outer,
593        Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
594        V: Data,
595    >(
596        &self,
597        arranged: Arranged<'outer, Tr>,
598        start_signal: StartSignal,
599        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
600    ) -> VecCollection<'g, T, V, Tr::Diff>
601    where
602        // This is implied by the fact that the outer timestamp = mz_repr::Timestamp, but it's essential
603        // for our batch-level filtering to be safe, so we document it here regardless.
604        mz_repr::Timestamp: TotalOrder,
605    {
606        let oks = arranged.stream.with_start_signal(start_signal).filter({
607            let as_of = self.as_of_frontier.clone();
608            move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
609        });
610        Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
611    }
612
613    pub(crate) fn import_index<'outer>(
614        &mut self,
615        outer: Scope<'outer, mz_repr::Timestamp>,
616        compute_state: &mut ComputeState,
617        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
618        input_probe: probe::Handle<mz_repr::Timestamp>,
619        idx_id: GlobalId,
620        idx: &IndexDesc,
621        typ: &ReprRelationType,
622        snapshot_mode: SnapshotMode,
623        start_signal: StartSignal,
624    ) {
625        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
626            assert!(
627                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
628                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
629            );
630
631            let token = traces.to_drop().clone();
632
633            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
634                outer,
635                &format!("Index({}, {:?})", idx.on_id, idx.key),
636                self.as_of_frontier.clone(),
637                self.until.clone(),
638            );
639
640            oks.stream = oks.stream.probe_with(&input_probe);
641
642            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
643                outer,
644                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
645                self.as_of_frontier.clone(),
646                self.until.clone(),
647            );
648
649            let bundle = match snapshot_mode {
650                SnapshotMode::Include => {
651                    let ok_arranged = oks
652                        .enter(self.scope)
653                        .with_start_signal(start_signal.clone());
654                    let err_arranged = err_arranged
655                        .enter(self.scope)
656                        .with_start_signal(start_signal);
657                    CollectionBundle::from_expressions(
658                        idx.key.clone(),
659                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
660                    )
661                }
662                SnapshotMode::Exclude => {
663                    // When we import an index without a snapshot, we have two balancing considerations:
664                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
665                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
666                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
667                    //   (joins, reduces, etc.) also require the snapshot data.
668                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
669                    let oks = {
670                        let mut datums = DatumVec::new();
671                        let (permutation, _thinning) =
672                            permutation_for_arrangement(&idx.key, typ.arity());
673                        self.import_filtered_index_collection(
674                            oks,
675                            start_signal.clone(),
676                            move |k: DatumSeq, v: DatumSeq| {
677                                let mut datums_borrow = datums.borrow();
678                                datums_borrow.extend(k);
679                                datums_borrow.extend(v);
680                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
681                            },
682                        )
683                    };
684                    let errs = self.import_filtered_index_collection(
685                        err_arranged,
686                        start_signal,
687                        |e, _| e.clone(),
688                    );
689                    CollectionBundle::from_collections(oks, errs)
690                }
691            };
692            self.update_id(Id::Global(idx.on_id), bundle);
693            tokens.insert(
694                idx_id,
695                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
696            );
697        } else {
698            panic!(
699                "import of index {} failed while building dataflow {}",
700                idx_id, self.dataflow_id
701            );
702        }
703    }
704}
705
706// This implementation block requires the scopes have the same timestamp as the trace manager.
707// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
708impl<'g> Context<'g, mz_repr::Timestamp> {
709    pub(crate) fn export_index(
710        &self,
711        compute_state: &mut ComputeState,
712        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
713        dependency_ids: BTreeSet<GlobalId>,
714        idx_id: GlobalId,
715        idx: &IndexDesc,
716        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
717    ) {
718        // put together tokens that belong to the export
719        let mut needed_tokens = Vec::new();
720        for dep_id in dependency_ids {
721            if let Some(token) = tokens.get(&dep_id) {
722                needed_tokens.push(Rc::clone(token));
723            }
724        }
725        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
726            panic!(
727                "Arrangement alarmingly absent! id: {:?}",
728                Id::Global(idx_id)
729            )
730        });
731
732        match bundle.arrangement(&idx.key) {
733            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
734                // Ensure that the frontier does not advance past the expiration time, if set.
735                // Otherwise, we might write down incorrect data.
736                if let Some(&expiration) = self.dataflow_expiration.as_option() {
737                    oks.stream = oks.stream.expire_stream_at(
738                        &format!("{}_export_index_oks", self.debug_name),
739                        expiration,
740                    );
741                    errs.stream = errs.stream.expire_stream_at(
742                        &format!("{}_export_index_errs", self.debug_name),
743                        expiration,
744                    );
745                }
746
747                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
748
749                // Attach logging of dataflow errors.
750                if let Some(logger) = compute_state.compute_logger.clone() {
751                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
752                }
753
754                compute_state.traces.set(
755                    idx_id,
756                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
757                );
758            }
759            Some(ArrangementFlavor::Trace(gid, _, _)) => {
760                // Duplicate of existing arrangement with id `gid`, so
761                // just create another handle to that arrangement.
762                let trace = compute_state.traces.get(&gid).unwrap().clone();
763                compute_state.traces.set(idx_id, trace);
764            }
765            None => {
766                println!("collection available: {:?}", bundle.collection.is_none());
767                println!(
768                    "keys available: {:?}",
769                    bundle.arranged.keys().collect::<Vec<_>>()
770                );
771                panic!(
772                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
773                    Id::Global(idx_id),
774                    &idx.key
775                );
776            }
777        };
778    }
779}
780
781// This implementation block requires the scopes have the same timestamp as the trace manager.
782// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
783impl<'g, T> Context<'g, T>
784where
785    T: RenderTimestamp,
786{
787    pub(crate) fn export_index_iterative<'outer>(
788        &self,
789        outer: Scope<'outer, mz_repr::Timestamp>,
790        compute_state: &mut ComputeState,
791        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
792        dependency_ids: BTreeSet<GlobalId>,
793        idx_id: GlobalId,
794        idx: &IndexDesc,
795        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
796    ) {
797        // put together tokens that belong to the export
798        let mut needed_tokens = Vec::new();
799        for dep_id in dependency_ids {
800            if let Some(token) = tokens.get(&dep_id) {
801                needed_tokens.push(Rc::clone(token));
802            }
803        }
804        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
805            panic!(
806                "Arrangement alarmingly absent! id: {:?}",
807                Id::Global(idx_id)
808            )
809        });
810
811        match bundle.arrangement(&idx.key) {
812            Some(ArrangementFlavor::Local(oks, errs)) => {
813                // TODO: The following as_collection/leave/arrange sequence could be optimized.
814                //   * Combine as_collection and leave into a single function.
815                //   * Use columnar to extract columns from the batches to implement leave.
816                let mut oks = oks
817                    .as_collection(|k, v| (k.to_row(), v.to_row()))
818                    .leave(outer)
819                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
820                        "Arrange export iterative",
821                    );
822
823                let mut errs = errs
824                    .as_collection(|k, v| (k.clone(), v.clone()))
825                    .leave(outer)
826                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
827                        "Arrange export iterative err",
828                    );
829
830                // Ensure that the frontier does not advance past the expiration time, if set.
831                // Otherwise, we might write down incorrect data.
832                if let Some(&expiration) = self.dataflow_expiration.as_option() {
833                    oks.stream = oks.stream.expire_stream_at(
834                        &format!("{}_export_index_iterative_oks", self.debug_name),
835                        expiration,
836                    );
837                    errs.stream = errs.stream.expire_stream_at(
838                        &format!("{}_export_index_iterative_err", self.debug_name),
839                        expiration,
840                    );
841                }
842
843                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
844
845                // Attach logging of dataflow errors.
846                if let Some(logger) = compute_state.compute_logger.clone() {
847                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
848                }
849
850                compute_state.traces.set(
851                    idx_id,
852                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
853                );
854            }
855            Some(ArrangementFlavor::Trace(gid, _, _)) => {
856                // Duplicate of existing arrangement with id `gid`, so
857                // just create another handle to that arrangement.
858                let trace = compute_state.traces.get(&gid).unwrap().clone();
859                compute_state.traces.set(idx_id, trace);
860            }
861            None => {
862                println!("collection available: {:?}", bundle.collection.is_none());
863                println!(
864                    "keys available: {:?}",
865                    bundle.arranged.keys().collect::<Vec<_>>()
866                );
867                panic!(
868                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
869                    Id::Global(idx_id),
870                    &idx.key
871                );
872            }
873        };
874    }
875}
876
877/// Information about bindings, tracked in `render_recursive_plan` and
878/// `render_plan`, to be passed to `render_letfree_plan`.
879///
880/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
881/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
enum BindingInfo {
    /// The body of a plan; `in_let` is true iff at least one `Let` binding
    /// preceded the body, in which case its output is decorated with `Returning`.
    Body { in_let: bool },
    /// A non-recursive binding of `id`; `last` is true iff this is the final
    /// binding of its stage (the last one gets the `With` decoration).
    Let { id: LocalId, last: bool },
    /// A recursive binding of `id`; `last` is true iff this is the final
    /// binding of its stage (the last one gets the `With Recursive` decoration).
    LetRec { id: LocalId, last: bool },
}
887
impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it calls in to `render_plan` which will error if
    /// further [`RecBind`]s are found.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                // Each binding is rendered inside its own named region, for better
                // introspectability of the resulting dataflow graph.
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            // `last` drives the `With`/`Returning` decoration in the
                            // `mz_lir_mapping` output (see `BindingInfo`).
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `Variable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Feedback summary advancing the iteration coordinate for this
                // nesting level; see `feedback_summary` for the argument semantics.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                // Make the (not yet bound) collections available under the binding's id,
                // so the rec bindings below can reference each other.
                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough information
                // here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        // Reaching the limit without `return_at_limit` is an error:
                        // surface it instead of silently truncating the iteration.
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Set err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error each iteration,
                // say if the limit of `oks` has an error. This would result in non-terminating rather
                // than a clean report of the error. The trade-off is that we lose information about
                // multiplicities of errors, but .. this seems to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Bind the variables, closing the recursive definitions.
                oks_v.set(oks);
                err_v.set(errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1020
1021impl<'scope, T: RenderTimestamp> Context<'scope, T> {
1022    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
1023    /// results.
1024    ///
1025    /// The return type reflects the uncertainty about the data representation, perhaps
1026    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
1027    ///
1028    /// # Panics
1029    ///
1030    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
1031    /// `render_recursive_plan` instead.
1032    fn render_plan(
1033        &mut self,
1034        object_id: GlobalId,
1035        plan: RenderPlan,
1036    ) -> CollectionBundle<'scope, T> {
1037        let mut in_let = false;
1038        for BindStage { lets, recs } in plan.binds {
1039            assert!(recs.is_empty());
1040
1041            let mut let_iter = lets.into_iter().peekable();
1042            while let Some(LetBind { id, value }) = let_iter.next() {
1043                // if we encounter a single let, the body is in a let
1044                in_let = true;
1045                let bundle =
1046                    self.scope
1047                        .clone()
1048                        .region_named(&format!("Binding({:?})", id), |region| {
1049                            let depends = value.depends();
1050                            let last = let_iter.peek().is_none();
1051                            let binding = BindingInfo::Let { id, last };
1052                            self.enter_region(region, Some(&depends))
1053                                .render_letfree_plan(object_id, value, binding)
1054                                .leave_region(self.scope)
1055                        });
1056                self.insert_id(Id::Local(id), bundle);
1057            }
1058        }
1059
1060        self.scope.clone().region_named("Main Body", |region| {
1061            let depends = plan.body.depends();
1062            self.enter_region(region, Some(&depends))
1063                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1064                .leave_region(self.scope)
1065        })
1066    }
1067
    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    ///
    /// Nodes are rendered in topological order, so each node's inputs are already present in
    /// `collections` when the node itself is rendered. When a compute logger is configured,
    /// this also records an LIR-to-operator mapping (with `With ... Returning ...` style
    /// decorations derived from `binding`) via `log_lir_mapping`.
    ///
    /// # Panics
    ///
    /// Panics if the topological order references a node missing from the plan, or if no
    /// collection was produced for the plan's root node.
    fn render_letfree_plan(
        &self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, T> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            // Nodes are consumed as we go; the topological order visits each exactly once.
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
            // in some other structure and have that structure impl ExprHumanizer
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // mark the last operator in topo order with any binding decoration
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Snapshot the worker's operator identifier counter before rendering,
                // so the identifiers allocated for this node can be attributed to it.
                let operator_id_start = self.scope.worker().peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                // Identifiers in `operator_span` were allocated while rendering this node.
                let operator_id_end = self.scope.worker().peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1157
    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// Dispatches on the expression variant, delegating to the specialized
    /// `render_*` methods for the non-trivial operators.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        // Looks up a previously rendered input bundle; panics if absent (see `# Panics`).
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        // Drop updates at or beyond `until`; they would never be observed.
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // A constant error is emitted at the earliest readable time.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` are present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        // `<=` on bools: if a raw collection is requested, one must be present.
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Flatten an arrangement (seeked to `row` under `key`) through `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Flatten the raw collection through `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                // Table function application; see `render_flat_map`.
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Dispatch on the join strategy chosen at planning time.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only pass `mfp_after` along when it actually does something.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                // Negate the diffs of the ok stream; errors pass through unchanged.
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate all inputs' ok and err streams, respectively.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                // Ensure the requested arrangement forms exist on the bundle.
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }
1369
1370    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1371        if let Some(logger) = &self.compute_logger {
1372            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1373                dataflow_index,
1374                global_id,
1375            }));
1376        }
1377    }
1378
1379    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1380        if let Some(logger) = &self.compute_logger {
1381            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1382        }
1383    }
1384
1385    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
1386        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
1387        // which we should attach the logging operator.
1388        //
1389        // We could attach to each collection and track the lower bound of output frontiers.
1390        // However, that would be of limited use because we expect all collections to hydrate at
1391        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1392        // frontier of the unarranged collection advance, it will perform all work necessary to
1393        // also advance its own frontier. We don't expect significant delays between frontier
1394        // advancements of the unarranged and arranged collections, so attaching the logging
1395        // operator to any one of them should produce accurate results.
1396        //
1397        // If the `CollectionBundle` contains both unarranged and arranged representations it is
1398        // beneficial to attach the logging operator to one of the arranged representation to avoid
1399        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1400        // if we attached the logging operator to it, we would introduce a fork in its output
1401        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1402        // that the output streams of the arrangements don't yet feed into anything else, so
1403        // attaching a (pass-through) logging operator does not introduce a fork.
1404
1405        match bundle.arranged.values_mut().next() {
1406            Some(arrangement) => {
1407                use ArrangementFlavor::*;
1408
1409                match arrangement {
1410                    Local(a, _) => {
1411                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1412                    }
1413                    Trace(_, a, _) => {
1414                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1415                    }
1416                }
1417            }
1418            None => {
1419                let (oks, _) = bundle
1420                    .collection
1421                    .as_mut()
1422                    .expect("CollectionBundle invariant");
1423                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1424                *oks = stream.as_collection();
1425            }
1426        }
1427    }
1428
1429    fn log_operator_hydration_inner<D>(
1430        &self,
1431        stream: Stream<'scope, T, D>,
1432        lir_id: LirId,
1433    ) -> Stream<'scope, T, D>
1434    where
1435        D: timely::Container + Clone + 'static,
1436    {
1437        let Some(logger) = self.compute_logger.clone() else {
1438            return stream.clone(); // hydration logging disabled
1439        };
1440
1441        let export_ids = self.export_ids.clone();
1442
1443        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
1444        //
1445        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
1446        // frontier advances to an outer time that's greater than the `as_of`. Comparing
1447        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
1448        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
1449        // instead.
1450        let mut hydration_frontier = Antichain::new();
1451        for time in self.as_of_frontier.iter() {
1452            if let Some(time) = time.try_step_forward() {
1453                hydration_frontier.insert(Refines::to_inner(time));
1454            }
1455        }
1456
1457        let name = format!("LogOperatorHydration ({lir_id})");
1458        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1459            let mut hydrated = false;
1460
1461            for &export_id in &export_ids {
1462                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1463                    export_id,
1464                    lir_id,
1465                    hydrated,
1466                }));
1467            }
1468
1469            move |(input, frontier), output| {
1470                // Pass through inputs.
1471                input.for_each(|cap, data| {
1472                    output.session(&cap).give_container(data);
1473                });
1474
1475                if hydrated {
1476                    return;
1477                }
1478
1479                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1480                    hydrated = true;
1481
1482                    for &export_id in &export_ids {
1483                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1484                            export_id,
1485                            lir_id,
1486                            hydrated,
1487                        }));
1488                    }
1489                }
1490            }
1491        })
1492    }
1493}
1494
#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
///
/// Such timestamps refine `mz_repr::Timestamp` and expose their system-time and
/// event-time components, along with timestamp summaries that effect delays along
/// either component.
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    fn step_back(&self) -> Self;
}
1515
impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        // For the plain timestamp, system time and event time coincide.
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturate at zero rather than underflowing below the minimum timestamp.
        self.saturating_sub(1)
    }
}
1536
1537impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1538    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1539        &mut self.outer
1540    }
1541    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1542        Product::new(delay, Default::default())
1543    }
1544    fn event_time(&self) -> mz_repr::Timestamp {
1545        self.outer
1546    }
1547    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1548        &mut self.outer
1549    }
1550    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1551        Product::new(delay, Default::default())
1552    }
1553    fn step_back(&self) -> Self {
1554        // It is necessary to step back both coordinates of a product,
1555        // and when one is a `PointStamp` that also means all coordinates
1556        // of the pointstamp.
1557        let inner = self.inner.clone();
1558        let mut vec = inner.into_inner();
1559        for item in vec.iter_mut() {
1560            *item = item.saturating_sub(1);
1561        }
1562        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1563    }
1564}
1565
/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token, dropping of which causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    ///
    /// The boxed `Any` holds the channel's `Sender`, possibly nested in tuples with
    /// values registered via `drop_on_fire`; everything is dropped together.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1585
1586impl StartSignal {
1587    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1588    /// dropped.
1589    pub fn new() -> (Self, Rc<dyn Any>) {
1590        let (tx, rx) = oneshot::channel::<Infallible>();
1591        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1592        let signal = Self {
1593            fut: rx.shared(),
1594            token_ref: Rc::downgrade(&token),
1595        };
1596        (signal, token)
1597    }
1598
1599    pub fn has_fired(&self) -> bool {
1600        self.token_ref.strong_count() == 0
1601    }
1602
1603    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1604        if let Some(token) = self.token_ref.upgrade() {
1605            let mut token = token.borrow_mut();
1606            let inner = std::mem::replace(&mut *token, Box::new(()));
1607            *token = Box::new((inner, to_drop));
1608        }
1609    }
1610}
1611
impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // The receiver resolves with an error when the sender is dropped, which is
        // exactly the "signal fired" condition, so the result value is discarded.
        self.fut.poll_unpin(cx).map(|_| ())
    }
}
1619
/// Extension trait to attach a `StartSignal` to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1628
1629impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
1630where
1631    Tr: TraceReader<Time: RenderTimestamp> + Clone,
1632{
1633    fn with_start_signal(self, signal: StartSignal) -> Self {
1634        Arranged {
1635            stream: self.stream.with_start_signal(signal),
1636            trace: self.trace,
1637        }
1638    }
1639}
1640
1641impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1642where
1643    D: timely::Container + Clone + 'static,
1644{
1645    fn with_start_signal(self, signal: StartSignal) -> Self {
1646        let activations = self.scope().activations();
1647        self.unary(Pipeline, "StartSignal", |_cap, info| {
1648            let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1649            signal.drop_on_fire(token);
1650
1651            let mut stash = Vec::new();
1652
1653            move |input, output| {
1654                // Stash incoming updates as long as the start signal has not fired.
1655                if !signal.has_fired() {
1656                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1657                    return;
1658                }
1659
1660                // Release any data we might still have stashed.
1661                for (cap, mut data) in std::mem::take(&mut stash) {
1662                    output.session(&cap).give_container(&mut data);
1663                }
1664
1665                // Pass through all remaining input data.
1666                input.for_each(|cap, data| {
1667                    output.session(&cap).give_container(data);
1668                });
1669            }
1670        })
1671    }
1672}
1673
1674/// Suppress progress messages for times before the given `as_of`.
1675///
1676/// This operator exists specifically to work around a memory spike we'd otherwise see when
1677/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
1678/// operator observes a frontier advancement without data it inserts an empty batch into the spine.
1679/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
1680/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
1681/// the snapshot batch.
1682///
1683/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
1684/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
1685/// beyond the `as_of`, so the snapshot data has already been collected.
1686///
1687/// To ensure this, this operator needs to take two measures:
1688///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
1689///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
1690///
1691/// The second measure requires elaboration: If we wouldn't reclock snapshot updates, they might
1692/// still be upstream of `arrange_core` operators when those get to know about us dropping the
1693/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
1694/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
1695fn suppress_early_progress<'scope, T: Timestamp, D>(
1696    stream: Stream<'scope, T, D>,
1697    as_of: Antichain<T>,
1698) -> Stream<'scope, T, D>
1699where
1700    D: Data + timely::Container,
1701{
1702    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1703        let mut early_cap = Some(default_cap);
1704
1705        move |(input, frontier), output| {
1706            input.for_each_time(|data_cap, data| {
1707                if as_of.less_than(data_cap.time()) {
1708                    let mut session = output.session(&data_cap);
1709                    for data in data {
1710                        session.give_container(data);
1711                    }
1712                } else {
1713                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1714                    let mut session = output.session(&cap);
1715                    for data in data {
1716                        session.give_container(data);
1717                    }
1718                }
1719            });
1720
1721            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1722                early_cap.take();
1723            }
1724        }
1725    })
1726}
1727
/// Extension trait for [`Stream`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
    /// the implementation to observe times in data, and release capabilities based on the probe's
    /// frontier, after applying `slack` to round up timestamps.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically joins hold back compaction on the other side of the join, so we need to
    /// make sure we release capabilities as soon as possible. This is why we only limit progress
    /// for times before the `upper`, which is the time until which the source can distinguish
    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
    /// our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
    /// consider using the join of the uppers, i.e., use the lower bound upper of all available
    /// inputs.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (c.f., `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1766
// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
    for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
where
    D: Clone + 'static,
    R: Clone + 'static,
{
    /// See [`LimitProgress::limit_progress`] for the contract.
    ///
    /// The operator passes all data through unchanged; its only effect is holding a
    /// capability that limits downstream progress until the probe's frontier catches up.
    ///
    /// NOTE(review): a `slack_ms` of 0 would divide by zero below — callers presumably
    /// always pass a positive slack; confirm.
    fn limit_progress(
        self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let scope = self.scope();
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;

                // Reschedule this operator whenever the probe's frontier advances, so
                // retained capabilities are released promptly.
                let activator = scope.activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // Record each slack-adjusted data time, rounded up to the next
                        // multiple of `slack_ms`, unless it is already beyond `upper`.
                        // Times whose slack addition overflows are skipped entirely.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        // Data itself is forwarded unchanged.
                        output.session(&cap).give_container(data);
                        // Retain a capability for the earliest (not-beyond-`upper`) input
                        // time seen, replacing any later one we may hold.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain(0));
                        }
                    });

                    // Drop pending times the probe's frontier has already passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the bound on the number of pending times, discarding from
                    // the front (earliest first).
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the earliest pending time, or
                    // release it if no pending times remain.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // Input is complete: release everything so downstream operators can
                    // shut down.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}
1854
/// A formatter for an iterator of timestamps that displays the first element, and subsequently
/// the difference between timestamps.
///
/// Used for compact debug logging of `pending_times` in `limit_progress`; assumes the
/// wrapped iterator yields times in ascending order.
struct PendingTimesDisplay<T>(T);
1858
1859impl<T> std::fmt::Display for PendingTimesDisplay<T>
1860where
1861    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1862{
1863    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1864        let mut iter = self.0.clone().into_iter();
1865        write!(f, "[")?;
1866        if let Some(first) = iter.next() {
1867            write!(f, "{}", first)?;
1868            let mut last = u64::from(first);
1869            for time in iter {
1870                write!(f, ", +{}", u64::from(time) - last)?;
1871                last = u64::from(time);
1872            }
1873        }
1874        write!(f, "]")?;
1875        Ok(())
1876    }
1877}
1878
/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    // Number of datums belonging to the first component of the pair.
    split_arity: usize,
}
1885
1886impl Pairer {
1887    /// Creates a pairer with knowledge of the arity of first component in the pair.
1888    fn new(split_arity: usize) -> Self {
1889        Self { split_arity }
1890    }
1891
1892    /// Merges a pair of datum iterators creating a `Row` instance.
1893    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1894    where
1895        I1: IntoIterator<Item = Datum<'a>>,
1896        I2: IntoIterator<Item = Datum<'a>>,
1897    {
1898        SharedRow::pack(first.into_iter().chain(second))
1899    }
1900
1901    /// Splits a datum iterator into a pair of `Row` instances.
1902    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1903        let mut datum_iter = datum_iter.into_iter();
1904        let mut row_builder = SharedRow::get();
1905        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1906        let second = row_builder.pack_using(datum_iter);
1907        (first, second)
1908    }
1909}