Skip to main content

mz_compute/
render.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders a plan into a timely/differential dataflow computation.
11//!
12//! ## Error handling
13//!
14//! Timely and differential have no idioms for computations that can error. The
15//! philosophy is, reasonably, to define the semantics of the computation such
16//! that errors are unnecessary: e.g., by using wrap-around semantics for
17//! integer overflow.
18//!
19//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
20//! in myriad cases. The classic example is a division by zero, but invalid
21//! input for casts, overflowing integer operations, and dozens of other
22//! functions need the ability to produce errors at runtime.
23//!
24//! At the moment, only *scalar* expression evaluation can fail, so only
25//! operators that evaluate scalar expressions can fail. At the time of writing,
26//! that includes map, filter, reduce, and join operators. Constants are a bit
27//! of a special case: they can be either a constant vector of rows *or* a
28//! constant, singular error.
29//!
30//! The approach taken is to build two parallel trees of computation: one for
31//! the rows that have been successfully evaluated (the "oks tree"), and one for
32//! the errors that have been generated (the "errs tree"). For example:
33//!
34//! ```text
35//!    oks1  errs1       oks2  errs2
36//!      |     |           |     |
37//!      |     |           |     |
38//!   project  |           |     |
39//!      |     |           |     |
40//!      |     |           |     |
41//!     map    |           |     |
42//!      |\    |           |     |
43//!      | \   |           |     |
44//!      |  \  |           |     |
45//!      |   \ |           |     |
46//!      |    \|           |     |
47//!   project  +           +     +
48//!      |     |          /     /
49//!      |     |         /     /
50//!    join ------------+     /
51//!      |     |             /
52//!      |     | +----------+
53//!      |     |/
54//!     oks   errs
55//! ```
56//!
57//! The project operation cannot fail, so errors from errs1 are propagated
58//! directly. Map operators are fallible and so can inject additional errors
59//! into the stream. Join operators combine the errors from each of their
60//! inputs.
61//!
62//! The semantics of the error stream are minimal. From the perspective of SQL,
63//! a dataflow is considered to be in an error state if there is at least one
64//! element in the final errs collection. The error value returned to the user
65//! is selected arbitrarily; SQL only makes provisions to return one error to
66//! the user at a time. There are plans to make the err collection accessible to
67//! end users, so they can see all errors at once.
68//!
69//! To make errors transient, simply ensure that the operator can retract any
70//! produced errors when corrected data arrives. To make errors permanent, write
71//! the operator such that it never retracts the errors it produced. Future work
72//! will likely want to introduce some sort of sort order for errors, so that
73//! permanent errors are returned to the user ahead of transient errors—probably
74//! by introducing a new error type a la:
75//!
76//! ```no_run
77//! # struct EvalError;
78//! # struct SourceError;
79//! enum DataflowError {
80//!     Transient(EvalError),
81//!     Permanent(SourceError),
82//! }
83//! ```
84//!
85//! If the error stream is empty, the oks stream must be correct. If the error
86//! stream is non-empty, then there are no semantics for the oks stream. This is
87//! sufficient to support SQL in its current form, but is likely to be
88//! unsatisfactory long term. We suspect that we can continue to imbue the oks
89//! stream with semantics if we are very careful in describing what data should
90//! and should not be produced upon encountering an error. Roughly speaking, the
91//! oks stream could represent the correct result of the computation where all
92//! rows that caused an error have been pruned from the stream. There are
93//! strange and confusing questions here around foreign keys, though: what if
94//! the optimizer proves that a particular key must exist in a collection, but
95//! the key gets pruned away because its row participated in a scalar expression
96//! evaluation that errored?
97//!
98//! In the meantime, it is probably wise for operators to keep the oks stream
99//! roughly "as correct as possible" even when errors are present in the errs
100//! stream. This reduces the amount of recomputation that must be performed
101//! if/when the errors are retracted.
102
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::Variable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125    ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use mz_timely_util::scope_label::ScopeExt;
141use timely::PartialOrder;
142use timely::communication::Allocate;
143use timely::container::CapacityContainerBuilder;
144use timely::dataflow::channels::pact::Pipeline;
145use timely::dataflow::operators::vec::ToStream;
146use timely::dataflow::operators::vec::{BranchWhen, Filter};
147use timely::dataflow::operators::{Capability, Operator, Probe, probe};
148use timely::dataflow::scopes::Child;
149use timely::dataflow::{Scope, Stream, StreamVec};
150use timely::order::{Product, TotalOrder};
151use timely::progress::timestamp::Refines;
152use timely::progress::{Antichain, Timestamp};
153use timely::scheduling::ActivateOnDrop;
154use timely::worker::{AsWorker, Worker as TimelyWorker};
155
156use crate::arrangement::manager::TraceBundle;
157use crate::compute_state::ComputeState;
158use crate::extensions::arrange::{KeyCollection, MzArrange};
159use crate::extensions::reduce::MzReduce;
160use crate::extensions::temporal_bucket::TemporalBucketing;
161use crate::logging::compute::{
162    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
163};
164use crate::render::context::{ArrangementFlavor, Context};
165use crate::render::continual_task::ContinualTaskCtx;
166use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
167use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
168
// Submodules of the renderer. Each houses the rendering logic for one class of
// operator or concern; `context` additionally defines the shared rendering state.
pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

