mz_compute/render.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders a plan into a timely/differential dataflow computation.
11//!
12//! ## Error handling
13//!
14//! Timely and differential have no idioms for computations that can error. The
15//! philosophy is, reasonably, to define the semantics of the computation such
16//! that errors are unnecessary: e.g., by using wrap-around semantics for
17//! integer overflow.
18//!
19//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
20//! in myriad cases. The classic example is a division by zero, but invalid
21//! input for casts, overflowing integer operations, and dozens of other
22//! functions need the ability to produce errors at runtime.
23//!
24//! At the moment, only *scalar* expression evaluation can fail, so only
25//! operators that evaluate scalar expressions can fail. At the time of writing,
26//! that includes map, filter, reduce, and join operators. Constants are a bit
27//! of a special case: they can be either a constant vector of rows *or* a
28//! constant, singular error.
29//!
30//! The approach taken is to build two parallel trees of computation: one for
31//! the rows that have been successfully evaluated (the "oks tree"), and one for
32//! the errors that have been generated (the "errs tree"). For example:
33//!
34//! ```text
35//!    oks1  errs1       oks2  errs2
36//!      |     |           |     |
37//!      |     |           |     |
38//!   project  |           |     |
39//!      |     |           |     |
40//!      |     |           |     |
41//!     map    |           |     |
42//!      |\    |           |     |
43//!      | \   |           |     |
44//!      |  \  |           |     |
45//!      |   \ |           |     |
46//!      |    \|           |     |
47//!   project  +           +     +
48//!      |     |          /     /
49//!      |     |         /     /
50//!    join ------------+     /
51//!      |     |             /
52//!      |     | +----------+
53//!      |     |/
54//!     oks   errs
55//! ```
56//!
57//! The project operation cannot fail, so errors from errs1 are propagated
58//! directly. Map operators are fallible and so can inject additional errors
59//! into the stream. Join operators combine the errors from each of their
60//! inputs.
61//!
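//! As a simplified sketch (not the actual operator implementations used below),
//! a fallible map over an `oks` collection might split its results into the two
//! trees roughly like this, assuming a hypothetical fallible `eval` function and
//! ordinary differential collection operators:
//!
//! ```ignore
//! // `oks_in` carries rows, `errs_in` carries upstream errors (both are
//! // differential collections; the names here are illustrative only).
//! let results = oks_in.map(|row| eval(row));
//! // Successful evaluations continue down the oks tree ...
//! let oks_out = results.clone().flat_map(|res| res.ok());
//! // ... while failures feed the errs tree.
//! let new_errs = results.flat_map(|res| res.err());
//! // Upstream errors pass through unchanged and are combined with new ones.
//! let errs_out = errs_in.concat(&new_errs);
//! ```
//!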
62//! The semantics of the error stream are minimal. From the perspective of SQL,
63//! a dataflow is considered to be in an error state if there is at least one
64//! element in the final errs collection. The error value returned to the user
65//! is selected arbitrarily; SQL only makes provisions to return one error to
66//! the user at a time. There are plans to make the err collection accessible to
67//! end users, so they can see all errors at once.
68//!
69//! To make errors transient, simply ensure that the operator can retract any
70//! produced errors when corrected data arrives. To make errors permanent, write
71//! the operator such that it never retracts the errors it produced. Future work
72//! will likely want to introduce an ordering for errors, so that
73//! permanent errors are returned to the user ahead of transient errors—probably
74//! by introducing a new error type a la:
75//!
76//! ```no_run
77//! # struct EvalError;
78//! # struct SourceError;
79//! enum DataflowError {
80//!     Transient(EvalError),
81//!     Permanent(SourceError),
82//! }
83//! ```
84//!
85//! If the error stream is empty, the oks stream must be correct. If the error
86//! stream is non-empty, then there are no semantics for the oks stream. This is
87//! sufficient to support SQL in its current form, but is likely to be
88//! unsatisfactory long term. We suspect that we can continue to imbue the oks
89//! stream with semantics if we are very careful in describing what data should
90//! and should not be produced upon encountering an error. Roughly speaking, the
91//! oks stream could represent the correct result of the computation where all
92//! rows that caused an error have been pruned from the stream. There are
93//! strange and confusing questions here around foreign keys, though: what if
94//! the optimizer proves that a particular key must exist in a collection, but
95//! the key gets pruned away because its row participated in a scalar expression
96//! evaluation that errored?
97//!
98//! In the meantime, it is probably wise for operators to keep the oks stream
99//! roughly "as correct as possible" even when errors are present in the errs
100//! stream. This reduces the amount of recomputation that must be performed
101//! if/when the errors are retracted.
102
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::SemigroupVariable;
117use differential_dataflow::trace::TraceReader;
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125    ENABLE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use timely::PartialOrder;
141use timely::communication::Allocate;
142use timely::container::CapacityContainerBuilder;
143use timely::dataflow::channels::pact::Pipeline;
144use timely::dataflow::operators::to_stream::ToStream;
145use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
146use timely::dataflow::scopes::Child;
147use timely::dataflow::{Scope, Stream, StreamCore};
148use timely::order::Product;
149use timely::progress::timestamp::Refines;
150use timely::progress::{Antichain, Timestamp};
151use timely::scheduling::ActivateOnDrop;
152use timely::worker::{AsWorker, Worker as TimelyWorker};
153
154use crate::arrangement::manager::TraceBundle;
155use crate::compute_state::ComputeState;
156use crate::extensions::arrange::{KeyCollection, MzArrange};
157use crate::extensions::reduce::MzReduce;
158use crate::extensions::temporal_bucket::TemporalBucketing;
159use crate::logging::compute::{
160    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
161};
162use crate::render::context::{ArrangementFlavor, Context};
163use crate::render::continual_task::ContinualTaskCtx;
164use crate::row_spine::{RowRowBatcher, RowRowBuilder};
165use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
166
167pub mod context;
168pub(crate) mod continual_task;
169mod errors;
170mod flat_map;
171mod join;
172mod reduce;
173pub mod sinks;
174mod threshold;
175mod top_k;
176
177pub use context::CollectionBundle;
178pub use join::LinearJoinSpec;
179
180/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
181///
182/// This method imports sources from provided assets, then builds the remaining
183/// dataflow using "compute-local" assets like shared arrangements, producing
184/// both arrangements and sinks.
185pub fn build_compute_dataflow<A: Allocate>(
186    timely_worker: &mut TimelyWorker<A>,
187    compute_state: &mut ComputeState,
188    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
189    start_signal: StartSignal,
190    until: Antichain<mz_repr::Timestamp>,
191    dataflow_expiration: Antichain<mz_repr::Timestamp>,
192) {
193    // Mutually recursive view definitions require special handling.
194    let recursive = dataflow
195        .objects_to_build
196        .iter()
197        .any(|object| object.plan.is_recursive());
198
199    // Determine indexes to export, and their dependencies.
200    let indexes = dataflow
201        .index_exports
202        .iter()
203        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
204        .collect::<Vec<_>>();
205
206    // Determine sinks to export, and their dependencies.
207    let sinks = dataflow
208        .sink_exports
209        .iter()
210        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
211        .collect::<Vec<_>>();
212
213    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
214    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
215
216    let name = format!("Dataflow: {}", &dataflow.debug_name);
217    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
218    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);
219
220    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
221        // TODO(ct3): This should be a config of the source instead, but at least try
222        // to contain the hacks.
223        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);
224
225        // We clone the scope so that sources can be imported into a region.
226        // We build a region here to establish a pattern of a scope inside the dataflow,
227        // so that other similar uses (e.g. with iterative scopes) do not require weird
228        // alternate type signatures.
229        let mut imported_sources = Vec::new();
230        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
231        let output_probe = MzProbeHandle::default();
232
233        scope.clone().region_named(&input_name, |region| {
234            // Import declared sources into the rendering context.
235            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
236                region.region_named(&format!("Source({:?})", source_id), |inner| {
237                    let mut read_schema = None;
238                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
239                        // If enabled, we read from Persist with a `RelationDesc` that
240                        // omits unneeded columns.
241                        if apply_demands {
242                            let demands = ops.demand();
243                            let new_desc =
244                                source.storage_metadata.relation_desc.apply_demand(&demands);
245                            let new_arity = demands.len();
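                            // Map each retained (old) column index to its new position;
                            // e.g., demands [2, 5] become {2 -> 0, 5 -> 1}.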
246                            let remap: BTreeMap<_, _> = demands
247                                .into_iter()
248                                .enumerate()
249                                .map(|(new, old)| (old, new))
250                                .collect();
251                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
252                            read_schema = Some(new_desc);
253                        }
254
255                        mz_expr::MfpPlan::create_from(ops)
256                            .expect("Linear operators should always be valid")
257                    });
258
259                    let mut snapshot_mode = SnapshotMode::Include;
260                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
261                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
262                    if let Some(x) = ct_source_transformer.as_ref() {
263                        snapshot_mode = x.snapshot_mode();
264                        suppress_early_progress_as_of = suppress_early_progress_as_of
265                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
266                    }
267
268                    // Note: For correctness, we require that sources only emit times advanced by
269                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
270                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
271                        inner,
272                        *source_id,
273                        Arc::clone(&compute_state.persist_clients),
274                        &compute_state.txns_ctx,
275                        source.storage_metadata.clone(),
276                        read_schema,
277                        dataflow.as_of.clone(),
278                        snapshot_mode,
279                        until.clone(),
280                        mfp.as_mut(),
281                        compute_state.dataflow_max_inflight_bytes(),
282                        start_signal.clone(),
283                        ErrorHandler::Halt("compute_import"),
284                    );
285
286                    // If `mfp` is non-identity, we need to apply what remains.
287                    // For the moment, assert that it is either trivial or `None`.
288                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));
289
290                    // To avoid a memory spike during arrangement hydration (database-issues#6368), we need to
291                    // ensure that the first frontier we report into the dataflow is beyond the
292                    // `as_of`.
293                    if let Some(as_of) = suppress_early_progress_as_of {
294                        ok_stream = suppress_early_progress(ok_stream, as_of);
295                    }
296
297                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
298                        // Apply logical backpressure to the source.
299                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
300                            .get(&compute_state.worker_config);
301                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
302                            .get(&compute_state.worker_config)
303                            .as_millis()
304                            .try_into()
305                            .expect("must fit");
306
307                        let stream = ok_stream.limit_progress(
308                            output_probe.clone(),
309                            slack,
310                            limit,
311                            upper.clone(),
312                            name.clone(),
313                        );
314                        ok_stream = stream;
315                    }
316
317                    // Attach a probe reporting the input frontier.
318                    let input_probe =
319                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
320                    ok_stream = ok_stream.probe_with(&input_probe);
321
322                    // The `suppress_early_progress` operator and the input
323                    // probe both want to work on the untransformed ct input,
324                    // so make sure this stays after them.
325                    let (ok_stream, err_stream) = match ct_source_transformer {
326                        None => (ok_stream, err_stream),
327                        Some(ct_source_transformer) => {
328                            let (oks, errs, ct_times) = ct_source_transformer
329                                .transform(ok_stream.as_collection(), err_stream.as_collection());
330                            // TODO(ct3): Ideally this would be encapsulated by
331                            // ContinualTaskCtx, but the types are tricky.
332                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
333                            (oks.inner, errs.inner)
334                        }
335                    };
336
337                    let (oks, errs) = (
338                        ok_stream.as_collection().leave_region().leave_region(),
339                        err_stream.as_collection().leave_region().leave_region(),
340                    );
341
342                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));
343
344                    // Associate returned tokens with the source identifier.
345                    tokens.insert(*source_id, Rc::new(token));
346                });
347            }
348        });
349
350        // If there exists a recursive expression, we'll need to use a non-region scope,
351        // in order to support additional timestamp coordinates for iteration.
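        // The iterative scope refines the outer timestamp with a `PointStamp<u64>` carrying one
        // iteration counter per level of nested recursive bindings.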
352        if recursive {
353            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
354                let mut context = Context::for_dataflow_in(
355                    &dataflow,
356                    region.clone(),
357                    compute_state,
358                    until,
359                    dataflow_expiration,
360                );
361
362                for (id, (oks, errs)) in imported_sources.into_iter() {
363                    let bundle = crate::render::CollectionBundle::from_collections(
364                        oks.enter(region),
365                        errs.enter(region),
366                    );
367                    // Associate collection bundle with the source identifier.
368                    context.insert_id(id, bundle);
369                }
370
371                // Import declared indexes into the rendering context.
372                for (idx_id, idx) in &dataflow.index_imports {
373                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
374                    context.import_index(
375                        compute_state,
376                        &mut tokens,
377                        input_probe,
378                        *idx_id,
379                        &idx.desc,
380                        start_signal.clone(),
381                    );
382                }
383
384                // Build declared objects.
385                for object in dataflow.objects_to_build {
386                    let bundle = context.scope.clone().region_named(
387                        &format!("BuildingObject({:?})", object.id),
388                        |region| {
389                            let depends = object.plan.depends();
390                            let in_let = object.plan.is_recursive();
391                            context
392                                .enter_region(region, Some(&depends))
393                                .render_recursive_plan(
394                                    object.id,
395                                    0,
396                                    object.plan,
397                                    // recursive plans _must_ have bodies in a let
398                                    BindingInfo::Body { in_let },
399                                )
400                                .leave_region()
401                        },
402                    );
403                    let global_id = object.id;
404
405                    context.log_dataflow_global_id(
406                        *bundle
407                            .scope()
408                            .addr()
409                            .first()
410                            .expect("Dataflow root id must exist"),
411                        global_id,
412                    );
413                    context.insert_id(Id::Global(object.id), bundle);
414                }
415
416                // Export declared indexes.
417                for (idx_id, dependencies, idx) in indexes {
418                    context.export_index_iterative(
419                        compute_state,
420                        &tokens,
421                        dependencies,
422                        idx_id,
423                        &idx,
424                        &output_probe,
425                    );
426                }
427
428                // Export declared sinks.
429                for (sink_id, dependencies, sink) in sinks {
430                    context.export_sink(
431                        compute_state,
432                        &tokens,
433                        dependencies,
434                        sink_id,
435                        &sink,
436                        start_signal.clone(),
437                        ct_ctx.input_times(&context.scope.parent),
438                        &output_probe,
439                    );
440                }
441            });
442        } else {
443            scope.clone().region_named(&build_name, |region| {
444                let mut context = Context::for_dataflow_in(
445                    &dataflow,
446                    region.clone(),
447                    compute_state,
448                    until,
449                    dataflow_expiration,
450                );
451
452                for (id, (oks, errs)) in imported_sources.into_iter() {
453                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
454                        let as_of = context.as_of_frontier.clone();
455                        let summary = TEMPORAL_BUCKETING_SUMMARY
456                            .get(&compute_state.worker_config)
457                            .try_into()
458                            .expect("must fit");
459                        oks.inner
460                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
461                            .as_collection()
462                    } else {
463                        oks
464                    };
465                    let bundle = crate::render::CollectionBundle::from_collections(
466                        oks.enter_region(region),
467                        errs.enter_region(region),
468                    );
469                    // Associate collection bundle with the source identifier.
470                    context.insert_id(id, bundle);
471                }
472
473                // Import declared indexes into the rendering context.
474                for (idx_id, idx) in &dataflow.index_imports {
475                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
476                    context.import_index(
477                        compute_state,
478                        &mut tokens,
479                        input_probe,
480                        *idx_id,
481                        &idx.desc,
482                        start_signal.clone(),
483                    );
484                }
485
486                // Build declared objects.
487                for object in dataflow.objects_to_build {
488                    let bundle = context.scope.clone().region_named(
489                        &format!("BuildingObject({:?})", object.id),
490                        |region| {
491                            let depends = object.plan.depends();
492                            context
493                                .enter_region(region, Some(&depends))
494                                .render_plan(object.id, object.plan)
495                                .leave_region()
496                        },
497                    );
498                    let global_id = object.id;
499                    context.log_dataflow_global_id(
500                        *bundle
501                            .scope()
502                            .addr()
503                            .first()
504                            .expect("Dataflow root id must exist"),
505                        global_id,
506                    );
507                    context.insert_id(Id::Global(object.id), bundle);
508                }
509
510                // Export declared indexes.
511                for (idx_id, dependencies, idx) in indexes {
512                    context.export_index(
513                        compute_state,
514                        &tokens,
515                        dependencies,
516                        idx_id,
517                        &idx,
518                        &output_probe,
519                    );
520                }
521
522                // Export declared sinks.
523                for (sink_id, dependencies, sink) in sinks {
524                    context.export_sink(
525                        compute_state,
526                        &tokens,
527                        dependencies,
528                        sink_id,
529                        &sink,
530                        start_signal.clone(),
531                        ct_ctx.input_times(&context.scope.parent),
532                        &output_probe,
533                    );
534                }
535            });
536        }
537    })
538}
539
540// This implementation block allows child timestamps to vary from parent timestamps,
541// but requires the parent timestamp to be `mz_repr::Timestamp`.
542impl<'g, G, T> Context<Child<'g, G, T>>
543where
544    G: Scope<Timestamp = mz_repr::Timestamp>,
545    T: Refines<G::Timestamp> + RenderTimestamp,
546{
547    pub(crate) fn import_index(
548        &mut self,
549        compute_state: &mut ComputeState,
550        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
551        input_probe: probe::Handle<mz_repr::Timestamp>,
552        idx_id: GlobalId,
553        idx: &IndexDesc,
554        start_signal: StartSignal,
555    ) {
556        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
557            assert!(
558                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
559                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
560            );
561
562            let token = traces.to_drop().clone();
563
564            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
565                &self.scope.parent,
566                &format!("Index({}, {:?})", idx.on_id, idx.key),
567                self.as_of_frontier.clone(),
568                self.until.clone(),
569            );
570
571            oks.stream = oks.stream.probe_with(&input_probe);
572
573            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
574                &self.scope.parent,
575                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
576                self.as_of_frontier.clone(),
577                self.until.clone(),
578            );
579
580            let ok_arranged = oks
581                .enter(&self.scope)
582                .with_start_signal(start_signal.clone());
583            let err_arranged = err_arranged
584                .enter(&self.scope)
585                .with_start_signal(start_signal);
586
587            self.update_id(
588                Id::Global(idx.on_id),
589                CollectionBundle::from_expressions(
590                    idx.key.clone(),
591                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
592                ),
593            );
594            tokens.insert(
595                idx_id,
596                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
597            );
598        } else {
599            panic!(
600                "import of index {} failed while building dataflow {}",
601                idx_id, self.dataflow_id
602            );
603        }
604    }
605}
606
607// This implementation block requires the scopes have the same timestamp as the trace manager.
608// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
609impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
610where
611    G: Scope<Timestamp = mz_repr::Timestamp>,
612{
613    pub(crate) fn export_index(
614        &self,
615        compute_state: &mut ComputeState,
616        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
617        dependency_ids: BTreeSet<GlobalId>,
618        idx_id: GlobalId,
619        idx: &IndexDesc,
620        output_probe: &MzProbeHandle<G::Timestamp>,
621    ) {
622        // put together tokens that belong to the export
623        let mut needed_tokens = Vec::new();
624        for dep_id in dependency_ids {
625            if let Some(token) = tokens.get(&dep_id) {
626                needed_tokens.push(Rc::clone(token));
627            }
628        }
629        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
630            panic!(
631                "Arrangement alarmingly absent! id: {:?}",
632                Id::Global(idx_id)
633            )
634        });
635
636        match bundle.arrangement(&idx.key) {
637            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
638                // Ensure that the frontier does not advance past the expiration time, if set.
639                // Otherwise, we might write down incorrect data.
640                if let Some(&expiration) = self.dataflow_expiration.as_option() {
641                    oks.stream = oks.stream.expire_stream_at(
642                        &format!("{}_export_index_oks", self.debug_name),
643                        expiration,
644                    );
645                    errs.stream = errs.stream.expire_stream_at(
646                        &format!("{}_export_index_errs", self.debug_name),
647                        expiration,
648                    );
649                }
650
651                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
652
653                // Attach logging of dataflow errors.
654                if let Some(logger) = compute_state.compute_logger.clone() {
655                    errs.stream.log_dataflow_errors(logger, idx_id);
656                }
657
658                compute_state.traces.set(
659                    idx_id,
660                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
661                );
662            }
663            Some(ArrangementFlavor::Trace(gid, _, _)) => {
664                // Duplicate of existing arrangement with id `gid`, so
665                // just create another handle to that arrangement.
666                let trace = compute_state.traces.get(&gid).unwrap().clone();
667                compute_state.traces.set(idx_id, trace);
668            }
669            None => {
670                println!("collection available: {:?}", bundle.collection.is_none());
671                println!(
672                    "keys available: {:?}",
673                    bundle.arranged.keys().collect::<Vec<_>>()
674                );
675                panic!(
676                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
677                    Id::Global(idx_id),
678                    &idx.key
679                );
680            }
681        };
682    }
683}
684
685// This implementation block handles iterative scopes, whose timestamps refine the trace manager's.
686// Results are extracted back into the parent scope before being deposited in the trace manager.
687impl<'g, G, T> Context<Child<'g, G, T>>
688where
689    G: Scope<Timestamp = mz_repr::Timestamp>,
690    T: RenderTimestamp,
691{
692    pub(crate) fn export_index_iterative(
693        &self,
694        compute_state: &mut ComputeState,
695        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
696        dependency_ids: BTreeSet<GlobalId>,
697        idx_id: GlobalId,
698        idx: &IndexDesc,
699        output_probe: &MzProbeHandle<G::Timestamp>,
700    ) {
701        // put together tokens that belong to the export
702        let mut needed_tokens = Vec::new();
703        for dep_id in dependency_ids {
704            if let Some(token) = tokens.get(&dep_id) {
705                needed_tokens.push(Rc::clone(token));
706            }
707        }
708        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
709            panic!(
710                "Arrangement alarmingly absent! id: {:?}",
711                Id::Global(idx_id)
712            )
713        });
714
715        match bundle.arrangement(&idx.key) {
716            Some(ArrangementFlavor::Local(oks, errs)) => {
717                // TODO: The following as_collection/leave/arrange sequence could be optimized.
718                //   * Combine as_collection and leave into a single function.
719                //   * Use columnar to extract columns from the batches to implement leave.
720                let mut oks = oks
721                    .as_collection(|k, v| (k.to_row(), v.to_row()))
722                    .leave()
723                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
724                    "Arrange export iterative",
725                );
726
727                let mut errs = errs
728                    .as_collection(|k, v| (k.clone(), v.clone()))
729                    .leave()
730                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
731                        "Arrange export iterative err",
732                    );
733
734                // Ensure that the frontier does not advance past the expiration time, if set.
735                // Otherwise, we might write down incorrect data.
736                if let Some(&expiration) = self.dataflow_expiration.as_option() {
737                    oks.stream = oks.stream.expire_stream_at(
738                        &format!("{}_export_index_iterative_oks", self.debug_name),
739                        expiration,
740                    );
741                    errs.stream = errs.stream.expire_stream_at(
742                        &format!("{}_export_index_iterative_err", self.debug_name),
743                        expiration,
744                    );
745                }
746
747                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
748
749                // Attach logging of dataflow errors.
750                if let Some(logger) = compute_state.compute_logger.clone() {
751                    errs.stream.log_dataflow_errors(logger, idx_id);
752                }
753
754                compute_state.traces.set(
755                    idx_id,
756                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
757                );
758            }
759            Some(ArrangementFlavor::Trace(gid, _, _)) => {
760                // Duplicate of existing arrangement with id `gid`, so
761                // just create another handle to that arrangement.
762                let trace = compute_state.traces.get(&gid).unwrap().clone();
763                compute_state.traces.set(idx_id, trace);
764            }
765            None => {
766                println!("collection available: {:?}", bundle.collection.is_none());
767                println!(
768                    "keys available: {:?}",
769                    bundle.arranged.keys().collect::<Vec<_>>()
770                );
771                panic!(
772                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
773                    Id::Global(idx_id),
774                    &idx.key
775                );
776            }
777        };
778    }
779}
780
781/// Information about bindings, tracked in `render_recursive_plan` and
782/// `render_plan`, to be passed to `render_letfree_plan`.
783///
784/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
785/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
786enum BindingInfo {
787    Body { in_let: bool },
788    Let { id: LocalId, last: bool },
789    LetRec { id: LocalId, last: bool },
790}
791
792impl<G> Context<G>
793where
794    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
795{
796    /// Renders a plan to a differential dataflow, producing the collection of results.
797    ///
798    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
799    /// in the context of `level` pre-existing iteration coordinates.
800    ///
801    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
802    /// and establishing the appropriate recursive dependencies among the bound variables.
803//! Once all [`RecBind`]s have been rendered, it calls into `render_letfree_plan` to render the
804//! remaining, let-free body.
805    ///
806    /// The method requires that all variables conclude with a physical representation that
807    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
808    fn render_recursive_plan(
809        &mut self,
810        object_id: GlobalId,
811        level: usize,
812        plan: RenderPlan,
813        binding: BindingInfo,
814    ) -> CollectionBundle<G> {
815        for BindStage { lets, recs } in plan.binds {
816            // Render the let bindings in order.
817            let mut let_iter = lets.into_iter().peekable();
818            while let Some(LetBind { id, value }) = let_iter.next() {
819                let bundle =
820                    self.scope
821                        .clone()
822                        .region_named(&format!("Binding({:?})", id), |region| {
823                            let depends = value.depends();
824                            let last = let_iter.peek().is_none();
825                            let binding = BindingInfo::Let { id, last };
826                            self.enter_region(region, Some(&depends))
827                                .render_letfree_plan(object_id, value, binding)
828                                .leave_region()
829                        });
830                self.insert_id(Id::Local(id), bundle);
831            }
832
833            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();
834
835            // Define variables for rec bindings.
836            // It is important that we only use the `SemigroupVariable` until the object is bound.
837            // At that point, all subsequent uses should have access to the object itself.
838            let mut variables = BTreeMap::new();
839            for id in rec_ids.iter() {
840                use differential_dataflow::dynamic::feedback_summary;
841                let inner = feedback_summary::<u64>(level + 1, 1);
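                // `inner` is a summary that advances the iteration counter for this binding level
                // by one each time around the loop, leaving outer coordinates untouched.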
842                let oks_v = SemigroupVariable::new(
843                    &mut self.scope,
844                    Product::new(Default::default(), inner.clone()),
845                );
846                let err_v = SemigroupVariable::new(
847                    &mut self.scope,
848                    Product::new(Default::default(), inner),
849                );
850
851                self.insert_id(
852                    Id::Local(*id),
853                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
854                );
855                variables.insert(Id::Local(*id), (oks_v, err_v));
856            }
857            // Now render each of the rec bindings.
858            let mut rec_iter = recs.into_iter().peekable();
859            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
860                let last = rec_iter.peek().is_none();
861                let binding = BindingInfo::LetRec { id, last };
862                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
863                // We need to ensure that the raw collection exists, but do not have enough information
864                // here to cause that to happen.
865                let (oks, mut err) = bundle.collection.clone().unwrap();
866                self.insert_id(Id::Local(id), bundle);
867                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();
868
869                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
870                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");
871
872                if let Some(limit) = limit {
873                    // We swallow the results of the `max_iter`th iteration, because
874                    // these results would go into the `max_iter + 1`th iteration.
875                    let (in_limit, over_limit) =
876                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
877                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
878                            let iteration_index = *ps.get(level).unwrap_or(&0);
879                            // The pointstamp starts counting from 0, so we need to add 1.
880                            iteration_index + 1 >= limit.max_iters.into()
881                        });
882                    oks = VecCollection::new(in_limit);
883                    if !limit.return_at_limit {
884                        err = err.concat(&VecCollection::new(over_limit).map(move |_data| {
885                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
886                                format!("{}", limit.max_iters.get()).into(),
887                            )))
888                        }));
889                    }
890                }
891
892                // Set err variable to the distinct elements of `err`.
893                // Distinctness is important, as we otherwise might add the same error each iteration,
894                // say if the limit of `oks` has an error. This would result in non-termination rather
895                // than a clean report of the error. The trade-off is that we lose information about
896                // multiplicities of errors, but this seems to be the better call.
897                let err: KeyCollection<_, _, _> = err.into();
898                let errs = err
899                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
900                        "Arrange recursive err",
901                    )
902                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
903                        "Distinct recursive err",
904                        move |_k, _s, t| t.push(((), Diff::ONE)),
905                    )
906                    .as_collection(|k, _| k.clone());
907
908                oks_v.set(&oks);
909                err_v.set(&errs);
910            }
911            // Now extract each of the rec bindings into the outer scope.
912            for id in rec_ids.into_iter() {
913                let bundle = self.remove_id(Id::Local(id)).unwrap();
914                let (oks, err) = bundle.collection.unwrap();
915                self.insert_id(
916                    Id::Local(id),
917                    CollectionBundle::from_collections(
918                        oks.leave_dynamic(level + 1),
919                        err.leave_dynamic(level + 1),
920                    ),
921                );
922            }
923        }
924
925        self.render_letfree_plan(object_id, plan.body, binding)
926    }
927}
928
929impl<G> Context<G>
930where
931    G: Scope,
932    G::Timestamp: RenderTimestamp,
933{
934    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
935    /// results.
936    ///
937    /// The return type reflects the uncertainty about the data representation, perhaps
938    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
939    ///
940    /// # Panics
941    ///
942    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
943    /// `render_recursive_plan` instead.
944    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
945        let mut in_let = false;
946        for BindStage { lets, recs } in plan.binds {
947            assert!(recs.is_empty());
948
949            let mut let_iter = lets.into_iter().peekable();
950            while let Some(LetBind { id, value }) = let_iter.next() {
951                // If we encounter at least one let binding, the body is in a let.
952                in_let = true;
953                let bundle =
954                    self.scope
955                        .clone()
956                        .region_named(&format!("Binding({:?})", id), |region| {
957                            let depends = value.depends();
958                            let last = let_iter.peek().is_none();
959                            let binding = BindingInfo::Let { id, last };
960                            self.enter_region(region, Some(&depends))
961                                .render_letfree_plan(object_id, value, binding)
962                                .leave_region()
963                        });
964                self.insert_id(Id::Local(id), bundle);
965            }
966        }
967
968        self.scope.clone().region_named("Main Body", |region| {
969            let depends = plan.body.depends();
970            self.enter_region(region, Some(&depends))
971                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
972                .leave_region()
973        })
974    }
975
976    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
977    fn render_letfree_plan(
978        &mut self,
979        object_id: GlobalId,
980        plan: LetFreePlan,
981        binding: BindingInfo,
982    ) -> CollectionBundle<G> {
983        let (mut nodes, root_id, topological_order) = plan.destruct();
984
985        // Rendered collections by their `LirId`.
986        let mut collections = BTreeMap::new();
987
988        // Mappings to send along.
989        // To save overhead, we'll only compute mappings when we need to,
990        // which means things get gated behind options. Unfortunately, that means we
991        // have several `Option<...>` types that are _all_ `Some` or `None` together,
992        // but there's no convenient way to express the invariant.
993        let should_compute_lir_metadata = self.compute_logger.is_some();
994        let mut lir_mapping_metadata = if should_compute_lir_metadata {
995            Some(Vec::with_capacity(nodes.len()))
996        } else {
997            None
998        };
999
1000        let mut topo_iter = topological_order.into_iter().peekable();
1001        while let Some(lir_id) = topo_iter.next() {
1002            let node = nodes.remove(&lir_id).unwrap();
1003
1004            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
1005            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
1006            // in some other structure and have that structure impl ExprHumanizer
1007            let metadata = if should_compute_lir_metadata {
1008                let operator = node.expr.humanize(&DummyHumanizer);
1009
1010                // mark the last operator in topo order with any binding decoration
1011                let operator = if topo_iter.peek().is_none() {
1012                    match &binding {
1013                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
1014                        BindingInfo::Body { in_let: false } => operator,
1015                        BindingInfo::Let { id, last: true } => {
1016                            format!("With {id} = {operator}")
1017                        }
1018                        BindingInfo::Let { id, last: false } => {
1019                            format!("{id} = {operator}")
1020                        }
1021                        BindingInfo::LetRec { id, last: true } => {
1022                            format!("With Recursive {id} = {operator}")
1023                        }
1024                        BindingInfo::LetRec { id, last: false } => {
1025                            format!("{id} = {operator}")
1026                        }
1027                    }
1028                } else {
1029                    operator
1030                };
1031
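                // Record the range of dataflow operator identifiers allocated while rendering this
                // expression, so the LIR node can be mapped to the physical operators it produced.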
1032                let operator_id_start = self.scope.peek_identifier();
1033                Some((operator, operator_id_start))
1034            } else {
1035                None
1036            };
1037
1038            let mut bundle = self.render_plan_expr(node.expr, &collections);
1039
1040            if let Some((operator, operator_id_start)) = metadata {
1041                let operator_id_end = self.scope.peek_identifier();
1042                let operator_span = (operator_id_start, operator_id_end);
1043
1044                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
1045                    lir_mapping_metadata.push((
1046                        lir_id,
1047                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
1048                    ))
1049                }
1050            }
1051
1052            self.log_operator_hydration(&mut bundle, lir_id);
1053
1054            collections.insert(lir_id, bundle);
1055        }
1056
1057        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
1058            self.log_lir_mapping(object_id, lir_mapping_metadata);
1059        }
1060
1061        collections
1062            .remove(&root_id)
1063            .expect("LetFreePlan invariant (1)")
1064    }
1065
1066    /// Renders a [`render_plan::Expr`], producing the collection of results.
1067    ///
1068    /// # Panics
1069    ///
1070    /// Panics if any of the expr's inputs is not found in `collections`.
1071    /// Callers must ensure that input nodes have been rendered previously.
1072    fn render_plan_expr(
1073        &mut self,
1074        expr: render_plan::Expr,
1075        collections: &BTreeMap<LirId, CollectionBundle<G>>,
1076    ) -> CollectionBundle<G> {
1077        use render_plan::Expr::*;
1078
1079        let expect_input = |id| {
1080            collections
1081                .get(&id)
1082                .cloned()
1083                .unwrap_or_else(|| panic!("missing input collection: {id}"))
1084        };
1085
1086        match expr {
1087            Constant { rows } => {
1088                // Produce both rows and errs to avoid conditional dataflow construction.
1089                let (rows, errs) = match rows {
1090                    Ok(rows) => (rows, Vec::new()),
1091                    Err(e) => (Vec::new(), vec![e]),
1092                };
1093
1094                // We should advance times in constant collections to start from `as_of`.
1095                let as_of_frontier = self.as_of_frontier.clone();
1096                let until = self.until.clone();
1097                let ok_collection = rows
1098                    .into_iter()
1099                    .filter_map(move |(row, mut time, diff)| {
1100                        time.advance_by(as_of_frontier.borrow());
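                        // Keep only rows whose times are not beyond the `until` frontier.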
1101                        if !until.less_equal(&time) {
1102                            Some((
1103                                row,
1104                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
1105                                diff,
1106                            ))
1107                        } else {
1108                            None
1109                        }
1110                    })
1111                    .to_stream(&mut self.scope)
1112                    .as_collection();
1113
1114                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
1115                error_time.advance_by(self.as_of_frontier.borrow());
1116                let err_collection = errs
1117                    .into_iter()
1118                    .map(move |e| {
1119                        (
1120                            DataflowError::from(e),
1121                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
1122                            Diff::ONE,
1123                        )
1124                    })
1125                    .to_stream(&mut self.scope)
1126                    .as_collection();
1127
1128                CollectionBundle::from_collections(ok_collection, err_collection)
1129            }
1130            Get { id, keys, plan } => {
1131                // Recover the collection from `self` and then apply `mfp` to it.
1132                // If `mfp` happens to be trivial, we can just return the collection.
1133                let mut collection = self
1134                    .lookup_id(id)
1135                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
1136                match plan {
1137                    mz_compute_types::plan::GetPlan::PassArrangements => {
1138                        // Assert that each of the `keys` is present in `collection`.
1139                        assert!(
1140                            keys.arranged
1141                                .iter()
1142                                .all(|(key, _, _)| collection.arranged.contains_key(key))
1143                        );
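                        // For booleans, `<=` is implication: a raw collection may only be
                        // requested if one is actually present.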
1144                        assert!(keys.raw <= collection.collection.is_some());
1145                        // Retain only those keys we want to import.
1146                        collection.arranged.retain(|key, _value| {
1147                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
1148                        });
1149                        collection
1150                    }
1151                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
1152                        let (oks, errs) = collection.as_collection_core(
1153                            mfp,
1154                            Some((key, row)),
1155                            self.until.clone(),
1156                            &self.config_set,
1157                        );
1158                        CollectionBundle::from_collections(oks, errs)
1159                    }
1160                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
1161                        let (oks, errs) = collection.as_collection_core(
1162                            mfp,
1163                            None,
1164                            self.until.clone(),
1165                            &self.config_set,
1166                        );
1167                        CollectionBundle::from_collections(oks, errs)
1168                    }
1169                }
1170            }
1171            Mfp {
1172                input,
1173                mfp,
1174                input_key_val,
1175            } => {
1176                let input = expect_input(input);
1177                // If `mfp` is non-trivial, we should apply it and produce a collection.
1178                if mfp.is_identity() {
1179                    input
1180                } else {
1181                    let (oks, errs) = input.as_collection_core(
1182                        mfp,
1183                        input_key_val,
1184                        self.until.clone(),
1185                        &self.config_set,
1186                    );
1187                    CollectionBundle::from_collections(oks, errs)
1188                }
1189            }
1190            FlatMap {
1191                input_key,
1192                input,
1193                exprs,
1194                func,
1195                mfp_after: mfp,
1196            } => {
1197                let input = expect_input(input);
1198                self.render_flat_map(input_key, input, exprs, func, mfp)
1199            }
1200            Join { inputs, plan } => {
1201                let inputs = inputs.into_iter().map(expect_input).collect();
1202                match plan {
1203                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
1204                        self.render_join(inputs, linear_plan)
1205                    }
1206                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
1207                        self.render_delta_join(inputs, delta_plan)
1208                    }
1209                }
1210            }
1211            Reduce {
1212                input_key,
1213                input,
1214                key_val_plan,
1215                plan,
1216                mfp_after,
1217            } => {
1218                let input = expect_input(input);
1219                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
1220                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
1221            }
1222            TopK { input, top_k_plan } => {
1223                let input = expect_input(input);
1224                self.render_topk(input, top_k_plan)
1225            }
1226            Negate { input } => {
1227                let input = expect_input(input);
1228                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
1229                CollectionBundle::from_collections(oks.negate(), errs)
1230            }
1231            Threshold {
1232                input,
1233                threshold_plan,
1234            } => {
1235                let input = expect_input(input);
1236                self.render_threshold(input, threshold_plan)
1237            }
1238            Union {
1239                inputs,
1240                consolidate_output,
1241            } => {
1242                let mut oks = Vec::new();
1243                let mut errs = Vec::new();
1244                for input in inputs.into_iter() {
1245                    let (os, es) =
1246                        expect_input(input).as_specific_collection(None, &self.config_set);
1247                    oks.push(os);
1248                    errs.push(es);
1249                }
1250                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
1251                if consolidate_output {
1252                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
1253                }
1254                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
1255                CollectionBundle::from_collections(oks, errs)
1256            }
1257            ArrangeBy {
1258                input_key,
1259                input,
1260                input_mfp,
1261                forms: keys,
1262            } => {
1263                let input = expect_input(input);
1264                input.ensure_collections(
1265                    keys,
1266                    input_key,
1267                    input_mfp,
1268                    self.until.clone(),
1269                    &self.config_set,
1270                )
1271            }
1272        }
1273    }
1274
1275    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1276        if let Some(logger) = &self.compute_logger {
1277            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1278                dataflow_index,
1279                global_id,
1280            }));
1281        }
1282    }
1283
1284    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1285        if let Some(logger) = &self.compute_logger {
1286            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1287        }
1288    }
1289
1290    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
1291        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
1292        // which we should attach the logging operator.
1293        //
1294        // We could attach to each collection and track the lower bound of output frontiers.
1295        // However, that would be of limited use because we expect all collections to hydrate at
1296        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1297        // frontier of the unarranged collection advance, it will perform all work necessary to
1298        // also advance its own frontier. We don't expect significant delays between frontier
1299        // advancements of the unarranged and arranged collections, so attaching the logging
1300        // operator to any one of them should produce accurate results.
1301        //
1302        // If the `CollectionBundle` contains both unarranged and arranged representations it is
1303        // beneficial to attach the logging operator to one of the arranged representations to avoid
1304        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1305        // if we attached the logging operator to it, we would introduce a fork in its output
1306        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1307        // that the output streams of the arrangements don't yet feed into anything else, so
1308        // attaching a (pass-through) logging operator does not introduce a fork.
1309
1310        match bundle.arranged.values_mut().next() {
1311            Some(arrangement) => {
1312                use ArrangementFlavor::*;
1313
1314                match arrangement {
1315                    Local(a, _) => {
1316                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
1317                    }
1318                    Trace(_, a, _) => {
1319                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
1320                    }
1321                }
1322            }
1323            None => {
1324                let (oks, _) = bundle
1325                    .collection
1326                    .as_mut()
1327                    .expect("CollectionBundle invariant");
1328                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
1329                *oks = stream.as_collection();
1330            }
1331        }
1332    }
1333
1334    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
1335    where
1336        D: Clone + 'static,
1337    {
1338        let Some(logger) = self.compute_logger.clone() else {
1339            return stream.clone(); // hydration logging disabled
1340        };
1341
1342        let export_ids = self.export_ids.clone();
1343
1344        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
1345        //
1346        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
1347        // frontier advances to an outer time that's greater than the `as_of`. Comparing
1348        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
1349        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
1350        // instead.
1351        let mut hydration_frontier = Antichain::new();
1352        for time in self.as_of_frontier.iter() {
1353            if let Some(time) = time.try_step_forward() {
1354                hydration_frontier.insert(Refines::to_inner(time));
1355            }
1356        }
1357
1358        let name = format!("LogOperatorHydration ({lir_id})");
1359        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1360            let mut hydrated = false;
1361
1362            for &export_id in &export_ids {
1363                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1364                    export_id,
1365                    lir_id,
1366                    hydrated,
1367                }));
1368            }
1369
1370            move |(input, frontier), output| {
1371                // Pass through inputs.
1372                input.for_each(|cap, data| {
1373                    output.session(&cap).give_container(data);
1374                });
1375
1376                if hydrated {
1377                    return;
1378                }
1379
1380                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1381                    hydrated = true;
1382
1383                    for &export_id in &export_ids {
1384                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1385                            export_id,
1386                            lir_id,
1387                            hydrated,
1388                        }));
1389                    }
1390                }
1391            }
1392        })
1393    }
1394}
1395
1396#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
1397/// A timestamp type that can be used for operations within MZ's dataflow layer.
1398pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
1399    /// The system timestamp component of the timestamp.
1400    ///
1401    /// This is useful for manipulating the system time, e.g., when delaying
1402    /// updates for subsequent cancellation, as in monotonic reduction.
1403    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
1404    /// Effects a system delay in terms of the timestamp summary.
1405    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1406    /// The event timestamp component of the timestamp.
1407    fn event_time(&self) -> mz_repr::Timestamp;
1408    /// The event timestamp component of the timestamp, as a mutable reference.
1409    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
1410    /// Effects an event delay in terms of the timestamp summary.
1411    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1412    /// Steps the timestamp back so that logical compaction to the output will
1413    /// not conflate `self` with any historical times.
1414    fn step_back(&self) -> Self;
1415}
1416
1417impl RenderTimestamp for mz_repr::Timestamp {
1418    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1419        self
1420    }
1421    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1422        delay
1423    }
1424    fn event_time(&self) -> mz_repr::Timestamp {
1425        *self
1426    }
1427    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1428        self
1429    }
1430    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1431        delay
1432    }
1433    fn step_back(&self) -> Self {
1434        self.saturating_sub(1)
1435    }
1436}
1437
1438impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1439    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1440        &mut self.outer
1441    }
1442    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1443        Product::new(delay, Default::default())
1444    }
1445    fn event_time(&self) -> mz_repr::Timestamp {
1446        self.outer
1447    }
1448    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1449        &mut self.outer
1450    }
1451    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1452        Product::new(delay, Default::default())
1453    }
1454    fn step_back(&self) -> Self {
1455        // It is necessary to step back both coordinates of a product,
1456        // and when one is a `PointStamp` that also means all coordinates
1457        // of the pointstamp.
1458        let inner = self.inner.clone();
1459        let mut vec = inner.into_vec();
1460        for item in vec.iter_mut() {
1461            *item = item.saturating_sub(1);
1462        }
1463        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1464    }
1465}
1466
1467/// A signal that can be awaited by operators to suspend them prior to startup.
1468///
1469/// Creating a signal also yields a token; dropping that token causes the signal to fire.
1470///
1471/// `StartSignal` is designed to be usable by both async and sync Timely operators.
1472///
1473///  * Async operators can simply `await` it.
1474///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
1475///    and then check `StartSignal::has_fired()` on each activation.
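///
/// A minimal usage sketch (the asserts and the surrounding task are illustrative, not part of any
/// API contract):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// assert!(!signal.has_fired());
///
/// // An async operator can simply `.await` a clone of the signal; the future completes
/// // once the token is dropped.
/// let wait = signal.clone();
/// let _task = async move { wait.await };
///
/// // Dropping the token fires the signal; sync operators observe this via `has_fired`.
/// drop(token);
/// assert!(signal.has_fired());
/// ```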
1476#[derive(Clone)]
1477pub(crate) struct StartSignal {
1478    /// A future that completes when the signal fires.
1479    ///
1480    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
1481    /// signal is activated by dropping the corresponding `Sender`.
1482    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
1483    /// A weak reference to the token, to register drop-on-fire values.
1484    token_ref: Weak<RefCell<Box<dyn Any>>>,
1485}
1486
1487impl StartSignal {
1488    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1489    /// dropped.
1490    pub fn new() -> (Self, Rc<dyn Any>) {
1491        let (tx, rx) = oneshot::channel::<Infallible>();
1492        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1493        let signal = Self {
1494            fut: rx.shared(),
1495            token_ref: Rc::downgrade(&token),
1496        };
1497        (signal, token)
1498    }
1499
1500    pub fn has_fired(&self) -> bool {
1501        self.token_ref.strong_count() == 0
1502    }
1503
1504    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1505        if let Some(token) = self.token_ref.upgrade() {
1506            let mut token = token.borrow_mut();
1507            let inner = std::mem::replace(&mut *token, Box::new(()));
1508            *token = Box::new((inner, to_drop));
1509        }
1510    }
1511}
1512
1513impl Future for StartSignal {
1514    type Output = ();
1515
1516    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1517        self.fut.poll_unpin(cx).map(|_| ())
1518    }
1519}
1520
1521/// Extension trait to attach a `StartSignal` to operator outputs.
1522pub(crate) trait WithStartSignal {
1523    /// Delays data and progress updates until the start signal has fired.
1524    ///
1525    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
1526    /// depending on the amount and shape of its inputs.
1527    fn with_start_signal(self, signal: StartSignal) -> Self;
1528}
1529
1530impl<S, Tr> WithStartSignal for Arranged<S, Tr>
1531where
1532    S: Scope,
1533    S::Timestamp: RenderTimestamp,
1534    Tr: TraceReader + Clone,
1535{
1536    fn with_start_signal(self, signal: StartSignal) -> Self {
1537        Arranged {
1538            stream: self.stream.with_start_signal(signal),
1539            trace: self.trace,
1540        }
1541    }
1542}
1543
1544impl<S, D> WithStartSignal for Stream<S, D>
1545where
1546    S: Scope,
1547    D: timely::Data,
1548{
1549    fn with_start_signal(self, signal: StartSignal) -> Self {
1550        self.unary(Pipeline, "StartSignal", |_cap, info| {
1551            let token = Box::new(ActivateOnDrop::new(
1552                (),
1553                info.address,
1554                self.scope().activations(),
1555            ));
1556            signal.drop_on_fire(token);
1557
1558            let mut stash = Vec::new();
1559
1560            move |input, output| {
1561                // Stash incoming updates as long as the start signal has not fired.
1562                if !signal.has_fired() {
1563                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1564                    return;
1565                }
1566
1567                // Release any data we might still have stashed.
1568                for (cap, mut data) in std::mem::take(&mut stash) {
1569                    output.session(&cap).give_container(&mut data);
1570                }
1571
1572                // Pass through all remaining input data.
1573                input.for_each(|cap, data| {
1574                    output.session(&cap).give_container(data);
1575                });
1576            }
1577        })
1578    }
1579}
1580
1581/// Suppress progress messages for times before the given `as_of`.
1582///
1583/// This operator exists specifically to work around a memory spike we'd otherwise see when
1584/// hydrating arrangements (database-issues#6368). The memory spike happens because, when the `arrange_core`
1585/// operator observes a frontier advancement without data, it inserts an empty batch into the spine.
1586/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
1587/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
1588/// the snapshot batch.
1589///
1590/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
1591/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
1592/// beyond the `as_of`, so the snapshot data has already been collected.
1593///
1594/// To ensure this, this operator needs to take two measures:
1595///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
1596///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
1597///
1598/// The second measure requires elaboration: if we did not reclock snapshot updates, they might
1599/// still be upstream of `arrange_core` operators when those learn that we have dropped the
1600/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
1601/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
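///
/// As a rough sketch of the intended behavior (the concrete times are illustrative), assume
/// `as_of = [5]` and a minimum capability at time `0`:
///
/// ```text
/// input:   (a, 3)  (b, 5)  progress to [6]  (c, 7)  progress to [8]
/// output:  (a, 0)  (b, 0)  progress to [6]  (c, 7)  progress to [8]
/// ```
///
/// Updates at times not beyond the `as_of` are reclocked to the minimum time, and downstream
/// operators see no frontier advancement until the input frontier has passed the `as_of`.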
1602fn suppress_early_progress<G, D>(
1603    stream: Stream<G, D>,
1604    as_of: Antichain<G::Timestamp>,
1605) -> Stream<G, D>
1606where
1607    G: Scope,
1608    D: Data,
1609{
1610    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1611        let mut early_cap = Some(default_cap);
1612
1613        move |(input, frontier), output| {
1614            input.for_each_time(|data_cap, data| {
1615                if as_of.less_than(data_cap.time()) {
1616                    let mut session = output.session(&data_cap);
1617                    for data in data {
1618                        session.give_container(data);
1619                    }
1620                } else {
1621                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1622                    let mut session = output.session(&cap);
1623                    for data in data {
1624                        session.give_container(data);
1625                    }
1626                }
1627            });
1628
1629            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1630                early_cap.take();
1631            }
1632        }
1633    })
1634}
1635
1636/// Extension trait for [`StreamCore`] to selectively limit progress.
1637trait LimitProgress<T: Timestamp> {
1638    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
1639    /// the implementation to observe the times in the data, and to release capabilities based on the
1640    /// probe's frontier, after applying `slack_ms` to round up timestamps.
1641    ///
1642    /// The implementation of this operator is subtle to avoid regressions in the rest of the
1643    /// system. Specifically, joins hold back compaction on the other side of the join, so we need to
1644    /// make sure we release capabilities as soon as possible. This is why we only limit progress
1645    /// for times before the `upper`, which is the time until which the source can distinguish
1646    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
1647    /// our capability.
1648    ///
1649    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
1650    /// consider using the join of the uppers, i.e., use a lower bound of the uppers of all available inputs.
1651    ///
1652    /// Once the input frontier reaches `[]`, the implementation must release any capability to
1653    /// allow downstream operators to release resources.
1654    ///
1655    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
1656    /// avoid unbounded memory usage.
1657    ///
1658    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
1659    ///   any timestamp rounding happens (cf. `REFRESH EVERY` materialized views).
1660    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
1661    /// * `name` is a human-readable name for the operator.
1662    /// * `limit` is the maximum number of pending times to keep around.
1663    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
1664    ///   retain a capability.
1665    fn limit_progress(
1666        &self,
1667        handle: MzProbeHandle<T>,
1668        slack_ms: u64,
1669        limit: Option<usize>,
1670        upper: Antichain<T>,
1671        name: String,
1672    ) -> Self;
1673}
1674
1675// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
1676// number.
1677impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
1678where
1679    G: Scope<Timestamp = mz_repr::Timestamp>,
1680    D: timely::Data,
1681    R: timely::Data,
1682{
1683    fn limit_progress(
1684        &self,
1685        handle: MzProbeHandle<mz_repr::Timestamp>,
1686        slack_ms: u64,
1687        limit: Option<usize>,
1688        upper: Antichain<mz_repr::Timestamp>,
1689        name: String,
1690    ) -> Self {
1691        let stream =
1692            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1693                // Times that we've observed on our input.
1694                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
1695                // Capability for the lower bound of `pending_times`, if any.
1696                let mut retained_cap: Option<Capability<G::Timestamp>> = None;
1697
1698                let activator = self.scope().activator_for(info.address);
1699                handle.activate(activator.clone());
1700
1701                move |(input, frontier), output| {
1702                    input.for_each(|cap, data| {
1703                        for time in data
1704                            .iter()
1705                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1706                        {
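                            // Round the slack-bumped time up to the next multiple of `slack_ms`.
                            // For example (a sketch): with `slack_ms = 1000`, an input time of
                            // 1_234 arrives here as 2_234 and rounds up to 3_000.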
1707                            let rounded_time =
1708                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
1709                            if !upper.less_than(&rounded_time.into()) {
1710                                pending_times.insert(rounded_time.into());
1711                            }
1712                        }
1713                        output.session(&cap).give_container(data);
1714                        if retained_cap.as_ref().is_none_or(|c| {
1715                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1716                        }) {
1717                            retained_cap = Some(cap.retain());
1718                        }
1719                    });
1720
1721                    handle.with_frontier(|f| {
1722                        while pending_times
1723                            .first()
1724                            .map_or(false, |retained_time| !f.less_than(&retained_time))
1725                        {
1726                            let _ = pending_times.pop_first();
1727                        }
1728                    });
1729
1730                    while limit.map_or(false, |limit| pending_times.len() > limit) {
1731                        let _ = pending_times.pop_first();
1732                    }
1733
1734                    match (retained_cap.as_mut(), pending_times.first()) {
1735                        (Some(cap), Some(first)) => cap.downgrade(first),
1736                        (_, None) => retained_cap = None,
1737                        _ => {}
1738                    }
1739
1740                    if frontier.is_empty() {
1741                        retained_cap = None;
1742                        pending_times.clear();
1743                    }
1744
1745                    if !pending_times.is_empty() {
1746                        tracing::debug!(
1747                            name,
1748                            info.global_id,
1749                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1750                            frontier = ?frontier.frontier().get(0),
1751                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1752                            ?upper,
1753                            "pending times",
1754                        );
1755                    }
1756                }
1757            });
1758        stream
1759    }
1760}
1761
1762/// A formatter for an iterator of timestamps that displays the first element, followed by the
1763/// differences between consecutive timestamps.
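///
/// For example, the times `[1000, 1010, 1030]` would display as `[1000, +10, +20]` (a sketch;
/// actual values depend on the stream).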
1764struct PendingTimesDisplay<T>(T);
1765
1766impl<T> std::fmt::Display for PendingTimesDisplay<T>
1767where
1768    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1769{
1770    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1771        let mut iter = self.0.clone().into_iter();
1772        write!(f, "[")?;
1773        if let Some(first) = iter.next() {
1774            write!(f, "{}", first)?;
1775            let mut last = u64::from(first);
1776            for time in iter {
1777                write!(f, ", +{}", u64::from(time) - last)?;
1778                last = u64::from(time);
1779            }
1780        }
1781        write!(f, "]")?;
1782        Ok(())
1783    }
1784}
1785
1786/// Helper to merge pairs of datum iterators into a row or split a datum iterator
1787/// into two rows, given the arity of the first component.
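///
/// A minimal sketch of the round trip (the datum values are illustrative):
///
/// ```ignore
/// let pairer = Pairer::new(2);
/// let row = pairer.merge(
///     [Datum::Int64(1), Datum::Int64(2)],
///     [Datum::String("a")],
/// );
/// // `first` packs the leading two datums, `second` the remainder.
/// let (first, second) = pairer.split(row.iter());
/// ```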
1788#[derive(Clone, Copy, Debug)]
1789struct Pairer {
1790    split_arity: usize,
1791}
1792
1793impl Pairer {
1794    /// Creates a pairer with knowledge of the arity of the first component in the pair.
1795    fn new(split_arity: usize) -> Self {
1796        Self { split_arity }
1797    }
1798
1799    /// Merges a pair of datum iterators creating a `Row` instance.
1800    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1801    where
1802        I1: IntoIterator<Item = Datum<'a>>,
1803        I2: IntoIterator<Item = Datum<'a>>,
1804    {
1805        SharedRow::pack(first.into_iter().chain(second))
1806    }
1807
1808    /// Splits a datum iterator into a pair of `Row` instances.
1809    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1810        let mut datum_iter = datum_iter.into_iter();
1811        let mut row_builder = SharedRow::get();
1812        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1813        let second = row_builder.pack_using(datum_iter);
1814        (first, second)
1815    }
1816}