mz_compute/render.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders a plan into a timely/differential dataflow computation.
11//!
12//! ## Error handling
13//!
14//! Timely and differential have no idioms for computations that can error. The
15//! philosophy is, reasonably, to define the semantics of the computation such
16//! that errors are unnecessary: e.g., by using wrap-around semantics for
17//! integer overflow.
18//!
19//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
20//! in myriad cases. The classic example is a division by zero, but invalid
21//! input for casts, overflowing integer operations, and dozens of other
22//! functions need the ability to produce errors at runtime.
23//!
24//! At the moment, only *scalar* expression evaluation can fail, so only
25//! operators that evaluate scalar expressions can fail. At the time of writing,
26//! that includes map, filter, reduce, and join operators. Constants are a bit
27//! of a special case: they can be either a constant vector of rows *or* a
28//! constant, singular error.
29//!
30//! The approach taken is to build two parallel trees of computation: one for
31//! the rows that have been successfully evaluated (the "oks tree"), and one for
32//! the errors that have been generated (the "errs tree"). For example:
33//!
34//! ```text
35//!    oks1  errs1       oks2  errs2
36//!      |     |           |     |
37//!      |     |           |     |
38//!   project  |           |     |
39//!      |     |           |     |
40//!      |     |           |     |
41//!     map    |           |     |
42//!      |\    |           |     |
43//!      | \   |           |     |
44//!      |  \  |           |     |
45//!      |   \ |           |     |
46//!      |    \|           |     |
47//!   project  +           +     +
48//!      |     |          /     /
49//!      |     |         /     /
50//!    join ------------+     /
51//!      |     |             /
52//!      |     | +----------+
53//!      |     |/
54//!     oks   errs
55//! ```
56//!
57//! The project operation cannot fail, so errors from errs1 are propagated
58//! directly. Map operators are fallible and so can inject additional errors
59//! into the stream. Join operators combine the errors from each of their
60//! inputs.
61//!
62//! The semantics of the error stream are minimal. From the perspective of SQL,
63//! a dataflow is considered to be in an error state if there is at least one
64//! element in the final errs collection. The error value returned to the user
65//! is selected arbitrarily; SQL only makes provisions to return one error to
66//! the user at a time. There are plans to make the err collection accessible to
67//! end users, so they can see all errors at once.
68//!
69//! To make errors transient, simply ensure that the operator can retract any
70//! produced errors when corrected data arrives. To make errors permanent, write
71//! the operator such that it never retracts the errors it produced. Future work
72//! will likely want to introduce an ordering on errors, so that permanent
73//! errors are returned to the user ahead of transient errors, probably by
74//! introducing a new error type along the lines of:
75//!
76//! ```no_run
77//! # struct EvalError;
78//! # struct SourceError;
79//! enum DataflowError {
80//!     Transient(EvalError),
81//!     Permanent(SourceError),
82//! }
83//! ```
84//!
85//! If the error stream is empty, the oks stream must be correct. If the error
86//! stream is non-empty, then there are no semantics for the oks stream. This is
87//! sufficient to support SQL in its current form, but is likely to be
88//! unsatisfactory long term. We suspect that we can continue to imbue the oks
89//! stream with semantics if we are very careful in describing what data should
90//! and should not be produced upon encountering an error. Roughly speaking, the
91//! oks stream could represent the correct result of the computation where all
92//! rows that caused an error have been pruned from the stream. There are
93//! strange and confusing questions here around foreign keys, though: what if
94//! the optimizer proves that a particular key must exist in a collection, but
95//! the key gets pruned away because its row participated in a scalar expression
96//! evaluation that errored?
97//!
98//! In the meantime, it is probably wise for operators to keep the oks stream
99//! roughly "as correct as possible" even when errors are present in the errs
100//! stream. This reduces the amount of recomputation that must be performed
101//! if/when the errors are retracted.
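//!
//! As an illustrative sketch (not the exact operator set used throughout this
//! module), a fallible map over rows might split its output into the two
//! parallel streams using a helper in the spirit of
//! `CollectionExt::map_fallible`, routing `Ok` results into the oks tree and
//! `Err` results into the errs tree:
//!
//! ```ignore
//! // Hypothetical sketch: `eval_exprs` stands in for whatever scalar
//! // evaluation the operator performs; it is not a real function here.
//! let (oks, errs) = input.map_fallible("EvalMap", move |row| {
//!     eval_exprs(&exprs, row).map_err(DataflowError::from)
//! });
//! // `oks` and `errs` then feed the remainder of the oks and errs trees.
//! ```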
102
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::SemigroupVariable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125    ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, SharedRow, SqlRelationType};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use mz_timely_util::scope_label::ScopeExt;
141use timely::PartialOrder;
142use timely::communication::Allocate;
143use timely::container::CapacityContainerBuilder;
144use timely::dataflow::channels::pact::Pipeline;
145use timely::dataflow::operators::to_stream::ToStream;
146use timely::dataflow::operators::{BranchWhen, Capability, Filter, Operator, Probe, probe};
147use timely::dataflow::scopes::Child;
148use timely::dataflow::{Scope, Stream, StreamCore};
149use timely::order::{Product, TotalOrder};
150use timely::progress::timestamp::Refines;
151use timely::progress::{Antichain, Timestamp};
152use timely::scheduling::ActivateOnDrop;
153use timely::worker::{AsWorker, Worker as TimelyWorker};
154
155use crate::arrangement::manager::TraceBundle;
156use crate::compute_state::ComputeState;
157use crate::extensions::arrange::{KeyCollection, MzArrange};
158use crate::extensions::reduce::MzReduce;
159use crate::extensions::temporal_bucket::TemporalBucketing;
160use crate::logging::compute::{
161    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
162};
163use crate::render::context::{ArrangementFlavor, Context};
164use crate::render::continual_task::ContinualTaskCtx;
165use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
166use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
167
168pub mod context;
169pub(crate) mod continual_task;
170mod errors;
171mod flat_map;
172mod join;
173mod reduce;
174pub mod sinks;
175mod threshold;
176mod top_k;
177
178pub use context::CollectionBundle;
179pub use join::LinearJoinSpec;
180
181/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
182///
183/// This method imports sources from the provided assets, and then builds the
184/// remaining dataflow using "compute-local" assets like shared arrangements,
185/// producing both arrangements and sinks.
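///
/// Informally, and as reflected in the body below, the construction proceeds in
/// three steps: import the declared sources and indexes, build each object in
/// `objects_to_build` (using an iterative scope when any plan is recursive),
/// and finally export the declared indexes and sinks.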
186pub fn build_compute_dataflow<A: Allocate>(
187    timely_worker: &mut TimelyWorker<A>,
188    compute_state: &mut ComputeState,
189    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
190    start_signal: StartSignal,
191    until: Antichain<mz_repr::Timestamp>,
192    dataflow_expiration: Antichain<mz_repr::Timestamp>,
193) {
194    // Mutually recursive view definitions require special handling.
195    let recursive = dataflow
196        .objects_to_build
197        .iter()
198        .any(|object| object.plan.is_recursive());
199
200    // Determine indexes to export, and their dependencies.
201    let indexes = dataflow
202        .index_exports
203        .iter()
204        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
205        .collect::<Vec<_>>();
206
207    // Determine sinks to export, and their dependencies.
208    let sinks = dataflow
209        .sink_exports
210        .iter()
211        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
212        .collect::<Vec<_>>();
213
214    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
215    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
216    let subscribe_snapshot_optimization =
217        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);
218
219    let name = format!("Dataflow: {}", &dataflow.debug_name);
220    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
221    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);
222
223    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
224        let scope = &mut scope.with_label();
225
226        // TODO(ct3): This should be a config of the source instead, but at least try
227        // to contain the hacks.
228        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);
229
230        // The scope.clone() occurs to allow import in the region.
231        // We build a region here to establish a pattern of a scope inside the dataflow,
232        // so that other similar uses (e.g. with iterative scopes) do not require weird
233        // alternate type signatures.
234        let mut imported_sources = Vec::new();
235        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
236        let output_probe = MzProbeHandle::default();
237
238        scope.clone().region_named(&input_name, |region| {
239            // Import declared sources into the rendering context.
240            for (source_id, import) in dataflow.source_imports.iter() {
241                region.region_named(&format!("Source({:?})", source_id), |inner| {
242                    let mut read_schema = None;
243                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
244                        // If enabled, we read from Persist with a `RelationDesc` that
245                        // omits unneeded columns.
246                        if apply_demands {
247                            let demands = ops.demand();
248                            let new_desc = import
249                                .desc
250                                .storage_metadata
251                                .relation_desc
252                                .apply_demand(&demands);
253                            let new_arity = demands.len();
254                            let remap: BTreeMap<_, _> = demands
255                                .into_iter()
256                                .enumerate()
257                                .map(|(new, old)| (old, new))
258                                .collect();
259                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
260                            read_schema = Some(new_desc);
261                        }
262
263                        mz_expr::MfpPlan::create_from(ops)
264                            .expect("Linear operators should always be valid")
265                    });
266
267                    let mut snapshot_mode =
268                        if import.with_snapshot || !subscribe_snapshot_optimization {
269                            SnapshotMode::Include
270                        } else {
271                            compute_state.metrics.inc_subscribe_snapshot_optimization();
272                            SnapshotMode::Exclude
273                        };
274                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
275                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
276                    if let Some(x) = ct_source_transformer.as_ref() {
277                        snapshot_mode = x.snapshot_mode();
278                        suppress_early_progress_as_of = suppress_early_progress_as_of
279                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
280                    }
281
282                    // Note: For correctness, we require that sources only emit times advanced by
283                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
284                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
285                        inner,
286                        *source_id,
287                        Arc::clone(&compute_state.persist_clients),
288                        &compute_state.txns_ctx,
289                        import.desc.storage_metadata.clone(),
290                        read_schema,
291                        dataflow.as_of.clone(),
292                        snapshot_mode,
293                        until.clone(),
294                        mfp.as_mut(),
295                        compute_state.dataflow_max_inflight_bytes(),
296                        start_signal.clone(),
297                        ErrorHandler::Halt("compute_import"),
298                    );
299
300                    // If `mfp` is non-identity, we need to apply what remains.
301                    // For the moment, assert that it is either trivial or `None`.
302                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));
303
304                    // To avoid a memory spike during arrangement hydration (database-issues#6368), we need to
305                    // ensure that the first frontier we report into the dataflow is beyond the
306                    // `as_of`.
307                    if let Some(as_of) = suppress_early_progress_as_of {
308                        ok_stream = suppress_early_progress(ok_stream, as_of);
309                    }
310
311                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
312                        // Apply logical backpressure to the source.
313                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
314                            .get(&compute_state.worker_config);
315                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
316                            .get(&compute_state.worker_config)
317                            .as_millis()
318                            .try_into()
319                            .expect("must fit");
320
321                        let stream = ok_stream.limit_progress(
322                            output_probe.clone(),
323                            slack,
324                            limit,
325                            import.upper.clone(),
326                            name.clone(),
327                        );
328                        ok_stream = stream;
329                    }
330
331                    // Attach a probe reporting the input frontier.
332                    let input_probe =
333                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
334                    ok_stream = ok_stream.probe_with(&input_probe);
335
336                    // The `suppress_early_progress` operator and the input
337                    // probe both want to work on the untransformed ct input,
338                    // so make sure this stays after them.
339                    let (ok_stream, err_stream) = match ct_source_transformer {
340                        None => (ok_stream, err_stream),
341                        Some(ct_source_transformer) => {
342                            let (oks, errs, ct_times) = ct_source_transformer
343                                .transform(ok_stream.as_collection(), err_stream.as_collection());
344                            // TODO(ct3): Ideally this would be encapsulated by
345                            // ContinualTaskCtx, but the types are tricky.
346                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
347                            (oks.inner, errs.inner)
348                        }
349                    };
350
351                    let (oks, errs) = (
352                        ok_stream.as_collection().leave_region().leave_region(),
353                        err_stream.as_collection().leave_region().leave_region(),
354                    );
355
356                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));
357
358                    // Associate returned tokens with the source identifier.
359                    tokens.insert(*source_id, Rc::new(token));
360                });
361            }
362        });
363
364        // If there exists a recursive expression, we'll need to use a non-region scope,
365        // in order to support additional timestamp coordinates for iteration.
366        if recursive {
367            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
368                let mut context = Context::for_dataflow_in(
369                    &dataflow,
370                    region.clone(),
371                    compute_state,
372                    until,
373                    dataflow_expiration,
374                );
375
376                for (id, (oks, errs)) in imported_sources.into_iter() {
377                    let bundle = crate::render::CollectionBundle::from_collections(
378                        oks.enter(region),
379                        errs.enter(region),
380                    );
381                    // Associate collection bundle with the source identifier.
382                    context.insert_id(id, bundle);
383                }
384
385                // Import declared indexes into the rendering context.
386                for (idx_id, idx) in &dataflow.index_imports {
387                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
388                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
389                        SnapshotMode::Include
390                    } else {
391                        compute_state.metrics.inc_subscribe_snapshot_optimization();
392                        SnapshotMode::Exclude
393                    };
394                    context.import_index(
395                        compute_state,
396                        &mut tokens,
397                        input_probe,
398                        *idx_id,
399                        &idx.desc,
400                        &idx.typ,
401                        snapshot_mode,
402                        start_signal.clone(),
403                    );
404                }
405
406                // Build declared objects.
407                for object in dataflow.objects_to_build {
408                    let bundle = context.scope.clone().region_named(
409                        &format!("BuildingObject({:?})", object.id),
410                        |region| {
411                            let depends = object.plan.depends();
412                            let in_let = object.plan.is_recursive();
413                            context
414                                .enter_region(region, Some(&depends))
415                                .render_recursive_plan(
416                                    object.id,
417                                    0,
418                                    object.plan,
419                                    // recursive plans _must_ have bodies in a let
420                                    BindingInfo::Body { in_let },
421                                )
422                                .leave_region()
423                        },
424                    );
425                    let global_id = object.id;
426
427                    context.log_dataflow_global_id(
428                        *bundle
429                            .scope()
430                            .addr()
431                            .first()
432                            .expect("Dataflow root id must exist"),
433                        global_id,
434                    );
435                    context.insert_id(Id::Global(object.id), bundle);
436                }
437
438                // Export declared indexes.
439                for (idx_id, dependencies, idx) in indexes {
440                    context.export_index_iterative(
441                        compute_state,
442                        &tokens,
443                        dependencies,
444                        idx_id,
445                        &idx,
446                        &output_probe,
447                    );
448                }
449
450                // Export declared sinks.
451                for (sink_id, dependencies, sink) in sinks {
452                    context.export_sink(
453                        compute_state,
454                        &tokens,
455                        dependencies,
456                        sink_id,
457                        &sink,
458                        start_signal.clone(),
459                        ct_ctx.input_times(&context.scope.parent),
460                        &output_probe,
461                    );
462                }
463            });
464        } else {
465            scope.clone().region_named(&build_name, |region| {
466                let mut context = Context::for_dataflow_in(
467                    &dataflow,
468                    region.clone(),
469                    compute_state,
470                    until,
471                    dataflow_expiration,
472                );
473
474                for (id, (oks, errs)) in imported_sources.into_iter() {
475                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
476                        let as_of = context.as_of_frontier.clone();
477                        let summary = TEMPORAL_BUCKETING_SUMMARY
478                            .get(&compute_state.worker_config)
479                            .try_into()
480                            .expect("must fit");
481                        oks.inner
482                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
483                            .as_collection()
484                    } else {
485                        oks
486                    };
487                    let bundle = crate::render::CollectionBundle::from_collections(
488                        oks.enter_region(region),
489                        errs.enter_region(region),
490                    );
491                    // Associate collection bundle with the source identifier.
492                    context.insert_id(id, bundle);
493                }
494
495                // Import declared indexes into the rendering context.
496                for (idx_id, idx) in &dataflow.index_imports {
497                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
498                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
499                        SnapshotMode::Include
500                    } else {
501                        compute_state.metrics.inc_subscribe_snapshot_optimization();
502                        SnapshotMode::Exclude
503                    };
504                    context.import_index(
505                        compute_state,
506                        &mut tokens,
507                        input_probe,
508                        *idx_id,
509                        &idx.desc,
510                        &idx.typ,
511                        snapshot_mode,
512                        start_signal.clone(),
513                    );
514                }
515
516                // Build declared objects.
517                for object in dataflow.objects_to_build {
518                    let bundle = context.scope.clone().region_named(
519                        &format!("BuildingObject({:?})", object.id),
520                        |region| {
521                            let depends = object.plan.depends();
522                            context
523                                .enter_region(region, Some(&depends))
524                                .render_plan(object.id, object.plan)
525                                .leave_region()
526                        },
527                    );
528                    let global_id = object.id;
529                    context.log_dataflow_global_id(
530                        *bundle
531                            .scope()
532                            .addr()
533                            .first()
534                            .expect("Dataflow root id must exist"),
535                        global_id,
536                    );
537                    context.insert_id(Id::Global(object.id), bundle);
538                }
539
540                // Export declared indexes.
541                for (idx_id, dependencies, idx) in indexes {
542                    context.export_index(
543                        compute_state,
544                        &tokens,
545                        dependencies,
546                        idx_id,
547                        &idx,
548                        &output_probe,
549                    );
550                }
551
552                // Export declared sinks.
553                for (sink_id, dependencies, sink) in sinks {
554                    context.export_sink(
555                        compute_state,
556                        &tokens,
557                        dependencies,
558                        sink_id,
559                        &sink,
560                        start_signal.clone(),
561                        ct_ctx.input_times(&context.scope.parent),
562                        &output_probe,
563                    );
564                }
565            });
566        }
567    });
568}
569
570// This implementation block allows child timestamps to vary from parent timestamps,
571// but requires the parent timestamp to be `repr::Timestamp`.
572impl<'g, G, T> Context<Child<'g, G, T>>
573where
574    G: Scope<Timestamp = mz_repr::Timestamp>,
575    T: Refines<G::Timestamp> + RenderTimestamp,
576{
577    /// Import the collection from the arrangement, discarding batches from the snapshot.
578    /// (This does not guarantee that no records from the snapshot are included; the assumption is
579    /// that we'll filter those out later if necessary.)
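    ///
    /// This is used by `import_index` below when the snapshot is excluded
    /// (`SnapshotMode::Exclude`).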
580    fn import_filtered_index_collection<Tr: TraceReader<Time = G::Timestamp> + Clone, V: Data>(
581        &self,
582        arranged: Arranged<G, Tr>,
583        start_signal: StartSignal,
584        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
585    ) -> VecCollection<Child<'g, G, T>, V, Tr::Diff>
586    where
587        // This is implied by the fact that G::Timestamp = mz_repr::Timestamp, but it's essential
588        // for our batch-level filtering to be safe, so we document it here regardless.
589        G::Timestamp: TotalOrder,
590    {
591        let oks = arranged.stream.with_start_signal(start_signal).filter({
592            let as_of = self.as_of_frontier.clone();
593            move |b| !<Antichain<G::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
594        });
595        Arranged::<G, Tr>::flat_map_batches(&oks, move |a, b| [logic(a, b)]).enter(&self.scope)
596    }
597
598    pub(crate) fn import_index(
599        &mut self,
600        compute_state: &mut ComputeState,
601        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
602        input_probe: probe::Handle<mz_repr::Timestamp>,
603        idx_id: GlobalId,
604        idx: &IndexDesc,
605        typ: &SqlRelationType,
606        snapshot_mode: SnapshotMode,
607        start_signal: StartSignal,
608    ) {
609        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
610            assert!(
611                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
612                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
613            );
614
615            let token = traces.to_drop().clone();
616
617            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
618                &self.scope.parent,
619                &format!("Index({}, {:?})", idx.on_id, idx.key),
620                self.as_of_frontier.clone(),
621                self.until.clone(),
622            );
623
624            oks.stream = oks.stream.probe_with(&input_probe);
625
626            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
627                &self.scope.parent,
628                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
629                self.as_of_frontier.clone(),
630                self.until.clone(),
631            );
632
633            let bundle = match snapshot_mode {
634                SnapshotMode::Include => {
635                    let ok_arranged = oks
636                        .enter(&self.scope)
637                        .with_start_signal(start_signal.clone());
638                    let err_arranged = err_arranged
639                        .enter(&self.scope)
640                        .with_start_signal(start_signal);
641                    CollectionBundle::from_expressions(
642                        idx.key.clone(),
643                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
644                    )
645                }
646                SnapshotMode::Exclude => {
647                    // When we import an index without a snapshot, we have two balancing considerations:
648                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
649                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
650                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
651                    //   (joins, reduces, etc.) also require the snapshot data.
652                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
653                    let oks = {
654                        let mut datums = DatumVec::new();
655                        let (permutation, _thinning) =
656                            permutation_for_arrangement(&idx.key, typ.arity());
657                        self.import_filtered_index_collection(
658                            oks,
659                            start_signal.clone(),
660                            move |k: DatumSeq, v: DatumSeq| {
661                                let mut datums_borrow = datums.borrow();
662                                datums_borrow.extend(k);
663                                datums_borrow.extend(v);
664                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
665                            },
666                        )
667                    };
668                    let errs = self.import_filtered_index_collection(
669                        err_arranged,
670                        start_signal,
671                        |e, _| e.clone(),
672                    );
673                    CollectionBundle::from_collections(oks, errs)
674                }
675            };
676            self.update_id(Id::Global(idx.on_id), bundle);
677            tokens.insert(
678                idx_id,
679                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
680            );
681        } else {
682            panic!(
683                "import of index {} failed while building dataflow {}",
684                idx_id, self.dataflow_id
685            );
686        }
687    }
688}
689
690// This implementation block requires the scopes have the same timestamp as the trace manager.
691// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
692impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
693where
694    G: Scope<Timestamp = mz_repr::Timestamp>,
695{
696    pub(crate) fn export_index(
697        &self,
698        compute_state: &mut ComputeState,
699        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
700        dependency_ids: BTreeSet<GlobalId>,
701        idx_id: GlobalId,
702        idx: &IndexDesc,
703        output_probe: &MzProbeHandle<G::Timestamp>,
704    ) {
705        // put together tokens that belong to the export
706        let mut needed_tokens = Vec::new();
707        for dep_id in dependency_ids {
708            if let Some(token) = tokens.get(&dep_id) {
709                needed_tokens.push(Rc::clone(token));
710            }
711        }
712        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
713            panic!(
714                "Arrangement alarmingly absent! id: {:?}",
715                Id::Global(idx_id)
716            )
717        });
718
719        match bundle.arrangement(&idx.key) {
720            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
721                // Ensure that the frontier does not advance past the expiration time, if set.
722                // Otherwise, we might write down incorrect data.
723                if let Some(&expiration) = self.dataflow_expiration.as_option() {
724                    oks.stream = oks.stream.expire_stream_at(
725                        &format!("{}_export_index_oks", self.debug_name),
726                        expiration,
727                    );
728                    errs.stream = errs.stream.expire_stream_at(
729                        &format!("{}_export_index_errs", self.debug_name),
730                        expiration,
731                    );
732                }
733
734                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
735
736                // Attach logging of dataflow errors.
737                if let Some(logger) = compute_state.compute_logger.clone() {
738                    errs.stream.log_dataflow_errors(logger, idx_id);
739                }
740
741                compute_state.traces.set(
742                    idx_id,
743                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
744                );
745            }
746            Some(ArrangementFlavor::Trace(gid, _, _)) => {
747                // Duplicate of existing arrangement with id `gid`, so
748                // just create another handle to that arrangement.
749                let trace = compute_state.traces.get(&gid).unwrap().clone();
750                compute_state.traces.set(idx_id, trace);
751            }
752            None => {
753                println!("collection available: {:?}", bundle.collection.is_none());
754                println!(
755                    "keys available: {:?}",
756                    bundle.arranged.keys().collect::<Vec<_>>()
757                );
758                panic!(
759                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
760                    Id::Global(idx_id),
761                    &idx.key
762                );
763            }
764        };
765    }
766}
767
768// This implementation block requires the scopes have the same timestamp as the trace manager.
769// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
770impl<'g, G, T> Context<Child<'g, G, T>>
771where
772    G: Scope<Timestamp = mz_repr::Timestamp>,
773    T: RenderTimestamp,
774{
775    pub(crate) fn export_index_iterative(
776        &self,
777        compute_state: &mut ComputeState,
778        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
779        dependency_ids: BTreeSet<GlobalId>,
780        idx_id: GlobalId,
781        idx: &IndexDesc,
782        output_probe: &MzProbeHandle<G::Timestamp>,
783    ) {
784        // put together tokens that belong to the export
785        let mut needed_tokens = Vec::new();
786        for dep_id in dependency_ids {
787            if let Some(token) = tokens.get(&dep_id) {
788                needed_tokens.push(Rc::clone(token));
789            }
790        }
791        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
792            panic!(
793                "Arrangement alarmingly absent! id: {:?}",
794                Id::Global(idx_id)
795            )
796        });
797
798        match bundle.arrangement(&idx.key) {
799            Some(ArrangementFlavor::Local(oks, errs)) => {
800                // TODO: The following as_collection/leave/arrange sequence could be optimized.
801                //   * Combine as_collection and leave into a single function.
802                //   * Use columnar to extract columns from the batches to implement leave.
803                let mut oks = oks
804                    .as_collection(|k, v| (k.to_row(), v.to_row()))
805                    .leave()
806                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
807                    "Arrange export iterative",
808                );
809
810                let mut errs = errs
811                    .as_collection(|k, v| (k.clone(), v.clone()))
812                    .leave()
813                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
814                        "Arrange export iterative err",
815                    );
816
817                // Ensure that the frontier does not advance past the expiration time, if set.
818                // Otherwise, we might write down incorrect data.
819                if let Some(&expiration) = self.dataflow_expiration.as_option() {
820                    oks.stream = oks.stream.expire_stream_at(
821                        &format!("{}_export_index_iterative_oks", self.debug_name),
822                        expiration,
823                    );
824                    errs.stream = errs.stream.expire_stream_at(
825                        &format!("{}_export_index_iterative_err", self.debug_name),
826                        expiration,
827                    );
828                }
829
830                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
831
832                // Attach logging of dataflow errors.
833                if let Some(logger) = compute_state.compute_logger.clone() {
834                    errs.stream.log_dataflow_errors(logger, idx_id);
835                }
836
837                compute_state.traces.set(
838                    idx_id,
839                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
840                );
841            }
842            Some(ArrangementFlavor::Trace(gid, _, _)) => {
843                // Duplicate of existing arrangement with id `gid`, so
844                // just create another handle to that arrangement.
845                let trace = compute_state.traces.get(&gid).unwrap().clone();
846                compute_state.traces.set(idx_id, trace);
847            }
848            None => {
849                println!("collection available: {:?}", bundle.collection.is_none());
850                println!(
851                    "keys available: {:?}",
852                    bundle.arranged.keys().collect::<Vec<_>>()
853                );
854                panic!(
855                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
856                    Id::Global(idx_id),
857                    &idx.key
858                );
859            }
860        };
861    }
862}
863
864/// Information about bindings, tracked in `render_recursive_plan` and
865/// `render_plan`, to be passed to `render_letfree_plan`.
866///
867/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
868/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
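///
/// As an informal illustration (the ids and operators below are hypothetical),
/// the decorations applied to the final operator of each let-free plan look
/// like:
///
/// ```text
/// Body   { in_let: true }        -> "Returning <operator>"
/// Let    { id: l1, last: true }  -> "With l1 = <operator>"
/// Let    { id: l0, last: false } -> "l0 = <operator>"
/// LetRec { id: l2, last: true }  -> "With Recursive l2 = <operator>"
/// ```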
869enum BindingInfo {
870    Body { in_let: bool },
871    Let { id: LocalId, last: bool },
872    LetRec { id: LocalId, last: bool },
873}
874
875impl<G> Context<G>
876where
877    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
878{
879    /// Renders a plan to a differential dataflow, producing the collection of results.
880    ///
881    /// This method allows for `plan` to contain [`RecBind`]s, and is rendered
882    /// in the context of `level` pre-existing iteration coordinates.
883    ///
884    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
885    /// and establishing the appropriate recursive dependencies among the bound variables.
886    /// Once all [`RecBind`]s have been rendered it calls in to `render_plan` which will error if
887    /// further [`RecBind`]s are found.
888    ///
889    /// The method requires that all variables conclude with a physical representation that
890    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
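    ///
    /// Informally, the iteration state lives in the `PointStamp<u64>` component of
    /// the timestamp: nesting level `level` reads its iteration counter from
    /// coordinate `level` of the pointstamp, which is why the recursive bindings
    /// below construct feedback summaries parameterized by `level + 1`.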
891    fn render_recursive_plan(
892        &mut self,
893        object_id: GlobalId,
894        level: usize,
895        plan: RenderPlan,
896        binding: BindingInfo,
897    ) -> CollectionBundle<G> {
898        for BindStage { lets, recs } in plan.binds {
899            // Render the let bindings in order.
900            let mut let_iter = lets.into_iter().peekable();
901            while let Some(LetBind { id, value }) = let_iter.next() {
902                let bundle =
903                    self.scope
904                        .clone()
905                        .region_named(&format!("Binding({:?})", id), |region| {
906                            let depends = value.depends();
907                            let last = let_iter.peek().is_none();
908                            let binding = BindingInfo::Let { id, last };
909                            self.enter_region(region, Some(&depends))
910                                .render_letfree_plan(object_id, value, binding)
911                                .leave_region()
912                        });
913                self.insert_id(Id::Local(id), bundle);
914            }
915
916            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();
917
918            // Define variables for rec bindings.
919            // It is important that we only use the `SemigroupVariable` until the object is bound.
920            // At that point, all subsequent uses should have access to the object itself.
921            let mut variables = BTreeMap::new();
922            for id in rec_ids.iter() {
923                use differential_dataflow::dynamic::feedback_summary;
924                let inner = feedback_summary::<u64>(level + 1, 1);
925                let oks_v = SemigroupVariable::new(
926                    &mut self.scope,
927                    Product::new(Default::default(), inner.clone()),
928                );
929                let err_v = SemigroupVariable::new(
930                    &mut self.scope,
931                    Product::new(Default::default(), inner),
932                );
933
934                self.insert_id(
935                    Id::Local(*id),
936                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
937                );
938                variables.insert(Id::Local(*id), (oks_v, err_v));
939            }
940            // Now render each of the rec bindings.
941            let mut rec_iter = recs.into_iter().peekable();
942            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
943                let last = rec_iter.peek().is_none();
944                let binding = BindingInfo::LetRec { id, last };
945                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
946                // We need to ensure that the raw collection exists, but do not have enough information
947                // here to cause that to happen.
948                let (oks, mut err) = bundle.collection.clone().unwrap();
949                self.insert_id(Id::Local(id), bundle);
950                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();
951
952                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
953                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");
954
955                if let Some(limit) = limit {
956                    // We swallow the results of the `max_iter`th iteration, because
957                    // these results would go into the `max_iter + 1`th iteration.
958                    let (in_limit, over_limit) =
959                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
960                            // The iteration number, or zero if missing (trailing zeros are truncated).
961                            let iteration_index = *ps.get(level).unwrap_or(&0);
962                            // The pointstamp starts counting from 0, so we need to add 1.
963                            iteration_index + 1 >= limit.max_iters.into()
964                        });
965                    oks = VecCollection::new(in_limit);
966                    if !limit.return_at_limit {
967                        err = err.concat(&VecCollection::new(over_limit).map(move |_data| {
968                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
969                                format!("{}", limit.max_iters.get()).into(),
970                            )))
971                        }));
972                    }
973                }
974
975                // Set err variable to the distinct elements of `err`.
976                // Distinctness is important, as we otherwise might add the same error each iteration,
977                // say if the limit of `oks` has an error. This would result in non-terminating rather
978                // say if the limit of `oks` has an error. This would result in non-termination rather
979                // multiplicities of errors, but .. this seems to be the better call.
980                let err: KeyCollection<_, _, _> = err.into();
981                let errs = err
982                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
983                        "Arrange recursive err",
984                    )
985                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
986                        "Distinct recursive err",
987                        move |_k, _s, t| t.push(((), Diff::ONE)),
988                    )
989                    .as_collection(|k, _| k.clone());
990
991                oks_v.set(&oks);
992                err_v.set(&errs);
993            }
994            // Now extract each of the rec bindings into the outer scope.
995            for id in rec_ids.into_iter() {
996                let bundle = self.remove_id(Id::Local(id)).unwrap();
997                let (oks, err) = bundle.collection.unwrap();
998                self.insert_id(
999                    Id::Local(id),
1000                    CollectionBundle::from_collections(
1001                        oks.leave_dynamic(level + 1),
1002                        err.leave_dynamic(level + 1),
1003                    ),
1004                );
1005            }
1006        }
1007
1008        self.render_letfree_plan(object_id, plan.body, binding)
1009    }
1010}
1011
1012impl<G> Context<G>
1013where
1014    G: Scope,
1015    G::Timestamp: RenderTimestamp,
1016{
1017    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
1018    /// results.
1019    ///
1020    /// The return type reflects the uncertainty about the data representation, perhaps
1021    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
1022    ///
1023    /// # Panics
1024    ///
1025    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
1026    /// `render_recursive_plan` instead.
1027    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
1028        let mut in_let = false;
1029        for BindStage { lets, recs } in plan.binds {
1030            assert!(recs.is_empty());
1031
1032            let mut let_iter = lets.into_iter().peekable();
1033            while let Some(LetBind { id, value }) = let_iter.next() {
1034                // if we encounter at least one let binding, the body is in a let
1035                in_let = true;
1036                let bundle =
1037                    self.scope
1038                        .clone()
1039                        .region_named(&format!("Binding({:?})", id), |region| {
1040                            let depends = value.depends();
1041                            let last = let_iter.peek().is_none();
1042                            let binding = BindingInfo::Let { id, last };
1043                            self.enter_region(region, Some(&depends))
1044                                .render_letfree_plan(object_id, value, binding)
1045                                .leave_region()
1046                        });
1047                self.insert_id(Id::Local(id), bundle);
1048            }
1049        }
1050
1051        self.scope.clone().region_named("Main Body", |region| {
1052            let depends = plan.body.depends();
1053            self.enter_region(region, Some(&depends))
1054                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1055                .leave_region()
1056        })
1057    }
1058
1059    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
1060    fn render_letfree_plan(
1061        &mut self,
1062        object_id: GlobalId,
1063        plan: LetFreePlan,
1064        binding: BindingInfo,
1065    ) -> CollectionBundle<G> {
1066        let (mut nodes, root_id, topological_order) = plan.destruct();
1067
1068        // Rendered collections by their `LirId`.
1069        let mut collections = BTreeMap::new();
1070
1071        // Mappings to send along.
1072        // To save overhead, we'll only compute mappings when we need to,
1073        // which means things get gated behind options. Unfortunately, that means we
1074        // have several `Option<...>` types that are _all_ `Some` or `None` together,
1075        // but there's no convenient way to express the invariant.
1076        let should_compute_lir_metadata = self.compute_logger.is_some();
1077        let mut lir_mapping_metadata = if should_compute_lir_metadata {
1078            Some(Vec::with_capacity(nodes.len()))
1079        } else {
1080            None
1081        };
1082
1083        let mut topo_iter = topological_order.into_iter().peekable();
1084        while let Some(lir_id) = topo_iter.next() {
1085            let node = nodes.remove(&lir_id).unwrap();
1086
1087            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
1088            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
1089            // in some other structure and have that structure impl ExprHumanizer
1090            let metadata = if should_compute_lir_metadata {
1091                let operator = node.expr.humanize(&DummyHumanizer);
1092
1093                // mark the last operator in topo order with any binding decoration
1094                let operator = if topo_iter.peek().is_none() {
1095                    match &binding {
1096                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
1097                        BindingInfo::Body { in_let: false } => operator,
1098                        BindingInfo::Let { id, last: true } => {
1099                            format!("With {id} = {operator}")
1100                        }
1101                        BindingInfo::Let { id, last: false } => {
1102                            format!("{id} = {operator}")
1103                        }
1104                        BindingInfo::LetRec { id, last: true } => {
1105                            format!("With Recursive {id} = {operator}")
1106                        }
1107                        BindingInfo::LetRec { id, last: false } => {
1108                            format!("{id} = {operator}")
1109                        }
1110                    }
1111                } else {
1112                    operator
1113                };
1114
1115                let operator_id_start = self.scope.peek_identifier();
1116                Some((operator, operator_id_start))
1117            } else {
1118                None
1119            };
1120
1121            let mut bundle = self.render_plan_expr(node.expr, &collections);
1122
1123            if let Some((operator, operator_id_start)) = metadata {
1124                let operator_id_end = self.scope.peek_identifier();
1125                let operator_span = (operator_id_start, operator_id_end);
1126
1127                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
1128                    lir_mapping_metadata.push((
1129                        lir_id,
1130                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
1131                    ))
1132                }
1133            }
1134
1135            self.log_operator_hydration(&mut bundle, lir_id);
1136
1137            collections.insert(lir_id, bundle);
1138        }
1139
1140        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
1141            self.log_lir_mapping(object_id, lir_mapping_metadata);
1142        }
1143
1144        collections
1145            .remove(&root_id)
1146            .expect("LetFreePlan invariant (1)")
1147    }
1148
1149    /// Renders a [`render_plan::Expr`], producing the collection of results.
1150    ///
1151    /// # Panics
1152    ///
1153    /// Panics if any of the expr's inputs is not found in `collections`.
1154    /// Callers must ensure that input nodes have been rendered previously.
1155    fn render_plan_expr(
1156        &mut self,
1157        expr: render_plan::Expr,
1158        collections: &BTreeMap<LirId, CollectionBundle<G>>,
1159    ) -> CollectionBundle<G> {
1160        use render_plan::Expr::*;
1161
1162        let expect_input = |id| {
1163            collections
1164                .get(&id)
1165                .cloned()
1166                .unwrap_or_else(|| panic!("missing input collection: {id}"))
1167        };
1168
1169        match expr {
1170            Constant { rows } => {
1171                // Produce both rows and errs to avoid conditional dataflow construction.
1172                let (rows, errs) = match rows {
1173                    Ok(rows) => (rows, Vec::new()),
1174                    Err(e) => (Vec::new(), vec![e]),
1175                };
1176
1177                // We should advance times in constant collections to start from `as_of`.
1178                let as_of_frontier = self.as_of_frontier.clone();
1179                let until = self.until.clone();
1180                let ok_collection = rows
1181                    .into_iter()
1182                    .filter_map(move |(row, mut time, diff)| {
1183                        time.advance_by(as_of_frontier.borrow());
1184                        if !until.less_equal(&time) {
1185                            Some((
1186                                row,
1187                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
1188                                diff,
1189                            ))
1190                        } else {
1191                            None
1192                        }
1193                    })
1194                    .to_stream(&mut self.scope)
1195                    .as_collection();
1196
1197                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
1198                error_time.advance_by(self.as_of_frontier.borrow());
1199                let err_collection = errs
1200                    .into_iter()
1201                    .map(move |e| {
1202                        (
1203                            DataflowError::from(e),
1204                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
1205                            Diff::ONE,
1206                        )
1207                    })
1208                    .to_stream(&mut self.scope)
1209                    .as_collection();
1210
1211                CollectionBundle::from_collections(ok_collection, err_collection)
1212            }
1213            Get { id, keys, plan } => {
1214                // Recover the collection from `self` and then apply `mfp` to it.
1215                // If `mfp` happens to be trivial, we can just return the collection.
1216                let mut collection = self
1217                    .lookup_id(id)
1218                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
1219                match plan {
1220                    mz_compute_types::plan::GetPlan::PassArrangements => {
1221                        // Assert that each of `keys` is present in `collection`.
1222                        assert!(
1223                            keys.arranged
1224                                .iter()
1225                                .all(|(key, _, _)| collection.arranged.contains_key(key))
1226                        );
1227                        assert!(keys.raw <= collection.collection.is_some());
1228                        // Retain only those keys we want to import.
1229                        collection.arranged.retain(|key, _value| {
1230                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
1231                        });
1232                        collection
1233                    }
1234                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
1235                        let (oks, errs) = collection.as_collection_core(
1236                            mfp,
1237                            Some((key, row)),
1238                            self.until.clone(),
1239                            &self.config_set,
1240                        );
1241                        CollectionBundle::from_collections(oks, errs)
1242                    }
1243                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
1244                        let (oks, errs) = collection.as_collection_core(
1245                            mfp,
1246                            None,
1247                            self.until.clone(),
1248                            &self.config_set,
1249                        );
1250                        CollectionBundle::from_collections(oks, errs)
1251                    }
1252                }
1253            }
1254            Mfp {
1255                input,
1256                mfp,
1257                input_key_val,
1258            } => {
1259                let input = expect_input(input);
1260                // If `mfp` is non-trivial, we should apply it and produce a collection.
1261                if mfp.is_identity() {
1262                    input
1263                } else {
1264                    let (oks, errs) = input.as_collection_core(
1265                        mfp,
1266                        input_key_val,
1267                        self.until.clone(),
1268                        &self.config_set,
1269                    );
1270                    CollectionBundle::from_collections(oks, errs)
1271                }
1272            }
1273            FlatMap {
1274                input_key,
1275                input,
1276                exprs,
1277                func,
1278                mfp_after: mfp,
1279            } => {
1280                let input = expect_input(input);
1281                self.render_flat_map(input_key, input, exprs, func, mfp)
1282            }
1283            Join { inputs, plan } => {
1284                let inputs = inputs.into_iter().map(expect_input).collect();
1285                match plan {
1286                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
1287                        self.render_join(inputs, linear_plan)
1288                    }
1289                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
1290                        self.render_delta_join(inputs, delta_plan)
1291                    }
1292                }
1293            }
1294            Reduce {
1295                input_key,
1296                input,
1297                key_val_plan,
1298                plan,
1299                mfp_after,
1300            } => {
1301                let input = expect_input(input);
1302                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
1303                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
1304            }
1305            TopK { input, top_k_plan } => {
1306                let input = expect_input(input);
1307                self.render_topk(input, top_k_plan)
1308            }
1309            Negate { input } => {
1310                let input = expect_input(input);
1311                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
1312                CollectionBundle::from_collections(oks.negate(), errs)
1313            }
1314            Threshold {
1315                input,
1316                threshold_plan,
1317            } => {
1318                let input = expect_input(input);
1319                self.render_threshold(input, threshold_plan)
1320            }
1321            Union {
1322                inputs,
1323                consolidate_output,
1324            } => {
1325                let mut oks = Vec::new();
1326                let mut errs = Vec::new();
1327                for input in inputs.into_iter() {
1328                    let (os, es) =
1329                        expect_input(input).as_specific_collection(None, &self.config_set);
1330                    oks.push(os);
1331                    errs.push(es);
1332                }
1333                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
1334                if consolidate_output {
1335                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
1336                }
1337                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
1338                CollectionBundle::from_collections(oks, errs)
1339            }
1340            ArrangeBy {
1341                input_key,
1342                input,
1343                input_mfp,
1344                forms: keys,
1345            } => {
1346                let input = expect_input(input);
1347                input.ensure_collections(
1348                    keys,
1349                    input_key,
1350                    input_mfp,
1351                    self.until.clone(),
1352                    &self.config_set,
1353                )
1354            }
1355        }
1356    }
1357
1358    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1359        if let Some(logger) = &self.compute_logger {
1360            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1361                dataflow_index,
1362                global_id,
1363            }));
1364        }
1365    }
1366
1367    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1368        if let Some(logger) = &self.compute_logger {
1369            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1370        }
1371    }
1372
1373    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
1374        // A `CollectionBundle` can contain more than one collection, so it is not obvious which one
1375        // we should attach the logging operator to.
1376        //
1377        // We could attach to each collection and track the lower bound of output frontiers.
1378        // However, that would be of limited use because we expect all collections to hydrate at
1379        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1380        // frontier of the unarranged collection advance, it will perform all work necessary to
1381        // also advance its own frontier. We don't expect significant delays between frontier
1382        // advancements of the unarranged and arranged collections, so attaching the logging
1383        // operator to any one of them should produce accurate results.
1384        //
1385        // If the `CollectionBundle` contains both unarranged and arranged representations, it is
1386        // beneficial to attach the logging operator to one of the arranged representations to avoid
1387        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1388        // if we attached the logging operator to it, we would introduce a fork in its output
1389        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1390        // that the output streams of the arrangements don't yet feed into anything else, so
1391        // attaching a (pass-through) logging operator does not introduce a fork.
1392
1393        match bundle.arranged.values_mut().next() {
1394            Some(arrangement) => {
1395                use ArrangementFlavor::*;
1396
1397                match arrangement {
1398                    Local(a, _) => {
1399                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
1400                    }
1401                    Trace(_, a, _) => {
1402                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
1403                    }
1404                }
1405            }
1406            None => {
1407                let (oks, _) = bundle
1408                    .collection
1409                    .as_mut()
1410                    .expect("CollectionBundle invariant");
1411                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
1412                *oks = stream.as_collection();
1413            }
1414        }
1415    }
1416
1417    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
1418    where
1419        D: Clone + 'static,
1420    {
1421        let Some(logger) = self.compute_logger.clone() else {
1422            return stream.clone(); // hydration logging disabled
1423        };
1424
1425        let export_ids = self.export_ids.clone();
1426
1427        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
1428        //
1429        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
1430        // frontier advances to an outer time that's greater than the `as_of`. Comparing
1431        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
1432        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
1433        // instead.
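        // For example, with `as_of = [5]` the hydration frontier becomes `[refine(6)]`.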
1434        let mut hydration_frontier = Antichain::new();
1435        for time in self.as_of_frontier.iter() {
1436            if let Some(time) = time.try_step_forward() {
1437                hydration_frontier.insert(Refines::to_inner(time));
1438            }
1439        }
1440
1441        let name = format!("LogOperatorHydration ({lir_id})");
1442        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1443            let mut hydrated = false;
1444
1445            for &export_id in &export_ids {
1446                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1447                    export_id,
1448                    lir_id,
1449                    hydrated,
1450                }));
1451            }
1452
1453            move |(input, frontier), output| {
1454                // Pass through inputs.
1455                input.for_each(|cap, data| {
1456                    output.session(&cap).give_container(data);
1457                });
1458
1459                if hydrated {
1460                    return;
1461                }
1462
1463                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1464                    hydrated = true;
1465
1466                    for &export_id in &export_ids {
1467                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1468                            export_id,
1469                            lir_id,
1470                            hydrated,
1471                        }));
1472                    }
1473                }
1474            }
1475        })
1476    }
1477}
1478
1479/// A timestamp type that can be used for operations within MZ's dataflow layer.
1480#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
1481pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
1482    /// The system timestamp component of the timestamp.
1483    ///
1484    /// This is useful for manipulating the system time, as when delaying
1485    /// updates for subsequent cancellation, as with monotonic reduction.
1486    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
1487    /// Effects a system delay in terms of the timestamp summary.
1488    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1489    /// The event timestamp component of the timestamp.
1490    fn event_time(&self) -> mz_repr::Timestamp;
1491    /// The event timestamp component of the timestamp, as a mutable reference.
1492    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
1493    /// Effects an event delay in terms of the timestamp summary.
1494    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1495    /// Steps the timestamp back so that logical compaction to the output will
1496    /// not conflate `self` with any historical times.
1497    fn step_back(&self) -> Self;
1498}
1499
1500impl RenderTimestamp for mz_repr::Timestamp {
1501    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1502        self
1503    }
1504    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1505        delay
1506    }
1507    fn event_time(&self) -> mz_repr::Timestamp {
1508        *self
1509    }
1510    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1511        self
1512    }
1513    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1514        delay
1515    }
1516    fn step_back(&self) -> Self {
1517        self.saturating_sub(1)
1518    }
1519}
1520
1521impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1522    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1523        &mut self.outer
1524    }
1525    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1526        Product::new(delay, Default::default())
1527    }
1528    fn event_time(&self) -> mz_repr::Timestamp {
1529        self.outer
1530    }
1531    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1532        &mut self.outer
1533    }
1534    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1535        Product::new(delay, Default::default())
1536    }
1537    fn step_back(&self) -> Self {
1538        // It is necessary to step back both coordinates of a product, and when
1539        // one of them is a `PointStamp`, that means stepping back all of the
1540        // pointstamp's coordinates as well.
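        // For example, the product time `(5, [3, 0])` steps back to `(4, [2, 0])`.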
1541        let inner = self.inner.clone();
1542        let mut vec = inner.into_inner();
1543        for item in vec.iter_mut() {
1544            *item = item.saturating_sub(1);
1545        }
1546        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1547    }
1548}
1549
1550/// A signal that can be awaited by operators to suspend them prior to startup.
1551///
1552/// Creating a signal also yields a token; dropping the token causes the signal to fire.
1553///
1554/// `StartSignal` is designed to be usable by both async and sync Timely operators.
1555///
1556///  * Async operators can simply `await` it.
1557///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
1558///    and then check `StartSignal::has_fired()` on each activation.
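///
/// A minimal sketch of the lifecycle (not a doc test, since `StartSignal` is crate-private):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// assert!(!signal.has_fired());
///
/// // An async operator could `signal.clone().await` at this point.
///
/// // Dropping the token fires the signal.
/// drop(token);
/// assert!(signal.has_fired());
/// ```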
1559#[derive(Clone)]
1560pub(crate) struct StartSignal {
1561    /// A future that completes when the signal fires.
1562    ///
1563    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
1564    /// signal is activated by dropping the corresponding `Sender`.
1565    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
1566    /// A weak reference to the token, to register drop-on-fire values.
1567    token_ref: Weak<RefCell<Box<dyn Any>>>,
1568}
1569
1570impl StartSignal {
1571    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1572    /// dropped.
1573    pub fn new() -> (Self, Rc<dyn Any>) {
1574        let (tx, rx) = oneshot::channel::<Infallible>();
1575        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1576        let signal = Self {
1577            fut: rx.shared(),
1578            token_ref: Rc::downgrade(&token),
1579        };
1580        (signal, token)
1581    }
1582
1583    pub fn has_fired(&self) -> bool {
1584        self.token_ref.strong_count() == 0
1585    }
1586
1587    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1588        if let Some(token) = self.token_ref.upgrade() {
1589            let mut token = token.borrow_mut();
1590            let inner = std::mem::replace(&mut *token, Box::new(()));
1591            *token = Box::new((inner, to_drop));
1592        }
1593    }
1594}
1595
1596impl Future for StartSignal {
1597    type Output = ();
1598
1599    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1600        self.fut.poll_unpin(cx).map(|_| ())
1601    }
1602}
1603
1604/// Extension trait to attach a `StartSignal` to operator outputs.
1605pub(crate) trait WithStartSignal {
1606    /// Delays data and progress updates until the start signal has fired.
1607    ///
1608    /// Note that this operator needs to buffer all incoming data until the signal fires, so it has
1609    /// a memory footprint that depends on the amount and shape of its inputs.
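    ///
    /// A minimal sketch (not a doc test): `let stream = stream.with_start_signal(signal);`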
1610    fn with_start_signal(self, signal: StartSignal) -> Self;
1611}
1612
1613impl<S, Tr> WithStartSignal for Arranged<S, Tr>
1614where
1615    S: Scope,
1616    S::Timestamp: RenderTimestamp,
1617    Tr: TraceReader + Clone,
1618{
1619    fn with_start_signal(self, signal: StartSignal) -> Self {
1620        Arranged {
1621            stream: self.stream.with_start_signal(signal),
1622            trace: self.trace,
1623        }
1624    }
1625}
1626
1627impl<S, D> WithStartSignal for Stream<S, D>
1628where
1629    S: Scope,
1630    D: timely::Data,
1631{
1632    fn with_start_signal(self, signal: StartSignal) -> Self {
1633        self.unary(Pipeline, "StartSignal", |_cap, info| {
1634            let token = Box::new(ActivateOnDrop::new(
1635                (),
1636                info.address,
1637                self.scope().activations(),
1638            ));
1639            signal.drop_on_fire(token);
1640
1641            let mut stash = Vec::new();
1642
1643            move |input, output| {
1644                // Stash incoming updates as long as the start signal has not fired.
1645                if !signal.has_fired() {
1646                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1647                    return;
1648                }
1649
1650                // Release any data we might still have stashed.
1651                for (cap, mut data) in std::mem::take(&mut stash) {
1652                    output.session(&cap).give_container(&mut data);
1653                }
1654
1655                // Pass through all remaining input data.
1656                input.for_each(|cap, data| {
1657                    output.session(&cap).give_container(data);
1658                });
1659            }
1660        })
1661    }
1662}
1663
1664/// Suppress progress messages for times before the given `as_of`.
1665///
1666/// This operator exists specifically to work around a memory spike we'd otherwise see when
1667/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
1668/// operator observes a frontier advancement without data, it inserts an empty batch into the spine.
1669/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
1670/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
1671/// the snapshot batch.
1672///
1673/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
1674/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
1675/// beyond the `as_of`, so the snapshot data has already been collected.
1676///
1677/// To ensure this, this operator needs to take two measures:
1678///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
1679///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
1680///
1681/// The second measure requires elaboration: if we didn't reclock snapshot updates, they might
1682/// still be upstream of `arrange_core` operators when those operators learn that we have dropped
1683/// the minimum capability. The in-flight snapshot updates would hold back the input frontiers of
1684/// `arrange_core` operators at the `as_of`, which would cause them to insert empty batches.
1685fn suppress_early_progress<G, D>(
1686    stream: Stream<G, D>,
1687    as_of: Antichain<G::Timestamp>,
1688) -> Stream<G, D>
1689where
1690    G: Scope,
1691    D: Data,
1692{
1693    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1694        let mut early_cap = Some(default_cap);
1695
1696        move |(input, frontier), output| {
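            // Pass updates beyond the `as_of` through at their own times; emit all other
            // updates at the retained minimum capability.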
1697            input.for_each_time(|data_cap, data| {
1698                if as_of.less_than(data_cap.time()) {
1699                    let mut session = output.session(&data_cap);
1700                    for data in data {
1701                        session.give_container(data);
1702                    }
1703                } else {
1704                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1705                    let mut session = output.session(&cap);
1706                    for data in data {
1707                        session.give_container(data);
1708                    }
1709                }
1710            });
1711
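            // Drop the early capability once the input frontier is no longer bounded by the `as_of`.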
1712            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1713                early_cap.take();
1714            }
1715        }
1716    })
1717}
1718
1719/// Extension trait for [`StreamCore`] to selectively limit progress.
1720trait LimitProgress<T: Timestamp> {
1721    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. The
1722    /// implementation is expected to observe the times in the data and to release capabilities based
1723    /// on the probe's frontier, after applying `slack` to round up timestamps.
1724    ///
1725    /// The implementation of this operator is subtle to avoid regressions in the rest of the
1726    /// system. Specifically, joins hold back compaction on the other side of the join, so we need to
1727    /// make sure we release capabilities as soon as possible. This is why we only limit progress
1728    /// for times before the `upper`, which is the time until which the source can distinguish
1729    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
1730    /// our capability.
1731    ///
1732    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
1733    /// consider using the join of the uppers, i.e., the lower bound of the uppers of all available inputs.
1734    ///
1735    /// Once the input frontier reaches `[]`, the implementation must release any capability to
1736    /// allow downstream operators to release resources.
1737    ///
1738    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
1739    /// avoid unbounded memory usage.
1740    ///
1741    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
1742    ///   any timestamp rounding happens (cf. `REFRESH EVERY` materialized views).
1743    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
1744    /// * `name` is a human-readable name for the operator.
1745    /// * `limit` is the maximum number of pending times to keep around.
1746    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
1747    ///   retain a capability.
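    ///
    /// A hypothetical usage sketch (not a doc test; `stream`, `probe`, and `upper` are assumed to exist):
    ///
    /// ```ignore
    /// // Round observed times up to 1-second boundaries and keep at most 100 pending times.
    /// let limited = stream.limit_progress(probe, 1_000, Some(100), upper, "my_source".into());
    /// ```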
1748    fn limit_progress(
1749        &self,
1750        handle: MzProbeHandle<T>,
1751        slack_ms: u64,
1752        limit: Option<usize>,
1753        upper: Antichain<T>,
1754        name: String,
1755    ) -> Self;
1756}
1757
1758// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
1759// number.
1760impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
1761where
1762    G: Scope<Timestamp = mz_repr::Timestamp>,
1763    D: timely::Data,
1764    R: timely::Data,
1765{
1766    fn limit_progress(
1767        &self,
1768        handle: MzProbeHandle<mz_repr::Timestamp>,
1769        slack_ms: u64,
1770        limit: Option<usize>,
1771        upper: Antichain<mz_repr::Timestamp>,
1772        name: String,
1773    ) -> Self {
1774        let stream =
1775            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1776                // Times that we've observed on our input.
1777                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
1778                // Capability for the lower bound of `pending_times`, if any.
1779                let mut retained_cap: Option<Capability<G::Timestamp>> = None;
1780
1781                let activator = self.scope().activator_for(info.address);
1782                handle.activate(activator.clone());
1783
1784                move |(input, frontier), output| {
1785                    input.for_each(|cap, data| {
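                        // Pad each update's time by `slack_ms`, round it up to the next multiple of
                        // `slack_ms`, and remember it as pending unless it is already beyond `upper`.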
1786                        for time in data
1787                            .iter()
1788                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1789                        {
1790                            let rounded_time =
1791                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
1792                            if !upper.less_than(&rounded_time.into()) {
1793                                pending_times.insert(rounded_time.into());
1794                            }
1795                        }
1796                        output.session(&cap).give_container(data);
1797                        if retained_cap.as_ref().is_none_or(|c| {
1798                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1799                        }) {
1800                            retained_cap = Some(cap.retain());
1801                        }
1802                    });
1803
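                    // Discard pending times that the probe frontier has already reached.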
1804                    handle.with_frontier(|f| {
1805                        while pending_times
1806                            .first()
1807                            .map_or(false, |retained_time| !f.less_than(&retained_time))
1808                        {
1809                            let _ = pending_times.pop_first();
1810                        }
1811                    });
1812
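                    // Enforce the `limit` by discarding the earliest pending times.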
1813                    while limit.map_or(false, |limit| pending_times.len() > limit) {
1814                        let _ = pending_times.pop_first();
1815                    }
1816
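                    // Downgrade the retained capability to the earliest pending time, or release it
                    // if none remain.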
1817                    match (retained_cap.as_mut(), pending_times.first()) {
1818                        (Some(cap), Some(first)) => cap.downgrade(first),
1819                        (_, None) => retained_cap = None,
1820                        _ => {}
1821                    }
1822
1823                    if frontier.is_empty() {
1824                        retained_cap = None;
1825                        pending_times.clear();
1826                    }
1827
1828                    if !pending_times.is_empty() {
1829                        tracing::debug!(
1830                            name,
1831                            info.global_id,
1832                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1833                            frontier = ?frontier.frontier().get(0),
1834                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1835                            ?upper,
1836                            "pending times",
1837                        );
1838                    }
1839                }
1840            });
1841        stream
1842    }
1843}
1844
1845/// A formatter for an iterator of timestamps that displays the first element, and subsequently
1846/// the differences between consecutive timestamps.
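/// For example, the timestamps `[100, 110, 125]` are rendered as `[100, +10, +15]`.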
1847struct PendingTimesDisplay<T>(T);
1848
1849impl<T> std::fmt::Display for PendingTimesDisplay<T>
1850where
1851    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1852{
1853    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1854        let mut iter = self.0.clone().into_iter();
1855        write!(f, "[")?;
1856        if let Some(first) = iter.next() {
1857            write!(f, "{}", first)?;
1858            let mut last = u64::from(first);
1859            for time in iter {
1860                write!(f, ", +{}", u64::from(time) - last)?;
1861                last = u64::from(time);
1862            }
1863        }
1864        write!(f, "]")?;
1865        Ok(())
1866    }
1867}
1868
1869/// Helper to merge pairs of datum iterators into a row or split a datum iterator
1870/// into two rows, given the arity of the first component.
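///
/// For example, with a split arity of 2, merging `[a, b]` and `[c]` yields the row `[a, b, c]`,
/// while splitting `[a, b, c]` yields the rows `[a, b]` and `[c]`.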
1871#[derive(Clone, Copy, Debug)]
1872struct Pairer {
1873    split_arity: usize,
1874}
1875
1876impl Pairer {
1877    /// Creates a pairer with knowledge of the arity of the first component in the pair.
1878    fn new(split_arity: usize) -> Self {
1879        Self { split_arity }
1880    }
1881
1882    /// Merges a pair of datum iterators creating a `Row` instance.
1883    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1884    where
1885        I1: IntoIterator<Item = Datum<'a>>,
1886        I2: IntoIterator<Item = Datum<'a>>,
1887    {
1888        SharedRow::pack(first.into_iter().chain(second))
1889    }
1890
1891    /// Splits a datum iterator into a pair of `Row` instances.
1892    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1893        let mut datum_iter = datum_iter.into_iter();
1894        let mut row_builder = SharedRow::get();
1895        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1896        let second = row_builder.pack_using(datum_iter);
1897        (first, second)
1898    }
1899}