// Re-exports for use by the rest of the crate.
pub use context::CollectionBundle;
pub use join::LinearJoinSpec;
181
/// Assemble the "compute"  side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, and producing
/// both arrangements and sinks.
///
/// Arguments:
/// * `timely_worker`: the timely worker in which the dataflow is installed.
/// * `compute_state`: worker-local compute state (trace manager, probes, metrics, config).
/// * `dataflow`: the description of the dataflow to render, including its imports and exports.
/// * `start_signal`: a signal that gates the production of source data until fired.
/// * `until`: a frontier beyond which updates are not required by the dataflow's consumers.
/// * `dataflow_expiration`: frontier at which the dataflow is considered expired.
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Mutually recursive view definitions require special handling.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Determine indexes to export, and their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Determine sinks to export, and their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    // Read dyncfg-controlled feature switches once, up front.
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        // Attach a label to the scope (see `mz_timely_util::scope_label::ScopeExt`).
        let scope = &mut scope.with_label();

        // TODO(ct3): This should be a config of the source instead, but at least try
        // to contain the hacks.
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            // Import declared sources into the rendering context.
            for (source_id, import) in dataflow.source_imports.iter() {
                // Each source gets its own nested region, so its operators are
                // grouped under a `Source(..)` scope in introspection.
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        // If enabled, we read from Persist with a `RelationDesc` that
                        // omits unneeded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            // Remap column references in the MFP to the narrowed schema:
                            // old (demanded) column index -> new dense index.
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // Optionally skip the snapshot when the import doesn't need it and the
                    // subscribe snapshot optimization is enabled.
                    let mut snapshot_mode =
                        if import.with_snapshot || !subscribe_snapshot_optimization {
                            SnapshotMode::Include
                        } else {
                            compute_state.metrics.inc_subscribe_snapshot_optimization();
                            SnapshotMode::Exclude
                        };
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    // Continual tasks may override snapshot handling and progress suppression.
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    // Note: For correctness, we require that sources only emit times advanced by
                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        import.desc.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    // If `mfp` is non-identity, we need to apply what remains.
                    // For the moment, assert that it is either trivial or `None`.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // To avoid a memory spike during arrangement hydration (database-issues#6368), need to
                    // ensure that the first frontier we report into the dataflow is beyond the
                    // `as_of`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        // Apply logical backpressure to the source.
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // The `suppress_early_progress` operator and the input
                    // probe both want to work on the untransformed ct input,
                    // make sure this stays after them.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            // TODO(ct3): Ideally this would be encapsulated by
                            // ContinualTaskCtx, but the types are tricky.
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    // Leave both nested regions (Source(..) and InputRegion) so the
                    // collections live in the outer dataflow scope.
                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        // If there exists a recursive expression, we'll need to use a non-region scope,
        // in order to support additional timestamp coordinates for iteration.
        //
        // NOTE(review): the two branches below are intentionally parallel; changes to
        // one usually need to be mirrored in the other.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    // recursive plans _must_ have bodies in a let
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    // Log the association between the dataflow's root timely address
                    // and this object's global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    // Optionally route the ok stream through temporal bucketing
                    // (see `crate::extensions::temporal_bucket::TemporalBucketing`)
                    // before entering the build region.
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    // Log the association between the dataflow's root timely address
                    // and this object's global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    });
}
570
// This implementation block allows child timestamps to vary from parent timestamps,
// but requires the parent timestamp to be `repr::Timestamp`.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
    /// Import the collection from the arrangement, discarding batches from the snapshot.
    /// (This does not guarantee that no records from the snapshot are included; the assumption is
    /// that we'll filter those out later if necessary.)
    ///
    /// `logic` maps each key/value pair in the retained batches to an output value; the
    /// result is brought into the child scope `self.scope`.
    fn import_filtered_index_collection<Tr: TraceReader<Time = G::Timestamp> + Clone, V: Data>(
        &self,
        arranged: Arranged<G, Tr>,
        start_signal: StartSignal,
        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
    ) -> VecCollection<Child<'g, G, T>, V, Tr::Diff>
    where
        // This is implied by the fact that G::Timestamp = mz_repr::Timestamp, but it's essential
        // for our batch-level filtering to be safe, so we document it here regardless.
        G::Timestamp: TotalOrder,
    {
        // Retain only batches whose `upper` is not `<=` the `as_of` frontier; batches that
        // fail this test contain only times at or before the snapshot.
        let oks = arranged.stream.with_start_signal(start_signal).filter({
            let as_of = self.as_of_frontier.clone();
            move |b| !<Antichain<G::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
        });
        Arranged::<G, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(&self.scope)
    }

    /// Import the index identified by `idx_id` from the worker's trace manager into the
    /// rendering context, registering the result under `idx.on_id`.
    ///
    /// Attaches `input_probe` to the ok stream, and stashes buttons/tokens in `tokens`
    /// (keyed by `idx_id`) so the import is dismantled when the tokens drop.
    ///
    /// # Panics
    ///
    /// Panics if the trace manager has no traces for `idx_id`, or if the traces have
    /// been allowed to compact beyond the dataflow's `as_of`.
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        typ: &ReprRelationType,
        snapshot_mode: SnapshotMode,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            // Import the ok and err traces in the parent scope, advanced to the
            // dataflow's `as_of` and bounded by `until`.
            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let bundle = match snapshot_mode {
                SnapshotMode::Include => {
                    // Keep the arrangements themselves; gate both on the start signal.
                    let ok_arranged = oks
                        .enter(&self.scope)
                        .with_start_signal(start_signal.clone());
                    let err_arranged = err_arranged
                        .enter(&self.scope)
                        .with_start_signal(start_signal);
                    CollectionBundle::from_expressions(
                        idx.key.clone(),
                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                    )
                }
                SnapshotMode::Exclude => {
                    // When we import an index without a snapshot, we have two balancing considerations:
                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
                    //   (joins, reduces, etc.) also require the snapshot data.
                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
                    let oks = {
                        let mut datums = DatumVec::new();
                        // Undo the key/value thinning of the arrangement to reconstruct
                        // full rows in the original column order.
                        let (permutation, _thinning) =
                            permutation_for_arrangement(&idx.key, typ.arity());
                        self.import_filtered_index_collection(
                            oks,
                            start_signal.clone(),
                            move |k: DatumSeq, v: DatumSeq| {
                                let mut datums_borrow = datums.borrow();
                                datums_borrow.extend(k);
                                datums_borrow.extend(v);
                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
                            },
                        )
                    };
                    let errs = self.import_filtered_index_collection(
                        err_arranged,
                        start_signal,
                        |e, _| e.clone(),
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            };
            self.update_id(Id::Global(idx.on_id), bundle);
            // Dropping this token presses the buttons, dismantling the trace imports.
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}
690
691// This implementation block requires the scopes have the same timestamp as the trace manager.
692// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
693impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
694where
695    G: Scope<Timestamp = mz_repr::Timestamp>,
696{
697    pub(crate) fn export_index(
698        &self,
699        compute_state: &mut ComputeState,
700        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
701        dependency_ids: BTreeSet<GlobalId>,
702        idx_id: GlobalId,
703        idx: &IndexDesc,
704        output_probe: &MzProbeHandle<G::Timestamp>,
705    ) {
706        // put together tokens that belong to the export
707        let mut needed_tokens = Vec::new();
708        for dep_id in dependency_ids {
709            if let Some(token) = tokens.get(&dep_id) {
710                needed_tokens.push(Rc::clone(token));
711            }
712        }
713        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
714            panic!(
715                "Arrangement alarmingly absent! id: {:?}",
716                Id::Global(idx_id)
717            )
718        });
719
720        match bundle.arrangement(&idx.key) {
721            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
722                // Ensure that the frontier does not advance past the expiration time, if set.
723                // Otherwise, we might write down incorrect data.
724                if let Some(&expiration) = self.dataflow_expiration.as_option() {
725                    oks.stream = oks.stream.expire_stream_at(
726                        &format!("{}_export_index_oks", self.debug_name),
727                        expiration,
728                    );
729                    errs.stream = errs.stream.expire_stream_at(
730                        &format!("{}_export_index_errs", self.debug_name),
731                        expiration,
732                    );
733                }
734
735                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
736
737                // Attach logging of dataflow errors.
738                if let Some(logger) = compute_state.compute_logger.clone() {
739                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
740                }
741
742                compute_state.traces.set(
743                    idx_id,
744                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
745                );
746            }
747            Some(ArrangementFlavor::Trace(gid, _, _)) => {
748                // Duplicate of existing arrangement with id `gid`, so
749                // just create another handle to that arrangement.
750                let trace = compute_state.traces.get(&gid).unwrap().clone();
751                compute_state.traces.set(idx_id, trace);
752            }
753            None => {
754                println!("collection available: {:?}", bundle.collection.is_none());
755                println!(
756                    "keys available: {:?}",
757                    bundle.arranged.keys().collect::<Vec<_>>()
758                );
759                panic!(
760                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
761                    Id::Global(idx_id),
762                    &idx.key
763                );
764            }
765        };
766    }
767}
768
769// This implementation block requires the scopes have the same timestamp as the trace manager.
770// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
771impl<'g, G, T> Context<Child<'g, G, T>>
772where
773    G: Scope<Timestamp = mz_repr::Timestamp>,
774    T: RenderTimestamp,
775{
776    pub(crate) fn export_index_iterative(
777        &self,
778        compute_state: &mut ComputeState,
779        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
780        dependency_ids: BTreeSet<GlobalId>,
781        idx_id: GlobalId,
782        idx: &IndexDesc,
783        output_probe: &MzProbeHandle<G::Timestamp>,
784    ) {
785        // put together tokens that belong to the export
786        let mut needed_tokens = Vec::new();
787        for dep_id in dependency_ids {
788            if let Some(token) = tokens.get(&dep_id) {
789                needed_tokens.push(Rc::clone(token));
790            }
791        }
792        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
793            panic!(
794                "Arrangement alarmingly absent! id: {:?}",
795                Id::Global(idx_id)
796            )
797        });
798
799        match bundle.arrangement(&idx.key) {
800            Some(ArrangementFlavor::Local(oks, errs)) => {
801                // TODO: The following as_collection/leave/arrange sequence could be optimized.
802                //   * Combine as_collection and leave into a single function.
803                //   * Use columnar to extract columns from the batches to implement leave.
804                let mut oks = oks
805                    .as_collection(|k, v| (k.to_row(), v.to_row()))
806                    .leave()
807                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
808                    "Arrange export iterative",
809                );
810
811                let mut errs = errs
812                    .as_collection(|k, v| (k.clone(), v.clone()))
813                    .leave()
814                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
815                        "Arrange export iterative err",
816                    );
817
818                // Ensure that the frontier does not advance past the expiration time, if set.
819                // Otherwise, we might write down incorrect data.
820                if let Some(&expiration) = self.dataflow_expiration.as_option() {
821                    oks.stream = oks.stream.expire_stream_at(
822                        &format!("{}_export_index_iterative_oks", self.debug_name),
823                        expiration,
824                    );
825                    errs.stream = errs.stream.expire_stream_at(
826                        &format!("{}_export_index_iterative_err", self.debug_name),
827                        expiration,
828                    );
829                }
830
831                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
832
833                // Attach logging of dataflow errors.
834                if let Some(logger) = compute_state.compute_logger.clone() {
835                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
836                }
837
838                compute_state.traces.set(
839                    idx_id,
840                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
841                );
842            }
843            Some(ArrangementFlavor::Trace(gid, _, _)) => {
844                // Duplicate of existing arrangement with id `gid`, so
845                // just create another handle to that arrangement.
846                let trace = compute_state.traces.get(&gid).unwrap().clone();
847                compute_state.traces.set(idx_id, trace);
848            }
849            None => {
850                println!("collection available: {:?}", bundle.collection.is_none());
851                println!(
852                    "keys available: {:?}",
853                    bundle.arranged.keys().collect::<Vec<_>>()
854                );
855                panic!(
856                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
857                    Id::Global(idx_id),
858                    &idx.key
859                );
860            }
861        };
862    }
863}
864
/// Information about bindings, tracked in `render_recursive_plan` and
/// `render_plan`, to be passed to `render_letfree_plan`.
///
/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
enum BindingInfo {
    /// The main body of a plan. `in_let` records whether any `Let` binding was
    /// rendered before the body, in which case the body's last operator is
    /// decorated with `Returning ...`.
    Body { in_let: bool },
    /// A non-recursive `Let` binding for `id`. `last` marks the final binding
    /// in its stage, which receives the `With ...` decoration.
    Let { id: LocalId, last: bool },
    /// A recursive (`LetRec`) binding for `id`. `last` marks the final binding
    /// in its stage, which receives the `With Recursive ...` decoration.
    LetRec { id: LocalId, last: bool },
}
875
impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it calls in to `render_plan` which will error if
    /// further [`RecBind`]s are found.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                // Each binding is rendered in its own named region; `last` (no
                // more lets pending) decides the `With ...` decoration.
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `Variable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Feedback advances the iteration coordinate at `level + 1` by 1.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) = Variable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let (err_v, err_collection) =
                    Variable::new(&mut self.scope, Product::new(Default::default(), inner));

                // Bind the (as yet undefined) variable collections so that the
                // rec bindings can reference each other while being rendered.
                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough information
                // here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                // Rebind `id` to the rendered bundle; later uses see the object
                // itself rather than the `Variable` placeholder.
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        // Surface limit overruns as errors rather than silently
                        // truncating the result.
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Set err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error each iteration,
                // say if the limit of `oks` has an error. This would result in non-terminating rather
                // than a clean report of the error. The trade-off is that we lose information about
                // multiplicities of errors, but .. this seems to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Close the feedback loops.
                oks_v.set(oks);
                err_v.set(errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1013
1014impl<G> Context<G>
1015where
1016    G: Scope,
1017    G::Timestamp: RenderTimestamp,
1018{
1019    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
1020    /// results.
1021    ///
1022    /// The return type reflects the uncertainty about the data representation, perhaps
1023    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
1024    ///
1025    /// # Panics
1026    ///
1027    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
1028    /// `render_recursive_plan` instead.
1029    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
1030        let mut in_let = false;
1031        for BindStage { lets, recs } in plan.binds {
1032            assert!(recs.is_empty());
1033
1034            let mut let_iter = lets.into_iter().peekable();
1035            while let Some(LetBind { id, value }) = let_iter.next() {
1036                // if we encounter a single let, the body is in a let
1037                in_let = true;
1038                let bundle =
1039                    self.scope
1040                        .clone()
1041                        .region_named(&format!("Binding({:?})", id), |region| {
1042                            let depends = value.depends();
1043                            let last = let_iter.peek().is_none();
1044                            let binding = BindingInfo::Let { id, last };
1045                            self.enter_region(region, Some(&depends))
1046                                .render_letfree_plan(object_id, value, binding)
1047                                .leave_region()
1048                        });
1049                self.insert_id(Id::Local(id), bundle);
1050            }
1051        }
1052
1053        self.scope.clone().region_named("Main Body", |region| {
1054            let depends = plan.body.depends();
1055            self.enter_region(region, Some(&depends))
1056                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1057                .leave_region()
1058        })
1059    }
1060
    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    ///
    /// Nodes are rendered in the plan's topological order, so every node's
    /// inputs are available in `collections` when the node itself is rendered.
    /// When a compute logger is present, this also assembles per-node LIR
    /// metadata (humanized operator text and the span of timely operator ids
    /// allocated while rendering the node) and logs it as the `mz_lir_mapping`
    /// output.
    ///
    /// # Panics
    ///
    /// Panics if the plan's root node is missing from the rendered collections
    /// (a violated `LetFreePlan` invariant).
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
            // in some other structure and have that structure impl ExprHumanizer
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // mark the last operator in topo order with any binding decoration
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Capture the operator identifier before rendering so that,
                // together with the post-render capture below, we get the span
                // of operator ids allocated for this node.
                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1150
    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        // Looks up a previously rendered input bundle, panicking if absent.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        // Drop rows whose time is at or beyond `until`; they can
                        // never be observed by this dataflow.
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                // Errors are emitted at the earliest time not before `as_of`.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` are present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        // If a raw collection is requested, one must be present.
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read from the arrangement for `key` (seeking `row`),
                        // applying `mfp` to produce a collection.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Read from the raw collection, applying `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Dispatch on the join strategy chosen by planning.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only pass `mfp_after` along if it actually does something.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                // Negate the ok rows only; errors pass through unnegated.
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate the ok and err streams of all inputs separately.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                // Ensure the requested arrangement forms exist on the bundle.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }
1362
1363    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1364        if let Some(logger) = &self.compute_logger {
1365            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1366                dataflow_index,
1367                global_id,
1368            }));
1369        }
1370    }
1371
1372    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1373        if let Some(logger) = &self.compute_logger {
1374            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1375        }
1376    }
1377
    /// Attaches a hydration-logging operator for `lir_id` to one of the collections in `bundle`,
    /// replacing that collection's stream with the instrumented one.
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
        // which we should attach the logging operator.
        //
        // We could attach to each collection and track the lower bound of output frontiers.
        // However, that would be of limited use because we expect all collections to hydrate at
        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
        // frontier of the unarranged collection advance, it will perform all work necessary to
        // also advance its own frontier. We don't expect significant delays between frontier
        // advancements of the unarranged and arranged collections, so attaching the logging
        // operator to any one of them should produce accurate results.
        //
        // If the `CollectionBundle` contains both unarranged and arranged representations it is
        // beneficial to attach the logging operator to one of the arranged representation to avoid
        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
        // if we attached the logging operator to it, we would introduce a fork in its output
        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
        // that the output streams of the arrangements don't yet feed into anything else, so
        // attaching a (pass-through) logging operator does not introduce a fork.

        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                // Instrument the arrangement's stream in place, regardless of flavor.
                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
                    }
                }
            }
            None => {
                // No arrangements present: fall back to the unarranged `oks` collection.
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
                *oks = stream.as_collection();
            }
        }
    }
