mz_compute/render.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders a plan into a timely/differential dataflow computation.
//!
//! ## Error handling
//!
//! Timely and differential have no idioms for computations that can error. The
//! philosophy is, reasonably, to define the semantics of the computation such
//! that errors are unnecessary: e.g., by using wrap-around semantics for
//! integer overflow.
//!
//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
//! in myriad cases. The classic example is a division by zero, but invalid
//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
//!
//! At the moment, only *scalar* expression evaluation can fail, so only
//! operators that evaluate scalar expressions can fail. At the time of writing,
//! that includes map, filter, reduce, and join operators. Constants are a bit
//! of a special case: they can be either a constant vector of rows *or* a
//! constant, singular error.
//! The approach taken is to build two parallel trees of computation: one for
//! the rows that have been successfully evaluated (the "oks tree"), and one for
//! the errors that have been generated (the "errs tree"). For example:
//!
//! ```text
//!    oks1  errs1       oks2  errs2
//!      |     |           |     |
//!      |     |           |     |
//!   project  |           |     |
//!      |     |           |     |
//!      |     |           |     |
//!     map    |           |     |
//!      |\    |           |     |
//!      | \   |           |     |
//!      |  \  |           |     |
//!      |   \ |           |     |
//!      |    \|           |     |
//!   project  +           +     +
//!      |     |          /     /
//!      |     |         /     /
//!    join ------------+     /
//!      |     |             /
//!      |     | +----------+
//!      |     |/
//!     oks   errs
//! ```
//!
//! The project operation cannot fail, so errors from errs1 are propagated
//! directly. Map operators are fallible and so can inject additional errors
//! into the stream. Join operators combine the errors from each of their
//! inputs.
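//!
//! As a minimal sketch of this pattern, a fallible operator renders into both
//! trees by splitting its evaluation results and concatenating any new errors
//! onto the incoming error stream. The helper names below (`map_fallible`,
//! `eval_exprs`) are illustrative rather than the exact operators used here;
//! the real implementations live in the submodules of this module.
//!
//! ```ignore
//! // `oks_in` and `errs_in` are the incoming halves of the parallel trees.
//! let (oks_out, new_errs) = oks_in.map_fallible("FallibleMap", move |row| {
//!     // Each evaluation yields a `Result<Row, DataflowError>`.
//!     eval_exprs(&exprs, row)
//! });
//! // Errors accumulate alongside those produced by upstream operators.
//! let errs_out = errs_in.concat(&new_errs);
//! ```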
//!
//! The semantics of the error stream are minimal. From the perspective of SQL,
//! a dataflow is considered to be in an error state if there is at least one
//! element in the final errs collection. The error value returned to the user
//! is selected arbitrarily; SQL only makes provisions to return one error to
//! the user at a time. There are plans to make the err collection accessible to
//! end users, so they can see all errors at once.
//!
//! To make errors transient, simply ensure that the operator can retract any
//! produced errors when corrected data arrives. To make errors permanent, write
//! the operator such that it never retracts the errors it produced. Future work
//! will likely want to introduce some sort of sort order for errors, so that
//! permanent errors are returned to the user ahead of transient errors—probably
//! by introducing a new error type a la:
//!
//! ```no_run
//! # struct EvalError;
//! # struct SourceError;
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//! ```
//!
//! If the error stream is empty, the oks stream must be correct. If the error
//! stream is non-empty, then there are no semantics for the oks stream. This is
//! sufficient to support SQL in its current form, but is likely to be
//! unsatisfactory long term. We suspect that we can continue to imbue the oks
//! stream with semantics if we are very careful in describing what data should
//! and should not be produced upon encountering an error. Roughly speaking, the
//! oks stream could represent the correct result of the computation where all
//! rows that caused an error have been pruned from the stream. There are
//! strange and confusing questions here around foreign keys, though: what if
//! the optimizer proves that a particular key must exist in a collection, but
//! the key gets pruned away because its row participated in a scalar expression
//! evaluation that errored?
//!
//! In the meantime, it is probably wise for operators to keep the oks stream
//! roughly "as correct as possible" even when errors are present in the errs
//! stream. This reduces the amount of recomputation that must be performed
//! if/when the errors are retracted.

use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use columnar::Columnar;
use differential_dataflow::IntoOwned;
use differential_dataflow::containers::Columnation;
use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, ShutdownButton};
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id};
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::Product;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::Worker as TimelyWorker;

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors,
};
use crate::render::context::{ArrangementFlavor, Context, ShutdownToken};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, producing
/// both arrangements and sinks.
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Mutually recursive view definitions require special handling.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Determine indexes to export, and their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Determine sinks to export, and their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.log_register().get("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    // If you change the format here to something other than "Dataflow: {name}",
    // you should also update MZ_MAPPABLE_OBJECTS in `src/catalog/src/builtin.rs`
    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        // TODO(ct3): This should be a config of the source instead, but at least try
        // to contain the hacks.
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            // Import declared sources into the rendering context.
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        // If enabled, we read from Persist with a `RelationDesc` that
                        // omits unneeded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    // Note: For correctness, we require that sources only emit times advanced by
                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        &compute_state.worker_config,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        |error| panic!("compute_import: {error}"),
                    );

                    let mut source_tokens: Vec<Rc<dyn Any>> = vec![Rc::new(token)];

                    // If `mfp` is non-identity, we need to apply what remains.
                    // For the moment, assert that it is either trivial or `None`.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // To avoid a memory spike during arrangement hydration (database-issues#6368),
                    // we need to ensure that the first frontier we report into the dataflow is
                    // beyond the `as_of`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        // Apply logical backpressure to the source.
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let (token, stream) = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                        source_tokens.push(token);
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // The `suppress_early_progress` operator and the input
                    // probe both want to work on the untransformed ct input,
                    // make sure this stays after them.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            // TODO(ct3): Ideally this would be encapsulated by
                            // ContinualTaskCtx, but the types are tricky.
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(source_tokens));
                });
            }
        });

        // If there exists a recursive expression, we'll need to use a non-region scope,
        // in order to support additional timestamp coordinates for iteration.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let object_token = Rc::new(());
                    context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token));
                    tokens.insert(object.id, object_token);

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(object.id, 0, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let object_token = Rc::new(());
                    context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token));
                    tokens.insert(object.id, object_token);

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

// This implementation block allows child timestamps to vary from parent timestamps,
// but requires the parent timestamp to be `repr::Timestamp`.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
    <T as Columnar>::Container: Clone + Send,
{
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

// This implementation block requires the scopes have the same timestamp as the trace manager.
// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // put together tokens that belong to the export
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // Ensure that the frontier does not advance past the expiration time, if set.
                // Otherwise, we might write down incorrect data.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // Duplicate of existing arrangement with id `gid`, so
                // just create another handle to that arrangement.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

// This implementation block requires the scopes have the same timestamp as the trace manager.
// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
    <T as Columnar>::Container: Clone + Send,
{
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // put together tokens that belong to the export
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                let mut oks = oks
                    .as_collection(|k, v| (k.into_owned(), v.into_owned()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // Ensure that the frontier does not advance past the expiration time, if set.
                // Otherwise, we might write down incorrect data.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // Duplicate of existing arrangement with id `gid`, so
                // just create another handle to that arrangement.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it calls in to `render_plan` which will error if
    /// further [`RecBind`]s are found.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    pub fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            for LetBind { id, value } in lets {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `Variable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                use differential_dataflow::operators::iterate::Variable;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = Variable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = Variable::new(&mut self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            for RecBind { id, value, limit } in recs {
                let bundle = self.render_recursive_plan(object_id, level + 1, value);
                // We need to ensure that the raw collection exists, but do not have enough information
                // here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");
                if let Some(token) = &self.shutdown_token.get_inner() {
                    oks = oks.with_token(Weak::clone(token));
                }

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = Collection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&Collection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                oks_v.set(&oks);

                // Set err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error each iteration,
                // say if the limit of `oks` has an error. This would result in non-termination rather
                // than a clean report of the error. The trade-off is that we lose information about
                // multiplicities of errors, but .. this seems to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let mut errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, _, _, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());
                if let Some(token) = &self.shutdown_token.get_inner() {
                    errs = errs.with_token(Weak::clone(token));
                }
                err_v.set(&errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    <G::Timestamp as Columnar>::Container: Clone + Send,
    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
    /// results.
    ///
    /// The return type reflects the uncertainty about the data representation, perhaps
    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
    ///
    /// # Panics
    ///
    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
    /// `render_recursive_plan` instead.
    pub fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            for LetBind { id, value } in lets {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body)
                .leave_region()
        })
    }

    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        for lir_id in topological_order {
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
            // in some other structure and have that structure impl ExprHumanizer
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);
                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` is present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input,
                func,
                exprs,
                mfp_after: mfp,
                input_key,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input, func, exprs, mfp, input_key)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input,
                key_val_plan,
                plan,
                input_key,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input, key_val_plan, plan, input_key, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input,
                forms: keys,
                input_key,
                input_mfp,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

1235    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1236        if let Some(logger) = &self.compute_logger {
1237            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1238                dataflow_index,
1239                global_id,
1240            }));
1241        }
1242    }
1243
1244    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1245        if let Some(logger) = &self.compute_logger {
1246            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1247        }
1248    }
1249
1250    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        // A `CollectionBundle` can contain more than one collection, which makes it not obvious
        // which one we should attach the logging operator to.
        //
        // We could attach to each collection and track the lower bound of output frontiers.
        // However, that would be of limited use because we expect all collections to hydrate at
        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
        // frontier of the unarranged collection advance, it will perform all work necessary to
        // also advance its own frontier. We don't expect significant delays between frontier
        // advancements of the unarranged and arranged collections, so attaching the logging
        // operator to any one of them should produce accurate results.
        //
        // If the `CollectionBundle` contains both unarranged and arranged representations, it is
        // beneficial to attach the logging operator to one of the arranged representations to avoid
        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
        // if we attached the logging operator to it, we would introduce a fork in its output
        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
        // that the output streams of the arrangements don't yet feed into anything else, so
        // attaching a (pass-through) logging operator does not introduce a fork.

        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.hydration_logger.clone() else {
            return stream.clone(); // hydration logging disabled
        };

        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
        //
        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
        // frontier advances to an outer time that's greater than the `as_of`. Comparing
        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
        // instead.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;
            logger.log(lir_id, hydrated);

            move |input, output| {
                // Pass through inputs.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                let frontier = input.frontier().frontier();
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier) {
                    hydrated = true;
                    logger.log(lir_id, hydrated);
                }
            }
        })
    }
}

#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp:
    Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation + Columnar
where
    <Self as Columnar>::Container: Clone + Send,
{
    /// The system timestamp component of the timestamp.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // It is necessary to step back both coordinates of a product,
        // and when one is a `PointStamp` that also means all coordinates
        // of the pointstamp.
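        // For example (illustrative): an outer time of 5 with pointstamp `[3, 0]` steps
        // back to an outer time of 4 with pointstamp `[2, 0]`.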
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token; dropping the token causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
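///
/// A minimal usage sketch for an async operator (the surrounding code is illustrative):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// // Hand `token` to whoever decides when the dataflow may start; dropping it fires the signal.
/// async move {
///     signal.await;
///     // ... the operator body runs only after the signal has fired ...
/// }
/// ```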
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead, the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Create a new `StartSignal` and a corresponding token that activates the signal when
    /// dropped.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

/// Extension trait to attach a `StartSignal` to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
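    ///
    /// A minimal usage sketch (the names `stream` and `start_signal` are illustrative):
    ///
    /// ```ignore
    /// let stream = stream.with_start_signal(start_signal.clone());
    /// ```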
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    <S::Timestamp as Columnar>::Container: Clone + Send,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // Stash incoming updates as long as the start signal has not fired.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release any data we might still have stashed.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                // Pass through all remaining input data.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Suppress progress messages for times before the given `as_of`.
///
/// This operator exists specifically to work around a memory spike we'd otherwise see when
/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
/// operator observes a frontier advancement without data, it inserts an empty batch into the spine.
/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
/// the snapshot batch.
///
/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
/// beyond the `as_of`, so the snapshot data has already been collected.
///
/// To ensure this, this operator needs to take two measures:
///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
///
/// The second measure requires elaboration: If we didn't reclock snapshot updates, they might
/// still be upstream of `arrange_core` operators when those operators learn that we have dropped the
/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
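///
/// A minimal usage sketch (the surrounding names are illustrative):
///
/// ```ignore
/// let delayed = suppress_early_progress(stream, as_of.clone());
/// ```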
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |input, output| {
            input.for_each(|data_cap, data| {
                let mut session = if as_of.less_than(data_cap.time()) {
                    output.session(&data_cap)
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    output.session(cap)
                };
                session.give_container(data);
            });

            let frontier = input.frontier().frontier();
            if !PartialOrder::less_equal(&frontier, &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

/// Extension trait for [`StreamCore`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. The
    /// implementation is expected to observe the times in the data and release capabilities based
    /// on the probe's frontier, after applying `slack` to round up timestamps.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically, joins hold back compaction on the other side of the join, so we need to
    /// make sure we release capabilities as soon as possible. This is why we only limit progress
    /// for times before the `upper`, which is the time until which the source can distinguish
    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
    /// our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
    /// consider using the join of the uppers, i.e., the least upper bound of the uppers of all
    /// available inputs.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit`, if it is `Some`, to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (cf. `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
    ///
    /// Returns a shutdown button and the stream with the progress limiting applied.
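    ///
    /// A minimal usage sketch (all surrounding names are illustrative):
    ///
    /// ```ignore
    /// let (token, stream) = stream.limit_progress(
    ///     output_probe.clone(),     // probe installed on the dataflow's outputs
    ///     1_000,                    // round pending times up to 1-second boundaries
    ///     Some(100),                // keep at most 100 pending times
    ///     source_upper.clone(),     // only retain capabilities for times not beyond this frontier
    ///     "my_source".to_string(),
    /// );
    /// tokens.push(token);           // keep the shutdown button alive
    /// ```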
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> (Rc<dyn Any>, Self);
}

// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> (Rc<dyn Any>, Self) {
        let mut button = None;

        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                let shutdown = Rc::new(());
                button = Some(ShutdownButton::new(
                    Rc::new(RefCell::new(Some(Rc::clone(&shutdown)))),
                    activator,
                ));
                let shutdown = Rc::downgrade(&shutdown);

                move |input, output| {
                    // We've been shut down, release all resources and consume inputs.
                    if shutdown.strong_count() == 0 {
                        retained_cap = None;
                        pending_times.clear();
                        while let Some(_) = input.next() {}
                        return;
                    }

                    while let Some((cap, data)) = input.next() {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    }

                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if input.frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?input.frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        (Rc::new(button.unwrap().press_on_drop()), stream)
    }
}

/// A formatter for an iterator of timestamps that displays the first element and, subsequently,
/// the differences between consecutive timestamps.
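///
/// For example (illustrative): the times `100, 110, 125` are rendered as `[100, +10, +15]`.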
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
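///
/// For example (illustrative): a `Pairer::new(2)` merges the datum iterators `[a, b]` and `[c]`
/// into the single row `[a, b, c]`, and splits the datum iterator `[a, b, c]` back into the
/// rows `[a, b]` and `[c]`.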
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer with knowledge of the arity of the first component in the pair.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges a pair of datum iterators creating a `Row` instance.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a datum iterator into a pair of `Row` instances.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let binding = SharedRow::get();
        let mut row_builder = binding.borrow_mut();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}