mz_compute/render.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders a plan into a timely/differential dataflow computation.
//!
//! ## Error handling
//!
//! Timely and differential have no idioms for computations that can error. The
//! philosophy is, reasonably, to define the semantics of the computation such
//! that errors are unnecessary: e.g., by using wrap-around semantics for
//! integer overflow.
//!
//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
//! in myriad cases. The classic example is a division by zero, but invalid
//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
//!
//! At the moment, only *scalar* expression evaluation can fail, so only
//! operators that evaluate scalar expressions can fail. At the time of writing,
//! that includes map, filter, reduce, and join operators. Constants are a bit
//! of a special case: they can be either a constant vector of rows *or* a
//! constant, singular error.
//!
//! The approach taken is to build two parallel trees of computation: one for
//! the rows that have been successfully evaluated (the "oks tree"), and one for
//! the errors that have been generated (the "errs tree"). For example:
//!
//! ```text
//!    oks1  errs1       oks2  errs2
//!      |     |           |     |
//!      |     |           |     |
//!   project  |           |     |
//!      |     |           |     |
//!      |     |           |     |
//!     map    |           |     |
//!      |\    |           |     |
//!      | \   |           |     |
//!      |  \  |           |     |
//!      |   \ |           |     |
//!      |    \|           |     |
//!   project  +           +     +
//!      |     |          /     /
//!      |     |         /     /
//!    join ------------+     /
//!      |     |             /
//!      |     | +----------+
//!      |     |/
//!     oks   errs
//! ```
//!
//! The project operation cannot fail, so errors from errs1 are propagated
//! directly. Map operators are fallible and so can inject additional errors
//! into the stream. Join operators combine the errors from each of their
//! inputs.
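//!
//! As a rough sketch, the wiring for a fallible operator looks like the
//! following (illustrative only: `map_fallible` stands in for the various
//! fallible operators rendered by the submodules below, and `eval_exprs` is
//! a placeholder for scalar expression evaluation):
//!
//! ```ignore
//! // Evaluate scalar expressions, splitting the results into rows that
//! // evaluated successfully and rows that produced an evaluation error.
//! let (oks2, new_errs) = oks1.map_fallible("Map", |row| eval_exprs(row));
//! // Errors from upstream flow through untouched; new errors are merged in.
//! let errs2 = errs1.concat(&new_errs);
//! ```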
//!
//! The semantics of the error stream are minimal. From the perspective of SQL,
//! a dataflow is considered to be in an error state if there is at least one
//! element in the final errs collection. The error value returned to the user
//! is selected arbitrarily; SQL only makes provisions to return one error to
//! the user at a time. There are plans to make the err collection accessible to
//! end users, so they can see all errors at once.
//!
//! To make errors transient, simply ensure that the operator can retract any
//! produced errors when corrected data arrives. To make errors permanent, write
//! the operator such that it never retracts the errors it produced. Future work
//! will likely want to introduce some sort of sort order for errors, so that
//! permanent errors are returned to the user ahead of transient errors—probably
//! by introducing a new error type a la:
//!
//! ```no_run
//! # struct EvalError;
//! # struct SourceError;
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//! ```
//!
//! If the error stream is empty, the oks stream must be correct. If the error
//! stream is non-empty, then there are no semantics for the oks stream. This is
//! sufficient to support SQL in its current form, but is likely to be
//! unsatisfactory long term. We suspect that we can continue to imbue the oks
//! stream with semantics if we are very careful in describing what data should
//! and should not be produced upon encountering an error. Roughly speaking, the
//! oks stream could represent the correct result of the computation where all
//! rows that caused an error have been pruned from the stream. There are
//! strange and confusing questions here around foreign keys, though: what if
//! the optimizer proves that a particular key must exist in a collection, but
//! the key gets pruned away because its row participated in a scalar expression
//! evaluation that errored?
//!
//! In the meantime, it is probably wise for operators to keep the oks stream
//! roughly "as correct as possible" even when errors are present in the errs
//! stream. This reduces the amount of recomputation that must be performed
//! if/when the errors are retracted.

use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, ShutdownButton};
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    ENABLE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::Product;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors,
};
use crate::render::context::{ArrangementFlavor, Context, ShutdownProbe, shutdown_token};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, producing
/// both arrangements and sinks.
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Mutually recursive view definitions require special handling.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Determine indexes to export, and their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Determine sinks to export, and their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        // TODO(ct3): This should be a config of the source instead, but at least try
        // to contain the hacks.
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            // Import declared sources into the rendering context.
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        // If enabled, we read from Persist with a `RelationDesc` that
                        // omits unneeded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    // Note: For correctness, we require that sources only emit times advanced by
                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        &compute_state.worker_config,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    let mut source_tokens: Vec<Rc<dyn Any>> = vec![Rc::new(token)];

                    // If `mfp` is non-identity, we need to apply what remains.
                    // For the moment, assert that it is either trivial or `None`.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // To avoid a memory spike during arrangement hydration (database-issues#6368),
                    // we need to ensure that the first frontier we report into the dataflow is
                    // beyond the `as_of`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        // Apply logical backpressure to the source.
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let (token, stream) = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                        source_tokens.push(token);
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // The `suppress_early_progress` operator and the input
                    // probe both want to work on the untransformed ct input,
                    // so make sure this stays after them.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            // TODO(ct3): Ideally this would be encapsulated by
                            // ContinualTaskCtx, but the types are tricky.
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(source_tokens));
                });
            }
        });

        // If there exists a recursive expression, we'll need to use a non-region scope,
        // in order to support additional timestamp coordinates for iteration.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    // Recursive plans _must_ have bodies in a let.
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    // Associate collection bundle with the source identifier.
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

// This implementation block allows child timestamps to vary from parent timestamps,
// but requires the parent timestamp to be `repr::Timestamp`.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

// This implementation block requires the scopes have the same timestamp as the trace manager.
// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // Ensure that the frontier does not advance past the expiration time, if set.
                // Otherwise, we might write down incorrect data.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // Duplicate of existing arrangement with id `gid`, so
                // just create another handle to that arrangement.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

// This implementation block requires the scopes have the same timestamp as the trace manager.
// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // TODO: The following as_collection/leave/arrange sequence could be optimized.
                //   * Combine as_collection and leave into a single function.
                //   * Use columnar to extract columns from the batches to implement leave.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // Ensure that the frontier does not advance past the expiration time, if set.
                // Otherwise, we might write down incorrect data.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // Duplicate of existing arrangement with id `gid`, so
                // just create another handle to that arrangement.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

/// Information about bindings, tracked in `render_recursive_plan` and
/// `render_plan`, to be passed to `render_letfree_plan`.
///
/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
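///
/// For example (matching the formatting in `render_letfree_plan` below), the
/// last operator rendered for a let binding `l0` is decorated as
/// `With l0 = <operator>`, and the last operator of a body nested under at
/// least one binding is decorated as `Returning <operator>`.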
enum BindingInfo {
    Body { in_let: bool },
    Let { id: LocalId, last: bool },
    LetRec { id: LocalId, last: bool },
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it calls into `render_plan`, which will error if
    /// further [`RecBind`]s are found.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `Variable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
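            // (A `Variable` is differential dataflow's handle to a recursively
            // defined collection: it can be used in dataflow construction
            // before its definition is supplied via `set`, which is what
            // closes the feedback loop below.)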
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                use differential_dataflow::operators::iterate::Variable;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = Variable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = Variable::new(&mut self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough
                // information here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set the oks variable to `oks`, but consolidated to ensure iteration ceases at
                // a fixed point.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or zero if missing (as trailing zeros are
                            // truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = Collection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&Collection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Set the err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error in each
                // iteration, say if the limit of `oks` has an error. This would result in
                // non-termination rather than a clean report of the error. The trade-off is
                // that we lose information about multiplicities of errors, but .. this seems
                // to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Actively interrupt the dataflow during shutdown, to ensure we won't keep
                // iterating for long (or even forever).
                let oks = render_shutdown_fuse(oks, self.shutdown_probe.clone());
                let errs = render_shutdown_fuse(errs, self.shutdown_probe.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
    /// results.
    ///
    /// The return type reflects the uncertainty about the data representation, perhaps
    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
    ///
    /// # Panics
    ///
    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
    /// `render_recursive_plan` instead.
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                // If we encounter a single let, the body is in a let.
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }

    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
            // ActiveComputeState can't have a catalog reference, so we'll need to capture the
            // names in some other structure and have that structure impl ExprHumanizer
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // Mark the last operator in topo order with any binding decoration.
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1089
1090    /// Renders a [`render_plan::Expr`], producing the collection of results.
1091    ///
1092    /// # Panics
1093    ///
1094    /// Panics if any of the expr's inputs is not found in `collections`.
1095    /// Callers must ensure that input nodes have been rendered previously.
1096    fn render_plan_expr(
1097        &mut self,
1098        expr: render_plan::Expr,
1099        collections: &BTreeMap<LirId, CollectionBundle<G>>,
1100    ) -> CollectionBundle<G> {
1101        use render_plan::Expr::*;
1102
1103        let expect_input = |id| {
1104            collections
1105                .get(&id)
1106                .cloned()
1107                .unwrap_or_else(|| panic!("missing input collection: {id}"))
1108        };
1109
1110        match expr {
1111            Constant { rows } => {
1112                // Produce both rows and errs to avoid conditional dataflow construction.
1113                let (rows, errs) = match rows {
1114                    Ok(rows) => (rows, Vec::new()),
1115                    Err(e) => (Vec::new(), vec![e]),
1116                };
1117
1118                // We should advance times in constant collections to start from `as_of`.
1119                let as_of_frontier = self.as_of_frontier.clone();
1120                let until = self.until.clone();
1121                let ok_collection = rows
1122                    .into_iter()
1123                    .filter_map(move |(row, mut time, diff)| {
1124                        time.advance_by(as_of_frontier.borrow());
1125                        if !until.less_equal(&time) {
1126                            Some((
1127                                row,
1128                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
1129                                diff,
1130                            ))
1131                        } else {
1132                            None
1133                        }
1134                    })
1135                    .to_stream(&mut self.scope)
1136                    .as_collection();
1137
1138                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
1139                error_time.advance_by(self.as_of_frontier.borrow());
1140                let err_collection = errs
1141                    .into_iter()
1142                    .map(move |e| {
1143                        (
1144                            DataflowError::from(e),
1145                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
1146                            Diff::ONE,
1147                        )
1148                    })
1149                    .to_stream(&mut self.scope)
1150                    .as_collection();
1151
1152                CollectionBundle::from_collections(ok_collection, err_collection)
1153            }
1154            Get { id, keys, plan } => {
1155                // Recover the collection from `self` and then apply `mfp` to it.
1156                // If `mfp` happens to be trivial, we can just return the collection.
1157                let mut collection = self
1158                    .lookup_id(id)
1159                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
1160                match plan {
1161                    mz_compute_types::plan::GetPlan::PassArrangements => {
1162                        // Assert that each of `keys` are present in `collection`.
1163                        assert!(
1164                            keys.arranged
1165                                .iter()
1166                                .all(|(key, _, _)| collection.arranged.contains_key(key))
1167                        );
1168                        assert!(keys.raw <= collection.collection.is_some());
1169                        // Retain only those keys we want to import.
1170                        collection.arranged.retain(|key, _value| {
1171                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
1172                        });
1173                        collection
1174                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        // A `CollectionBundle` can contain more than one collection, which makes it not obvious
        // which one we should attach the logging operator to.
        //
        // We could attach to each collection and track the lower bound of output frontiers.
        // However, that would be of limited use because we expect all collections to hydrate at
        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
        // frontier of the unarranged collection advance, it will perform all work necessary to
        // also advance its own frontier. We don't expect significant delays between frontier
        // advancements of the unarranged and arranged collections, so attaching the logging
        // operator to any one of them should produce accurate results.
        //
        // If the `CollectionBundle` contains both unarranged and arranged representations, it is
        // beneficial to attach the logging operator to one of the arranged representations, to
        // avoid unnecessary cloning of data. The unarranged collection feeds into the
        // arrangements, so if we attached the logging operator to it, we would introduce a fork
        // in its output stream, which would necessitate that all output data is cloned. In
        // contrast, we can hope that the output streams of the arrangements don't yet feed into
        // anything else, so attaching a (pass-through) logging operator does not introduce a
        // fork.

        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.hydration_logger.clone() else {
            return stream.clone(); // hydration logging disabled
        };

        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
        //
        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
        // frontier advances to an outer time that's greater than the `as_of`. Comparing
        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
        // instead.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;
            logger.log(lir_id, hydrated);

            move |input, output| {
                // Pass through inputs.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                let frontier = input.frontier().frontier();
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier) {
                    hydrated = true;
                    logger.log(lir_id, hydrated);
                }
            }
        })
    }
}

#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
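///
/// For `mz_repr::Timestamp` the system and event components coincide, while for
/// `Product<mz_repr::Timestamp, PointStamp<u64>>` both name the outer timestamp
/// (see the impls below). A minimal sketch of the accessors:
///
/// ```ignore
/// let ts = mz_repr::Timestamp::from(10u64);
/// assert_eq!(ts.event_time(), 10u64.into());
/// // `step_back` saturates at the minimum timestamp.
/// assert_eq!(ts.step_back(), 9u64.into());
/// ```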
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // It is necessary to step back both coordinates of a product,
        // and when one is a `PointStamp` that also means all coordinates
        // of the pointstamp.
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token; dropping the token causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
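///
/// A minimal sketch of both styles (the surrounding operator contexts are assumed):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
///
/// // An async operator can await the signal directly.
/// async move {
///     signal.clone().await;
///     // ... start doing work ...
/// };
///
/// // A sync operator instead checks the signal on each activation.
/// if signal.has_fired() {
///     // ... start doing work ...
/// }
///
/// // Dropping the token fires the signal.
/// drop(token);
/// ```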
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Create a new `StartSignal` and a corresponding token that activates the signal when
    /// dropped.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            // Replace the token's contents with a tuple of the previous
            // contents and the new value, so everything registered here is
            // dropped together with the token.
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

/// Extension trait to attach a `StartSignal` to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
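    ///
    /// A minimal wiring sketch (`stream` stands in for an upstream `Stream`):
    ///
    /// ```ignore
    /// let (signal, token) = StartSignal::new();
    /// let delayed = stream.with_start_signal(signal);
    /// // Updates are stashed until `token` is dropped, which fires the signal.
    /// drop(token);
    /// ```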
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // Stash incoming updates as long as the start signal has not fired.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release any data we might still have stashed.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                // Pass through all remaining input data.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Wraps the provided `Collection` with an operator that passes through all received inputs as
/// long as the provided `ShutdownProbe` does not announce dataflow shutdown. Once the probe
/// announces that the dataflow is shutting down, all data flowing into the operator is dropped.
fn render_shutdown_fuse<G, D>(
    collection: Collection<G, D, Diff>,
    probe: ShutdownProbe,
) -> Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    let stream = collection.inner;
    let stream = stream.unary(Pipeline, "ShutdownFuse", move |_cap, _info| {
        move |input, output| {
            input.for_each(|cap, data| {
                if !probe.in_shutdown() {
                    output.session(&cap).give_container(data);
                }
            });
        }
    });
    stream.as_collection()
}

/// Suppress progress messages for times before the given `as_of`.
///
/// This operator exists specifically to work around a memory spike we'd otherwise see when
/// hydrating arrangements (database-issues#6368). The memory spike happens because when the
/// `arrange_core` operator observes a frontier advancement without data, it inserts an empty
/// batch into the spine. When it later inserts the snapshot batch into the spine, an empty batch
/// is already there and the spine initiates a merge of these batches, which requires allocating a
/// new batch the size of the snapshot batch.
///
/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
/// beyond the `as_of`, so the snapshot data has already been collected.
///
/// To ensure this, this operator needs to take two measures:
///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
///
/// The second measure requires elaboration: If we didn't reclock snapshot updates, they might
/// still be upstream of `arrange_core` operators when those learn that we dropped the minimum
/// capability. The in-flight snapshot updates would hold back the input frontiers of
/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
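///
/// For example (illustrative times), with `as_of = [5]` and minimum time `0`:
///
/// ```text
/// input:  (a, 3) (b, 5) (c, 7)    frontier: 3, 6, 8
/// output: (a, 0) (b, 0) (c, 7)    frontier: 0, 6, 8
/// ```
///
/// Updates at times not beyond the `as_of` are reclocked to the minimum time, and
/// the output frontier is held at the minimum until the input frontier has passed
/// the `as_of`.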
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |input, output| {
            input.for_each(|data_cap, data| {
                let mut session = if as_of.less_than(data_cap.time()) {
                    output.session(&data_cap)
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    output.session(cap)
                };
                session.give_container(data);
            });

            let frontier = input.frontier().frontier();
            if !PartialOrder::less_equal(&frontier, &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

/// Extension trait for [`StreamCore`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. The
    /// implementation is expected to observe times in the data, and to release capabilities based
    /// on the probe's frontier, after rounding up timestamps by `slack_ms`.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically, joins hold back compaction on the other side of the join, so we need
    /// to make sure we release capabilities as soon as possible. This is why we only limit
    /// progress for times before the `upper`, which is the time up to which the source can
    /// distinguish updates at the time of rendering. Once we make progress to the `upper`, we
    /// need to release our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We
    /// could consider using the lower bound of the uppers of all available inputs instead.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (cf. `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
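    ///
    /// A worked example of the rounding (illustrative numbers): with
    /// `slack_ms = 1000`, an observed time `1234` first has the slack added,
    /// giving `2234`, and is then rounded up to the next multiple of the
    /// slack, `3000`, which is recorded as a pending time.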
    ///
    /// Returns a shutdown button and the stream with the progress limiting applied.
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> (Rc<dyn Any>, Self);
}

// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> (Rc<dyn Any>, Self) {
        let mut button = None;

        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                let shutdown = Rc::new(());
                button = Some(ShutdownButton::new(
                    Rc::new(RefCell::new(Some(Rc::clone(&shutdown)))),
                    activator,
                ));
                let shutdown = Rc::downgrade(&shutdown);

                move |input, output| {
                    // We've been shut down, release all resources and consume inputs.
                    if shutdown.strong_count() == 0 {
                        retained_cap = None;
                        pending_times.clear();
                        while let Some(_) = input.next() {}
                        return;
                    }

                    while let Some((cap, data)) = input.next() {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            // Round the slacked time up to the next multiple of `slack_ms`.
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    }

                    // Discard pending times that the probe's frontier has already passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Bound memory usage by discarding the earliest pending times once
                    // there are more than `limit`.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the earliest pending time,
                    // or release it if no times are pending.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if input.frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?input.frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        (Rc::new(button.unwrap().press_on_drop()), stream)
    }
}

/// A formatter for an iterator of timestamps that displays the first element, and subsequently
/// the differences between consecutive timestamps.
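///
/// For example, the times `[1000, 1500, 3000]` display as `[1000, +500, +1500]`.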
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
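///
/// For example (schematically), with a split arity of 2:
///
/// ```text
/// merge([a, b], [c]) == [a, b, c]
/// split([a, b, c])   == ([a, b], [c])
/// ```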
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer with knowledge of the arity of the first component in the pair.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges a pair of datum iterators, creating a `Row` instance.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a datum iterator into a pair of `Row` instances.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}