1421
    /// Wraps `stream` in a pass-through operator that emits `OperatorHydration` events for
    /// `lir_id`: once (not hydrated) at construction, and once (hydrated) when the input
    /// frontier passes the hydration frontier derived from the dataflow as-of.
    fn log_operator_hydration_inner<D>(&self, stream: Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: timely::Container + Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone(); // hydration logging disabled
        };

        let export_ids = self.export_ids.clone();

        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
        //
        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
        // frontier advances to an outer time that's greater than the `as_of`. Comparing
        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
        // instead.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            // Announce the initial (not hydrated) state for each export.
            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                // Pass through inputs.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                // Hydration is reported at most once; afterwards this operator is a
                // pure pass-through.
                if hydrated {
                    return;
                }

                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
1482}
1483
#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
///
/// Implemented below for `mz_repr::Timestamp` (flat dataflows) and for
/// `Product<mz_repr::Timestamp, PointStamp<u64>>` (iterative scopes).
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    fn step_back(&self) -> Self;
}
1504
// For the plain timestamp there is no inner coordinate: system time and event
// time are one and the same value.
impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturating: the minimum timestamp steps back to itself.
        self.saturating_sub(1)
    }
}
1525
1526impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1527    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1528        &mut self.outer
1529    }
1530    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1531        Product::new(delay, Default::default())
1532    }
1533    fn event_time(&self) -> mz_repr::Timestamp {
1534        self.outer
1535    }
1536    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1537        &mut self.outer
1538    }
1539    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1540        Product::new(delay, Default::default())
1541    }
1542    fn step_back(&self) -> Self {
1543        // It is necessary to step back both coordinates of a product,
1544        // and when one is a `PointStamp` that also means all coordinates
1545        // of the pointstamp.
1546        let inner = self.inner.clone();
1547        let mut vec = inner.into_inner();
1548        for item in vec.iter_mut() {
1549            *item = item.saturating_sub(1);
1550        }
1551        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1552    }
1553}
1554
/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token, dropping of which causes the signal to fire.
///
/// Clones share the same underlying signal: the future is a `Shared` and all clones
/// observe the same token via `Weak` references.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1574
1575impl StartSignal {
1576    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1577    /// dropped.
1578    pub fn new() -> (Self, Rc<dyn Any>) {
1579        let (tx, rx) = oneshot::channel::<Infallible>();
1580        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1581        let signal = Self {
1582            fut: rx.shared(),
1583            token_ref: Rc::downgrade(&token),
1584        };
1585        (signal, token)
1586    }
1587
1588    pub fn has_fired(&self) -> bool {
1589        self.token_ref.strong_count() == 0
1590    }
1591
1592    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1593        if let Some(token) = self.token_ref.upgrade() {
1594            let mut token = token.borrow_mut();
1595            let inner = std::mem::replace(&mut *token, Box::new(()));
1596            *token = Box::new((inner, to_drop));
1597        }
1598    }
1599}
1600
1601impl Future for StartSignal {
1602    type Output = ();
1603
1604    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1605        self.fut.poll_unpin(cx).map(|_| ())
1606    }
1607}
1608
/// Extension trait to attach a `StartSignal` to operator outputs.
///
/// Implemented below for both [`Stream`] and [`Arranged`].
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1617
1618impl<S, Tr> WithStartSignal for Arranged<S, Tr>
1619where
1620    S: Scope,
1621    S::Timestamp: RenderTimestamp,
1622    Tr: TraceReader + Clone,
1623{
1624    fn with_start_signal(self, signal: StartSignal) -> Self {
1625        Arranged {
1626            stream: self.stream.with_start_signal(signal),
1627            trace: self.trace,
1628        }
1629    }
1630}
1631
1632impl<S, D> WithStartSignal for Stream<S, D>
1633where
1634    S: Scope,
1635    D: timely::Container + Clone + 'static,
1636{
1637    fn with_start_signal(self, signal: StartSignal) -> Self {
1638        let activations = self.scope().activations();
1639        self.unary(Pipeline, "StartSignal", |_cap, info| {
1640            let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1641            signal.drop_on_fire(token);
1642
1643            let mut stash = Vec::new();
1644
1645            move |input, output| {
1646                // Stash incoming updates as long as the start signal has not fired.
1647                if !signal.has_fired() {
1648                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1649                    return;
1650                }
1651
1652                // Release any data we might still have stashed.
1653                for (cap, mut data) in std::mem::take(&mut stash) {
1654                    output.session(&cap).give_container(&mut data);
1655                }
1656
1657                // Pass through all remaining input data.
1658                input.for_each(|cap, data| {
1659                    output.session(&cap).give_container(data);
1660                });
1661            }
1662        })
1663    }
1664}
1665
/// Suppress progress messages for times before the given `as_of`.
///
/// This operator exists specifically to work around a memory spike we'd otherwise see when
/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
/// operator observes a frontier advancement without data it inserts an empty batch into the spine.
/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
/// the snapshot batch.
///
/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
/// beyond the `as_of`, so the snapshot data has already been collected.
///
/// To ensure this, this operator needs to take two measures:
///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
///
/// The second measure requires elaboration: If we wouldn't reclock snapshot updates, they might
/// still be upstream of `arrange_core` operators when those get to know about us dropping the
/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data + timely::Container,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        // Holds the minimum-time capability until the input frontier passes `as_of`.
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    // Data beyond the `as_of` passes through with its own capability.
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    // Snapshot data is reclocked to the minimum time (the second
                    // measure described in the function docs).
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            // Once the input frontier is beyond the `as_of`, drop the held capability so
            // downstream operators finally observe progress.
            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}
