mz_compute/render.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders a plan into a timely/differential dataflow computation.
//!
//! ## Error handling
//!
//! Timely and differential have no idioms for computations that can error. The
//! philosophy is, reasonably, to define the semantics of the computation such
//! that errors are unnecessary: e.g., by using wrap-around semantics for
//! integer overflow.
//!
//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
//! in myriad cases. The classic example is division by zero, but invalid
//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
//!
//! At the moment, only *scalar* expression evaluation can fail, so only
//! operators that evaluate scalar expressions can fail. At the time of writing,
//! that includes map, filter, reduce, and join operators. Constants are a bit
//! of a special case: they can be either a constant vector of rows *or* a
//! constant, singular error.
//!
//! The approach taken is to build two parallel trees of computation: one for
//! the rows that have been successfully evaluated (the "oks tree"), and one for
//! the errors that have been generated (the "errs tree"). For example:
//!
//! ```text
//!    oks1  errs1       oks2  errs2
//!      |     |           |     |
//!      |     |           |     |
//!   project  |           |     |
//!      |     |           |     |
//!      |     |           |     |
//!     map    |           |     |
//!      |\    |           |     |
//!      | \   |           |     |
//!      |  \  |           |     |
//!      |   \ |           |     |
//!      |    \|           |     |
//!   project  +           +     +
//!      |     |          /     /
//!      |     |         /     /
//!    join ------------+     /
//!      |     |             /
//!      |     | +----------+
//!      |     |/
//!     oks   errs
//! ```
//!
//! The project operation cannot fail, so errors from errs1 are propagated
//! directly. Map operators are fallible and so can inject additional errors
//! into the stream. Join operators combine the errors from each of their
//! inputs.
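//!
//! Concretely, a fallible operator produces two output collections. As a
//! minimal sketch (assuming a hypothetical `eval` closure; `map_fallible` is
//! the fallible-map helper from `mz_timely_util`):
//!
//! ```ignore
//! // Split the input into successfully evaluated rows and evaluation errors.
//! let (oks_new, errs_new) = oks.map_fallible("Map", move |row| eval(row));
//! // Errors from upstream pass through untouched; new errors are merged in.
//! let errs = errs.concat(&errs_new);
//! ```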
//!
//! The semantics of the error stream are minimal. From the perspective of SQL,
//! a dataflow is considered to be in an error state if there is at least one
//! element in the final errs collection. The error value returned to the user
//! is selected arbitrarily; SQL only makes provisions to return one error to
//! the user at a time. There are plans to make the errs collection accessible
//! to end users, so they can see all errors at once.
//!
//! To make errors transient, simply ensure that the operator can retract any
//! produced errors when corrected data arrives (see the schematic update
//! stream below). To make errors permanent, write the operator such that it
//! never retracts the errors it produced. Future work will likely want to
//! introduce some sort of sort order for errors, so that permanent errors are
//! returned to the user ahead of transient errors—probably by introducing a
//! new error type à la:
//!
//! ```no_run
//! # struct EvalError;
//! # struct SourceError;
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//! ```
//!
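//! For example, an operator can make a division-by-zero error transient by
//! retracting it once the offending input is corrected. Schematically, as a
//! stream of (data, time, diff) updates (illustrative only):
//!
//! ```text
//! (DivisionByZero, t1, +1)    error produced at time t1
//! (DivisionByZero, t2, -1)    error retracted at time t2 > t1, input fixed
//! ```
//!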
//! If the error stream is empty, the oks stream must be correct. If the error
//! stream is non-empty, then there are no semantics for the oks stream. This is
//! sufficient to support SQL in its current form, but is likely to be
//! unsatisfactory long term. We suspect that we can continue to imbue the oks
//! stream with semantics if we are very careful in describing what data should
//! and should not be produced upon encountering an error. Roughly speaking, the
//! oks stream could represent the correct result of the computation where all
//! rows that caused an error have been pruned from the stream. There are
//! strange and confusing questions here around foreign keys, though: what if
//! the optimizer proves that a particular key must exist in a collection, but
//! the key gets pruned away because its row participated in a scalar expression
//! evaluation that errored?
//!
//! In the meantime, it is probably wise for operators to keep the oks stream
//! roughly "as correct as possible" even when errors are present in the errs
//! stream. This reduces the amount of recomputation that must be performed
//! if/when the errors are retracted.

use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, ShutdownButton};
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    ENABLE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::Product;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
};
use crate::render::context::{ArrangementFlavor, Context, ShutdownProbe, shutdown_token};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

/// Assembles the "compute" side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, producing
/// both arrangements and sinks.
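///
/// The rendered dataflow is structured roughly as follows (illustrative; the
/// names mirror the `name`, `input_name`, and `build_name` regions below):
///
/// ```text
/// Dataflow: <debug_name>
/// ├── InputRegion: sources imported via `persist_source`
/// └── BuildRegion: objects built; indexes and sinks exported
/// ```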
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Mutually recursive view definitions require special handling.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Determine indexes to export, and their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Determine sinks to export, and their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        // TODO(ct3): This should be a config of the source instead, but at least try
        // to contain the hacks.
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            // Import declared sources into the rendering context.
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        // If enabled, we read from Persist with a `RelationDesc` that
                        // omits unneeded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    // Note: For correctness, we require that sources only emit times advanced by
                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        &compute_state.worker_config,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    let mut source_tokens: Vec<Rc<dyn Any>> = vec![Rc::new(token)];

                    // If `mfp` is non-identity, we need to apply what remains.
                    // For the moment, assert that it is either trivial or `None`.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // To avoid a memory spike during arrangement hydration (database-issues#6368),
                    // we need to ensure that the first frontier we report into the dataflow is
                    // beyond the `as_of`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        // Apply logical backpressure to the source.
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let (token, stream) = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                        source_tokens.push(token);
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // The `suppress_early_progress` operator and the input probe both want to
                    // work on the untransformed ct input, so make sure this stays after them.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            // TODO(ct3): Ideally this would be encapsulated by
                            // ContinualTaskCtx, but the types are tricky.
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(source_tokens));
                });
            }
        });

        // If there exists a recursive expression, we'll need to use a non-region scope,
        // in order to support additional timestamp coordinates for iteration.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    // Recursive plans _must_ have bodies in a let.
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

// This implementation block allows child timestamps to vary from parent timestamps,
// but requires the parent timestamp to be `repr::Timestamp`.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

// This implementation block requires that the scopes have the same timestamp as the trace manager.
// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together the tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // Ensure that the frontier does not advance past the expiration time, if set.
                // Otherwise, we might write down incorrect data.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // Duplicate of an existing arrangement with id `gid`, so
                // just create another handle to that arrangement.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

// This implementation block requires that the scopes have the same timestamp as the trace manager.
// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together the tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // TODO: The following as_collection/leave/arrange sequence could be optimized.
                //   * Combine as_collection and leave into a single function.
                //   * Use columnar to extract columns from the batches to implement leave.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // Ensure that the frontier does not advance past the expiration time, if set.
                // Otherwise, we might write down incorrect data.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // Duplicate of an existing arrangement with id `gid`, so
                // just create another handle to that arrangement.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

/// Information about bindings, tracked in `render_recursive_plan` and
/// `render_plan`, to be passed to `render_letfree_plan`.
///
/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
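///
/// For example (illustrative), the decorated operators for a plan with a
/// single `Let` binding and a body would render as:
///
/// ```text
/// With l0 = Map ...
/// Returning Join ...
/// ```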
enum BindingInfo {
    /// The body of a plan; `in_let` records whether any bindings precede it.
    Body { in_let: bool },
    /// A `Let` binding; `last` marks the final binding in its stage.
    Let { id: LocalId, last: bool },
    /// A `LetRec` binding; `last` marks the final binding in its stage.
    LetRec { id: LocalId, last: bool },
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it calls into `render_plan`, which will error if
    /// further [`RecBind`]s are found.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `SemigroupVariable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner),
                );

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough
                // information here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set the oks variable to `oks`, but consolidated to ensure iteration ceases at
                // a fixed point.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or zero if it is missing (as trailing zeros
                            // are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = Collection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&Collection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Set the err variable to the distinct elements of `err`.
                // Distinctness is important, as we might otherwise add the same error in each
                // iteration, say if the limit of `oks` has an error. This would result in
                // non-termination rather than a clean report of the error. The trade-off is that
                // we lose information about multiplicities of errors, but .. this seems to be the
                // better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Actively interrupt the dataflow during shutdown, to ensure we won't keep
                // iterating for long (or even forever).
                let oks = render_shutdown_fuse(oks, self.shutdown_probe.clone());
                let errs = render_shutdown_fuse(errs, self.shutdown_probe.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
    /// results.
    ///
    /// The return type reflects the uncertainty about the data representation, perhaps
    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
    ///
    /// # Panics
    ///
    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
    /// `render_recursive_plan` instead.
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                // If we encounter even a single let binding, the body is in a let.
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }

    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) We need an ExprHumanizer in the DataflowDescription to get nice column
            // names. ActiveComputeState can't have a catalog reference, so we'll need to capture
            // the names in some other structure and have that structure impl ExprHumanizer.
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // Mark the last operator in topological order with any binding decoration.
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
1099    fn render_plan_expr(
1100        &mut self,
1101        expr: render_plan::Expr,
1102        collections: &BTreeMap<LirId, CollectionBundle<G>>,
1103    ) -> CollectionBundle<G> {
1104        use render_plan::Expr::*;
1105
1106        let expect_input = |id| {
1107            collections
1108                .get(&id)
1109                .cloned()
1110                .unwrap_or_else(|| panic!("missing input collection: {id}"))
1111        };
1112
1113        match expr {
1114            Constant { rows } => {
1115                // Produce both rows and errs to avoid conditional dataflow construction.
1116                let (rows, errs) = match rows {
1117                    Ok(rows) => (rows, Vec::new()),
1118                    Err(e) => (Vec::new(), vec![e]),
1119                };
1120
1121                // We should advance times in constant collections to start from `as_of`.
1122                let as_of_frontier = self.as_of_frontier.clone();
1123                let until = self.until.clone();
1124                let ok_collection = rows
1125                    .into_iter()
1126                    .filter_map(move |(row, mut time, diff)| {
1127                        time.advance_by(as_of_frontier.borrow());
1128                        if !until.less_equal(&time) {
1129                            Some((
1130                                row,
1131                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
1132                                diff,
1133                            ))
1134                        } else {
1135                            None
1136                        }
1137                    })
1138                    .to_stream(&mut self.scope)
1139                    .as_collection();
1140
1141                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
1142                error_time.advance_by(self.as_of_frontier.borrow());
1143                let err_collection = errs
1144                    .into_iter()
1145                    .map(move |e| {
1146                        (
1147                            DataflowError::from(e),
1148                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
1149                            Diff::ONE,
1150                        )
1151                    })
1152                    .to_stream(&mut self.scope)
1153                    .as_collection();
1154
1155                CollectionBundle::from_collections(ok_collection, err_collection)
1156            }
1157            Get { id, keys, plan } => {
1158                // Recover the collection from `self` and then apply `mfp` to it.
1159                // If `mfp` happens to be trivial, we can just return the collection.
1160                let mut collection = self
1161                    .lookup_id(id)
1162                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
1163                match plan {
1164                    mz_compute_types::plan::GetPlan::PassArrangements => {
1165                        // Assert that each of `keys` are present in `collection`.
1166                        assert!(
1167                            keys.arranged
1168                                .iter()
1169                                .all(|(key, _, _)| collection.arranged.contains_key(key))
1170                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is trivial, we can return the input unchanged; otherwise we
                // apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation");
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
        // which we should attach the logging operator.
        //
        // We could attach to each collection and track the lower bound of output frontiers.
        // However, that would be of limited use because we expect all collections to hydrate at
        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
        // frontier of the unarranged collection advance, it will perform all work necessary to
        // also advance its own frontier. We don't expect significant delays between frontier
        // advancements of the unarranged and arranged collections, so attaching the logging
        // operator to any one of them should produce accurate results.
        //
        // If the `CollectionBundle` contains both unarranged and arranged representations, it is
        // beneficial to attach the logging operator to one of the arranged representations, to
        // avoid unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
        // if we attached the logging operator to it, we would introduce a fork in its output
        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
        // that the output streams of the arrangements don't yet feed into anything else, so
        // attaching a (pass-through) logging operator does not introduce a fork.

        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone(); // hydration logging disabled
        };

        let export_ids = self.export_ids.clone();

        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
        //
        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
        // frontier advances to an outer time that's greater than the `as_of`. Comparing
        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
        // instead.
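        // For example, for `as_of = {5}` the hydration frontier is `{6}` (refined
        // into this scope's timestamp type).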
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |input, output| {
                // Pass through inputs.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                let frontier = input.frontier().frontier();
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
}

#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
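///
/// A minimal sketch (values illustrative) of the `step_back` contract, matching
/// the `mz_repr::Timestamp` implementation below:
///
/// ```ignore
/// let t: mz_repr::Timestamp = 100.into();
/// assert_eq!(t.step_back(), 99.into());
/// ```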
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // It is necessary to step back both coordinates of a product,
        // and when one is a `PointStamp` that also means all coordinates
        // of the pointstamp.
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token, dropping of which causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
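///
/// A minimal async usage sketch (operator scaffolding elided, names illustrative):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// // Hand `token` to whoever decides when the dataflow may start.
/// async move {
///     signal.await; // completes once `token` is dropped
///     // ... operator startup work ...
/// };
/// ```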
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Create a new `StartSignal` and a corresponding token that activates the signal when
    /// dropped.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

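    /// Registers `to_drop` to be dropped together with the signal's token, i.e., at
    /// the moment the signal fires.
    ///
    /// This works by replacing the token's contents with a tuple of the previous
    /// contents and `to_drop`, so repeated calls build up a nested tuple that is
    /// dropped as a unit. If the signal has already fired, `to_drop` is dropped
    /// immediately.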
    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

/// Extension trait to attach a `StartSignal` to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
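    ///
    /// A minimal sketch (names illustrative):
    ///
    /// ```ignore
    /// let gated_stream = stream.with_start_signal(signal.clone());
    /// ```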
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // Stash incoming updates as long as the start signal has not fired.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release any data we might still have stashed.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                // Pass through all remaining input data.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Wraps the provided `Collection` with an operator that passes through all received inputs as
/// long as the provided `ShutdownProbe` does not announce dataflow shutdown. Once the dataflow
/// is shutting down, all data flowing into the operator is dropped.
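///
/// A minimal usage sketch (names illustrative):
///
/// ```ignore
/// let fused = render_shutdown_fuse(collection, shutdown_probe.clone());
/// ```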
fn render_shutdown_fuse<G, D>(
    collection: Collection<G, D, Diff>,
    probe: ShutdownProbe,
) -> Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    let stream = collection.inner;
    let stream = stream.unary(Pipeline, "ShutdownFuse", move |_cap, _info| {
        move |input, output| {
            input.for_each(|cap, data| {
                if !probe.in_shutdown() {
                    output.session(&cap).give_container(data);
                }
            });
        }
    });
    stream.as_collection()
}

/// Suppress progress messages for times before the given `as_of`.
///
/// This operator exists specifically to work around a memory spike we'd otherwise see when
/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
/// operator observes a frontier advancement without data, it inserts an empty batch into the spine.
/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
/// the snapshot batch.
///
/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
/// beyond the `as_of`, so the snapshot data has already been collected.
///
/// To ensure this, the operator takes two measures:
///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
///
/// The second measure requires elaboration: If we didn't reclock snapshot updates, they might
/// still be upstream of `arrange_core` operators when those learn about us dropping the
/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
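///
/// A minimal usage sketch (names illustrative):
///
/// ```ignore
/// let guarded = suppress_early_progress(collection.inner, as_of.clone());
/// ```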
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |input, output| {
            input.for_each(|data_cap, data| {
                let mut session = if as_of.less_than(data_cap.time()) {
                    output.session(&data_cap)
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    output.session(cap)
                };
                session.give_container(data);
            });

            let frontier = input.frontier().frontier();
            if !PartialOrder::less_equal(&frontier, &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

/// Extension trait for [`StreamCore`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limits the progress of the stream until its frontier reaches the given `upper` bound. The
    /// implementation is expected to observe the times in the data, and to release capabilities
    /// based on the probe's frontier, after applying `slack_ms` to round up timestamps.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically, joins hold back compaction on the other side of the join, so we need
    /// to make sure we release capabilities as soon as possible. This is why we only limit
    /// progress for times before the `upper`, which is the time up to which the source can
    /// distinguish updates at the time of rendering. Once we make progress to the `upper`, we need
    /// to release our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We
    /// could consider using the lower bound of the uppers of all available inputs instead.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (cf. `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
    ///
    /// Returns a shutdown button and the stream with the progress limiting applied.
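    ///
    /// For example, with `slack_ms = 1_000`, an observed time `1_500` is first bumped
    /// by the slack to `2_500` and then rounded up to the next multiple of the slack,
    /// `(2_500 / 1_000 + 1) * 1_000 = 3_000`; a capability for that rounded time is
    /// held until the probe's frontier passes it.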
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> (Rc<dyn Any>, Self);
}

// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> (Rc<dyn Any>, Self) {
        let mut button = None;

        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                let shutdown = Rc::new(());
                button = Some(ShutdownButton::new(
                    Rc::new(RefCell::new(Some(Rc::clone(&shutdown)))),
                    activator,
                ));
                let shutdown = Rc::downgrade(&shutdown);

                move |input, output| {
                    // We've been shut down; release all resources and consume inputs.
                    if shutdown.strong_count() == 0 {
                        retained_cap = None;
                        pending_times.clear();
                        while input.next().is_some() {}
                        return;
                    }

                    while let Some((cap, data)) = input.next() {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    }

                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if input.frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?input.frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        (Rc::new(button.unwrap().press_on_drop()), stream)
    }
}

/// A formatter for an iterator of timestamps that displays the first element, followed by
/// the differences between consecutive timestamps.
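///
/// For example, the times `[1000, 1500, 3000]` are displayed as `[1000, +500, +1500]`.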
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
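///
/// A minimal round-trip sketch (`key_row` and `val_row` are illustrative):
///
/// ```ignore
/// let pairer = Pairer::new(key_row.iter().count());
/// let merged = pairer.merge(key_row.iter(), val_row.iter());
/// let (key, val) = pairer.split(merged.iter());
/// ```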
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer with knowledge of the arity of the first component in the pair.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges a pair of datum iterators, creating a `Row` instance.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a datum iterator into a pair of `Row` instances.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}