1720
/// Extension trait for [`Stream`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
    /// the implementation to observe times in data, and release capabilities based on the probe's
    /// frontier, after applying `slack` to round up timestamps.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically joins hold back compaction on the other side of the join, so we need to
    /// make sure we release capabilities as soon as possible. This is why we only limit progress
    /// for times before the `upper`, which is the time until which the source can distinguish
    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
    /// our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
    /// consider using the join of the uppers, i.e., use the lower bound upper of all available
    /// inputs.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (c.f., `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1759
// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamVec<G, (D, mz_repr::Timestamp, R)>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: Clone + 'static,
    R: Clone + 'static,
{
    fn limit_progress(
        self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let scope = self.scope();
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                // Ensure probe frontier advancements re-schedule this operator, so pending
                // times can be released even without new input arriving.
                let activator = scope.activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // Record each observed time, shifted by `slack_ms` and rounded up to
                        // the next `slack_ms` multiple; times at or beyond `upper` are not
                        // tracked (we must not hold capabilities past `upper`).
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        // Data itself passes through immediately; only progress is limited.
                        output.session(&cap).give_container(data);
                        // Retain this capability if it improves (lowers) the currently
                        // retained one and is not beyond `upper`.
                        // NOTE(review): `cap.retain(0)` takes an argument here; upstream
                        // timely's `CapabilityRef::retain` takes none (there is a separate
                        // `retain_for_output(port)`) — confirm against the timely version
                        // in use.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain(0));
                        }
                    });

                    // Drop pending times the probe frontier has already passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the optional bound on the number of tracked times.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the least pending time, or drop
                    // it entirely when nothing is pending.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // On shutdown (empty input frontier), release all state so downstream
                    // operators can release resources.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}
1847
/// A formatter for an iterator of timestamps that displays the first element, and subsequently
/// the difference between timestamps.
///
/// For example, the times `1000, 1010, 1030` render as `[1000, +10, +20]`.
struct PendingTimesDisplay<T>(T);
1851
1852impl<T> std::fmt::Display for PendingTimesDisplay<T>
1853where
1854    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1855{
1856    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1857        let mut iter = self.0.clone().into_iter();
1858        write!(f, "[")?;
1859        if let Some(first) = iter.next() {
1860            write!(f, "{}", first)?;
1861            let mut last = u64::from(first);
1862            for time in iter {
1863                write!(f, ", +{}", u64::from(time) - last)?;
1864                last = u64::from(time);
1865            }
1866        }
1867        write!(f, "]")?;
1868        Ok(())
1869    }
1870}
1871
/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    /// Number of datums belonging to the first component of the pair.
    split_arity: usize,
}
1878
1879impl Pairer {
1880    /// Creates a pairer with knowledge of the arity of first component in the pair.
1881    fn new(split_arity: usize) -> Self {
1882        Self { split_arity }
1883    }
1884
1885    /// Merges a pair of datum iterators creating a `Row` instance.
1886    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1887    where
1888        I1: IntoIterator<Item = Datum<'a>>,
1889        I2: IntoIterator<Item = Datum<'a>>,
1890    {
1891        SharedRow::pack(first.into_iter().chain(second))
1892    }
1893
1894    /// Splits a datum iterator into a pair of `Row` instances.
1895    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1896        let mut datum_iter = datum_iter.into_iter();
1897        let mut row_builder = SharedRow::get();
1898        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1899        let second = row_builder.pack_using(datum_iter);
1900        (first, second)
1901    }
1902}