// mz_compute/render.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders a plan into a timely/differential dataflow computation.
//!
//! ## Error handling
//!
//! Timely and differential have no idioms for computations that can error. The
//! philosophy is, reasonably, to define the semantics of the computation such
//! that errors are unnecessary: e.g., by using wrap-around semantics for
//! integer overflow.
//!
//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
//! in myriad cases. The classic example is a division by zero, but invalid
//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
//!
//! At the moment, only *scalar* expression evaluation can fail, so only
//! operators that evaluate scalar expressions can fail. At the time of writing,
//! that includes map, filter, reduce, and join operators. Constants are a bit
//! of a special case: they can be either a constant vector of rows *or* a
//! constant, singular error.
//!
//! The approach taken is to build two parallel trees of computation: one for
//! the rows that have been successfully evaluated (the "oks tree"), and one for
//! the errors that have been generated (the "errs tree"). For example:
//!
//! ```text
//!    oks1  errs1       oks2  errs2
//!      |     |           |     |
//!      |     |           |     |
//!   project  |           |     |
//!      |     |           |     |
//!      |     |           |     |
//!     map    |           |     |
//!      |\    |           |     |
//!      | \   |           |     |
//!      |  \  |           |     |
//!      |   \ |           |     |
//!      |    \|           |     |
//!   project  +           +     +
//!      |     |          /     /
//!      |     |         /     /
//!    join ------------+     /
//!      |     |             /
//!      |     | +----------+
//!      |     |/
//!     oks   errs
//! ```
//!
//! The project operation cannot fail, so errors from errs1 are propagated
//! directly. Map operators are fallible and so can inject additional errors
//! into the stream. Join operators combine the errors from each of their
//! inputs.
//!
//! The semantics of the error stream are minimal. From the perspective of SQL,
//! a dataflow is considered to be in an error state if there is at least one
//! element in the final errs collection. The error value returned to the user
//! is selected arbitrarily; SQL only makes provisions to return one error to
//! the user at a time. There are plans to make the err collection accessible to
//! end users, so they can see all errors at once.
//!
//! To make errors transient, simply ensure that the operator can retract any
//! produced errors when corrected data arrives. To make errors permanent, write
//! the operator such that it never retracts the errors it produced. Future work
//! will likely want to introduce some sort of sort order for errors, so that
//! permanent errors are returned to the user ahead of transient errors—probably
//! by introducing a new error type a la:
//!
//! ```no_run
//! # struct EvalError;
//! # struct SourceError;
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//! ```
//!
//! If the error stream is empty, the oks stream must be correct. If the error
//! stream is non-empty, then there are no semantics for the oks stream. This is
//! sufficient to support SQL in its current form, but is likely to be
//! unsatisfactory long term. We suspect that we can continue to imbue the oks
//! stream with semantics if we are very careful in describing what data should
//! and should not be produced upon encountering an error. Roughly speaking, the
//! oks stream could represent the correct result of the computation where all
//! rows that caused an error have been pruned from the stream. There are
//! strange and confusing questions here around foreign keys, though: what if
//! the optimizer proves that a particular key must exist in a collection, but
//! the key gets pruned away because its row participated in a scalar expression
//! evaluation that errored?
//!
//! In the meantime, it is probably wise for operators to keep the oks stream
//! roughly "as correct as possible" even when errors are present in the errs
//! stream. This reduces the amount of recomputation that must be performed
//! if/when the errors are retracted.
103use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::SemigroupVariable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125    ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use timely::PartialOrder;
141use timely::communication::Allocate;
142use timely::container::CapacityContainerBuilder;
143use timely::dataflow::channels::pact::Pipeline;
144use timely::dataflow::operators::to_stream::ToStream;
145use timely::dataflow::operators::{BranchWhen, Capability, Filter, Operator, Probe, probe};
146use timely::dataflow::scopes::Child;
147use timely::dataflow::{Scope, Stream, StreamCore};
148use timely::order::{Product, TotalOrder};
149use timely::progress::timestamp::Refines;
150use timely::progress::{Antichain, Timestamp};
151use timely::scheduling::ActivateOnDrop;
152use timely::worker::{AsWorker, Worker as TimelyWorker};
153
154use crate::arrangement::manager::TraceBundle;
155use crate::compute_state::ComputeState;
156use crate::extensions::arrange::{KeyCollection, MzArrange};
157use crate::extensions::reduce::MzReduce;
158use crate::extensions::temporal_bucket::TemporalBucketing;
159use crate::logging::compute::{
160    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
161};
162use crate::render::context::{ArrangementFlavor, Context};
163use crate::render::continual_task::ContinualTaskCtx;
164use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
165use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
166
167pub mod context;
168pub(crate) mod continual_task;
169mod errors;
170mod flat_map;
171mod join;
172mod reduce;
173pub mod sinks;
174mod threshold;
175mod top_k;
176
177pub use context::CollectionBundle;
178pub use join::LinearJoinSpec;
179
180/// Assemble the "compute"  side of a dataflow, i.e. all but the sources.
181///
182/// This method imports sources from provided assets, and then builds the remaining
183/// dataflow using "compute-local" assets like shared arrangements, and producing
184/// both arrangements and sinks.
185pub fn build_compute_dataflow<A: Allocate>(
186    timely_worker: &mut TimelyWorker<A>,
187    compute_state: &mut ComputeState,
188    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
189    start_signal: StartSignal,
190    until: Antichain<mz_repr::Timestamp>,
191    dataflow_expiration: Antichain<mz_repr::Timestamp>,
192) {
193    // Mutually recursive view definitions require special handling.
194    let recursive = dataflow
195        .objects_to_build
196        .iter()
197        .any(|object| object.plan.is_recursive());
198
199    // Determine indexes to export, and their dependencies.
200    let indexes = dataflow
201        .index_exports
202        .iter()
203        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
204        .collect::<Vec<_>>();
205
206    // Determine sinks to export, and their dependencies.
207    let sinks = dataflow
208        .sink_exports
209        .iter()
210        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
211        .collect::<Vec<_>>();
212
213    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
214    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
215    let subscribe_snapshot_optimization =
216        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);
217
218    let name = format!("Dataflow: {}", &dataflow.debug_name);
219    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
220    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);
221
222    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
223        // TODO(ct3): This should be a config of the source instead, but at least try
224        // to contain the hacks.
225        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);
226
227        // The scope.clone() occurs to allow import in the region.
228        // We build a region here to establish a pattern of a scope inside the dataflow,
229        // so that other similar uses (e.g. with iterative scopes) do not require weird
230        // alternate type signatures.
231        let mut imported_sources = Vec::new();
232        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
233        let output_probe = MzProbeHandle::default();
234
235        scope.clone().region_named(&input_name, |region| {
236            // Import declared sources into the rendering context.
237            for (source_id, import) in dataflow.source_imports.iter() {
238                region.region_named(&format!("Source({:?})", source_id), |inner| {
239                    let mut read_schema = None;
240                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
241                        // If enabled, we read from Persist with a `RelationDesc` that
242                        // omits uneeded columns.
243                        if apply_demands {
244                            let demands = ops.demand();
245                            let new_desc = import
246                                .desc
247                                .storage_metadata
248                                .relation_desc
249                                .apply_demand(&demands);
250                            let new_arity = demands.len();
251                            let remap: BTreeMap<_, _> = demands
252                                .into_iter()
253                                .enumerate()
254                                .map(|(new, old)| (old, new))
255                                .collect();
256                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
257                            read_schema = Some(new_desc);
258                        }
259
260                        mz_expr::MfpPlan::create_from(ops)
261                            .expect("Linear operators should always be valid")
262                    });
263
264                    let mut snapshot_mode =
265                        if import.with_snapshot || !subscribe_snapshot_optimization {
266                            SnapshotMode::Include
267                        } else {
268                            compute_state.metrics.inc_subscribe_snapshot_optimization();
269                            SnapshotMode::Exclude
270                        };
271                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
272                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
273                    if let Some(x) = ct_source_transformer.as_ref() {
274                        snapshot_mode = x.snapshot_mode();
275                        suppress_early_progress_as_of = suppress_early_progress_as_of
276                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
277                    }
278
279                    // Note: For correctness, we require that sources only emit times advanced by
280                    // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
281                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
282                        inner,
283                        *source_id,
284                        Arc::clone(&compute_state.persist_clients),
285                        &compute_state.txns_ctx,
286                        import.desc.storage_metadata.clone(),
287                        read_schema,
288                        dataflow.as_of.clone(),
289                        snapshot_mode,
290                        until.clone(),
291                        mfp.as_mut(),
292                        compute_state.dataflow_max_inflight_bytes(),
293                        start_signal.clone(),
294                        ErrorHandler::Halt("compute_import"),
295                    );
296
297                    // If `mfp` is non-identity, we need to apply what remains.
298                    // For the moment, assert that it is either trivial or `None`.
299                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));
300
301                    // To avoid a memory spike during arrangement hydration (database-issues#6368), need to
302                    // ensure that the first frontier we report into the dataflow is beyond the
303                    // `as_of`.
304                    if let Some(as_of) = suppress_early_progress_as_of {
305                        ok_stream = suppress_early_progress(ok_stream, as_of);
306                    }
307
308                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
309                        // Apply logical backpressure to the source.
310                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
311                            .get(&compute_state.worker_config);
312                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
313                            .get(&compute_state.worker_config)
314                            .as_millis()
315                            .try_into()
316                            .expect("must fit");
317
318                        let stream = ok_stream.limit_progress(
319                            output_probe.clone(),
320                            slack,
321                            limit,
322                            import.upper.clone(),
323                            name.clone(),
324                        );
325                        ok_stream = stream;
326                    }
327
328                    // Attach a probe reporting the input frontier.
329                    let input_probe =
330                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
331                    ok_stream = ok_stream.probe_with(&input_probe);
332
333                    // The `suppress_early_progress` operator and the input
334                    // probe both want to work on the untransformed ct input,
335                    // make sure this stays after them.
336                    let (ok_stream, err_stream) = match ct_source_transformer {
337                        None => (ok_stream, err_stream),
338                        Some(ct_source_transformer) => {
339                            let (oks, errs, ct_times) = ct_source_transformer
340                                .transform(ok_stream.as_collection(), err_stream.as_collection());
341                            // TODO(ct3): Ideally this would be encapsulated by
342                            // ContinualTaskCtx, but the types are tricky.
343                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
344                            (oks.inner, errs.inner)
345                        }
346                    };
347
348                    let (oks, errs) = (
349                        ok_stream.as_collection().leave_region().leave_region(),
350                        err_stream.as_collection().leave_region().leave_region(),
351                    );
352
353                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));
354
355                    // Associate returned tokens with the source identifier.
356                    tokens.insert(*source_id, Rc::new(token));
357                });
358            }
359        });
360
361        // If there exists a recursive expression, we'll need to use a non-region scope,
362        // in order to support additional timestamp coordinates for iteration.
363        if recursive {
364            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
365                let mut context = Context::for_dataflow_in(
366                    &dataflow,
367                    region.clone(),
368                    compute_state,
369                    until,
370                    dataflow_expiration,
371                );
372
373                for (id, (oks, errs)) in imported_sources.into_iter() {
374                    let bundle = crate::render::CollectionBundle::from_collections(
375                        oks.enter(region),
376                        errs.enter(region),
377                    );
378                    // Associate collection bundle with the source identifier.
379                    context.insert_id(id, bundle);
380                }
381
382                // Import declared indexes into the rendering context.
383                for (idx_id, idx) in &dataflow.index_imports {
384                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
385                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
386                        SnapshotMode::Include
387                    } else {
388                        compute_state.metrics.inc_subscribe_snapshot_optimization();
389                        SnapshotMode::Exclude
390                    };
391                    context.import_index(
392                        compute_state,
393                        &mut tokens,
394                        input_probe,
395                        *idx_id,
396                        &idx.desc,
397                        snapshot_mode,
398                        start_signal.clone(),
399                    );
400                }
401
402                // Build declared objects.
403                for object in dataflow.objects_to_build {
404                    let bundle = context.scope.clone().region_named(
405                        &format!("BuildingObject({:?})", object.id),
406                        |region| {
407                            let depends = object.plan.depends();
408                            let in_let = object.plan.is_recursive();
409                            context
410                                .enter_region(region, Some(&depends))
411                                .render_recursive_plan(
412                                    object.id,
413                                    0,
414                                    object.plan,
415                                    // recursive plans _must_ have bodies in a let
416                                    BindingInfo::Body { in_let },
417                                )
418                                .leave_region()
419                        },
420                    );
421                    let global_id = object.id;
422
423                    context.log_dataflow_global_id(
424                        *bundle
425                            .scope()
426                            .addr()
427                            .first()
428                            .expect("Dataflow root id must exist"),
429                        global_id,
430                    );
431                    context.insert_id(Id::Global(object.id), bundle);
432                }
433
434                // Export declared indexes.
435                for (idx_id, dependencies, idx) in indexes {
436                    context.export_index_iterative(
437                        compute_state,
438                        &tokens,
439                        dependencies,
440                        idx_id,
441                        &idx,
442                        &output_probe,
443                    );
444                }
445
446                // Export declared sinks.
447                for (sink_id, dependencies, sink) in sinks {
448                    context.export_sink(
449                        compute_state,
450                        &tokens,
451                        dependencies,
452                        sink_id,
453                        &sink,
454                        start_signal.clone(),
455                        ct_ctx.input_times(&context.scope.parent),
456                        &output_probe,
457                    );
458                }
459            });
460        } else {
461            scope.clone().region_named(&build_name, |region| {
462                let mut context = Context::for_dataflow_in(
463                    &dataflow,
464                    region.clone(),
465                    compute_state,
466                    until,
467                    dataflow_expiration,
468                );
469
470                for (id, (oks, errs)) in imported_sources.into_iter() {
471                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
472                        let as_of = context.as_of_frontier.clone();
473                        let summary = TEMPORAL_BUCKETING_SUMMARY
474                            .get(&compute_state.worker_config)
475                            .try_into()
476                            .expect("must fit");
477                        oks.inner
478                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
479                            .as_collection()
480                    } else {
481                        oks
482                    };
483                    let bundle = crate::render::CollectionBundle::from_collections(
484                        oks.enter_region(region),
485                        errs.enter_region(region),
486                    );
487                    // Associate collection bundle with the source identifier.
488                    context.insert_id(id, bundle);
489                }
490
491                // Import declared indexes into the rendering context.
492                for (idx_id, idx) in &dataflow.index_imports {
493                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
494                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
495                        SnapshotMode::Include
496                    } else {
497                        compute_state.metrics.inc_subscribe_snapshot_optimization();
498                        SnapshotMode::Exclude
499                    };
500                    context.import_index(
501                        compute_state,
502                        &mut tokens,
503                        input_probe,
504                        *idx_id,
505                        &idx.desc,
506                        snapshot_mode,
507                        start_signal.clone(),
508                    );
509                }
510
511                // Build declared objects.
512                for object in dataflow.objects_to_build {
513                    let bundle = context.scope.clone().region_named(
514                        &format!("BuildingObject({:?})", object.id),
515                        |region| {
516                            let depends = object.plan.depends();
517                            context
518                                .enter_region(region, Some(&depends))
519                                .render_plan(object.id, object.plan)
520                                .leave_region()
521                        },
522                    );
523                    let global_id = object.id;
524                    context.log_dataflow_global_id(
525                        *bundle
526                            .scope()
527                            .addr()
528                            .first()
529                            .expect("Dataflow root id must exist"),
530                        global_id,
531                    );
532                    context.insert_id(Id::Global(object.id), bundle);
533                }
534
535                // Export declared indexes.
536                for (idx_id, dependencies, idx) in indexes {
537                    context.export_index(
538                        compute_state,
539                        &tokens,
540                        dependencies,
541                        idx_id,
542                        &idx,
543                        &output_probe,
544                    );
545                }
546
547                // Export declared sinks.
548                for (sink_id, dependencies, sink) in sinks {
549                    context.export_sink(
550                        compute_state,
551                        &tokens,
552                        dependencies,
553                        sink_id,
554                        &sink,
555                        start_signal.clone(),
556                        ct_ctx.input_times(&context.scope.parent),
557                        &output_probe,
558                    );
559                }
560            });
561        }
562    })
563}
564
565// This implementation block allows child timestamps to vary from parent timestamps,
566// but requires the parent timestamp to be `repr::Timestamp`.
567impl<'g, G, T> Context<Child<'g, G, T>>
568where
569    G: Scope<Timestamp = mz_repr::Timestamp>,
570    T: Refines<G::Timestamp> + RenderTimestamp,
571{
572    /// Import the collection from the arrangement, discarding batches from the snapshot.
573    /// (This does not guarantee that no records from the snapshot are included; the assumption is
574    /// that we'll filter those out later if necessary.)
575    fn import_filtered_index_collection<Tr: TraceReader<Time = G::Timestamp> + Clone, V: Data>(
576        &self,
577        arranged: Arranged<G, Tr>,
578        start_signal: StartSignal,
579        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
580    ) -> VecCollection<Child<'g, G, T>, V, Tr::Diff>
581    where
582        // This is implied by the fact that G::Timestamp = mz_repr::Timestamp, but it's essential
583        // for our batch-level filtering to be safe, so we document it here regardless.
584        G::Timestamp: TotalOrder,
585    {
586        let oks = arranged.stream.with_start_signal(start_signal).filter({
587            let as_of = self.as_of_frontier.clone();
588            move |b| !<Antichain<G::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
589        });
590        Arranged::<G, Tr>::flat_map_batches(&oks, move |a, b| [logic(a, b)]).enter(&self.scope)
591    }
592
593    pub(crate) fn import_index(
594        &mut self,
595        compute_state: &mut ComputeState,
596        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
597        input_probe: probe::Handle<mz_repr::Timestamp>,
598        idx_id: GlobalId,
599        idx: &IndexDesc,
600        snapshot_mode: SnapshotMode,
601        start_signal: StartSignal,
602    ) {
603        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
604            assert!(
605                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
606                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
607            );
608
609            let token = traces.to_drop().clone();
610
611            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
612                &self.scope.parent,
613                &format!("Index({}, {:?})", idx.on_id, idx.key),
614                self.as_of_frontier.clone(),
615                self.until.clone(),
616            );
617
618            oks.stream = oks.stream.probe_with(&input_probe);
619
620            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
621                &self.scope.parent,
622                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
623                self.as_of_frontier.clone(),
624                self.until.clone(),
625            );
626
627            let bundle = match snapshot_mode {
628                SnapshotMode::Include => {
629                    let ok_arranged = oks
630                        .enter(&self.scope)
631                        .with_start_signal(start_signal.clone());
632                    let err_arranged = err_arranged
633                        .enter(&self.scope)
634                        .with_start_signal(start_signal);
635                    CollectionBundle::from_expressions(
636                        idx.key.clone(),
637                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
638                    )
639                }
640                SnapshotMode::Exclude => {
641                    // When we import an index without a snapshot, we have two balancing considerations:
642                    // - It's easy to filter out irrelevant batches from the stream, but hard to filter them out from an arrangement.
643                    //   (The `TraceFrontier` wrapper allows us to set an "until" frontier, but not a lower.)
644                    // - We do not actually need to reference the arrangement in this dataflow, since all operators that use the arrangement
645                    //   (joins, reduces, etc.) also require the snapshot data.
646                    // So: when the snapshot is excluded, we import only the (filtered) collection itself and ignore the arrangement.
647                    let oks = {
648                        let mut datums = DatumVec::new();
649                        self.import_filtered_index_collection(
650                            oks,
651                            start_signal.clone(),
652                            move |k: DatumSeq, v: DatumSeq| {
653                                let mut datums_borrow = datums.borrow();
654                                datums_borrow.extend(k);
655                                datums_borrow.extend(v);
656                                SharedRow::pack(&**datums_borrow)
657                            },
658                        )
659                    };
660                    let errs = self.import_filtered_index_collection(
661                        err_arranged,
662                        start_signal,
663                        |e, _| e.clone(),
664                    );
665                    CollectionBundle::from_collections(oks, errs)
666                }
667            };
668            self.update_id(Id::Global(idx.on_id), bundle);
669            tokens.insert(
670                idx_id,
671                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
672            );
673        } else {
674            panic!(
675                "import of index {} failed while building dataflow {}",
676                idx_id, self.dataflow_id
677            );
678        }
679    }
680}
681
682// This implementation block requires the scopes have the same timestamp as the trace manager.
683// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
684impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
685where
686    G: Scope<Timestamp = mz_repr::Timestamp>,
687{
688    pub(crate) fn export_index(
689        &self,
690        compute_state: &mut ComputeState,
691        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
692        dependency_ids: BTreeSet<GlobalId>,
693        idx_id: GlobalId,
694        idx: &IndexDesc,
695        output_probe: &MzProbeHandle<G::Timestamp>,
696    ) {
697        // put together tokens that belong to the export
698        let mut needed_tokens = Vec::new();
699        for dep_id in dependency_ids {
700            if let Some(token) = tokens.get(&dep_id) {
701                needed_tokens.push(Rc::clone(token));
702            }
703        }
704        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
705            panic!(
706                "Arrangement alarmingly absent! id: {:?}",
707                Id::Global(idx_id)
708            )
709        });
710
711        match bundle.arrangement(&idx.key) {
712            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
713                // Ensure that the frontier does not advance past the expiration time, if set.
714                // Otherwise, we might write down incorrect data.
715                if let Some(&expiration) = self.dataflow_expiration.as_option() {
716                    oks.stream = oks.stream.expire_stream_at(
717                        &format!("{}_export_index_oks", self.debug_name),
718                        expiration,
719                    );
720                    errs.stream = errs.stream.expire_stream_at(
721                        &format!("{}_export_index_errs", self.debug_name),
722                        expiration,
723                    );
724                }
725
726                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
727
728                // Attach logging of dataflow errors.
729                if let Some(logger) = compute_state.compute_logger.clone() {
730                    errs.stream.log_dataflow_errors(logger, idx_id);
731                }
732
733                compute_state.traces.set(
734                    idx_id,
735                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
736                );
737            }
738            Some(ArrangementFlavor::Trace(gid, _, _)) => {
739                // Duplicate of existing arrangement with id `gid`, so
740                // just create another handle to that arrangement.
741                let trace = compute_state.traces.get(&gid).unwrap().clone();
742                compute_state.traces.set(idx_id, trace);
743            }
744            None => {
745                println!("collection available: {:?}", bundle.collection.is_none());
746                println!(
747                    "keys available: {:?}",
748                    bundle.arranged.keys().collect::<Vec<_>>()
749                );
750                panic!(
751                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
752                    Id::Global(idx_id),
753                    &idx.key
754                );
755            }
756        };
757    }
758}
759
760// This implementation block requires the scopes have the same timestamp as the trace manager.
761// That makes some sense, because we are hoping to deposit an arrangement in the trace manager.
762impl<'g, G, T> Context<Child<'g, G, T>>
763where
764    G: Scope<Timestamp = mz_repr::Timestamp>,
765    T: RenderTimestamp,
766{
767    pub(crate) fn export_index_iterative(
768        &self,
769        compute_state: &mut ComputeState,
770        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
771        dependency_ids: BTreeSet<GlobalId>,
772        idx_id: GlobalId,
773        idx: &IndexDesc,
774        output_probe: &MzProbeHandle<G::Timestamp>,
775    ) {
776        // put together tokens that belong to the export
777        let mut needed_tokens = Vec::new();
778        for dep_id in dependency_ids {
779            if let Some(token) = tokens.get(&dep_id) {
780                needed_tokens.push(Rc::clone(token));
781            }
782        }
783        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
784            panic!(
785                "Arrangement alarmingly absent! id: {:?}",
786                Id::Global(idx_id)
787            )
788        });
789
790        match bundle.arrangement(&idx.key) {
791            Some(ArrangementFlavor::Local(oks, errs)) => {
792                // TODO: The following as_collection/leave/arrange sequence could be optimized.
793                //   * Combine as_collection and leave into a single function.
794                //   * Use columnar to extract columns from the batches to implement leave.
795                let mut oks = oks
796                    .as_collection(|k, v| (k.to_row(), v.to_row()))
797                    .leave()
798                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
799                    "Arrange export iterative",
800                );
801
802                let mut errs = errs
803                    .as_collection(|k, v| (k.clone(), v.clone()))
804                    .leave()
805                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
806                        "Arrange export iterative err",
807                    );
808
809                // Ensure that the frontier does not advance past the expiration time, if set.
810                // Otherwise, we might write down incorrect data.
811                if let Some(&expiration) = self.dataflow_expiration.as_option() {
812                    oks.stream = oks.stream.expire_stream_at(
813                        &format!("{}_export_index_iterative_oks", self.debug_name),
814                        expiration,
815                    );
816                    errs.stream = errs.stream.expire_stream_at(
817                        &format!("{}_export_index_iterative_err", self.debug_name),
818                        expiration,
819                    );
820                }
821
822                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
823
824                // Attach logging of dataflow errors.
825                if let Some(logger) = compute_state.compute_logger.clone() {
826                    errs.stream.log_dataflow_errors(logger, idx_id);
827                }
828
829                compute_state.traces.set(
830                    idx_id,
831                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
832                );
833            }
834            Some(ArrangementFlavor::Trace(gid, _, _)) => {
835                // Duplicate of existing arrangement with id `gid`, so
836                // just create another handle to that arrangement.
837                let trace = compute_state.traces.get(&gid).unwrap().clone();
838                compute_state.traces.set(idx_id, trace);
839            }
840            None => {
841                println!("collection available: {:?}", bundle.collection.is_none());
842                println!(
843                    "keys available: {:?}",
844                    bundle.arranged.keys().collect::<Vec<_>>()
845                );
846                panic!(
847                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
848                    Id::Global(idx_id),
849                    &idx.key
850                );
851            }
852        };
853    }
854}
855
/// Information about bindings, tracked in `render_recursive_plan` and
/// `render_plan`, to be passed to `render_letfree_plan`.
///
/// `render_letfree_plan` uses these to produce nice output (e.g., `With ...
/// Returning ...`) for local bindings in the `mz_lir_mapping` output.
enum BindingInfo {
    /// The plan body. `in_let` records whether any `Let` binding preceded it,
    /// in which case the body's final operator is rendered as `Returning ...`.
    Body { in_let: bool },
    /// A `Let` binding for `id`. `last` marks the final binding of its stage,
    /// which carries the leading `With` in the rendered output.
    Let { id: LocalId, last: bool },
    /// A `LetRec` binding for `id`. `last` marks the final binding of its stage,
    /// which carries the leading `With Recursive` in the rendered output.
    LetRec { id: LocalId, last: bool },
}
866
impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Renders a plan to a differential dataflow, producing the collection of results.
    ///
    /// This method allows for `plan` to contain [`RecBind`]s, and is planned
    /// in the context of `level` pre-existing iteration coordinates.
    ///
    /// This method recursively descends [`RecBind`] values, establishing nested scopes for each
    /// and establishing the appropriate recursive dependencies among the bound variables.
    /// Once all [`RecBind`]s have been rendered it calls in to `render_plan` which will error if
    /// further [`RecBind`]s are found.
    ///
    /// The method requires that all variables conclude with a physical representation that
    /// contains a collection (i.e. a non-arrangement), and it will panic otherwise.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            // `last` selects the binding decoration used in `mz_lir_mapping`.
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define variables for rec bindings.
            // It is important that we only use the `SemigroupVariable` until the object is bound.
            // At that point, all subsequent uses should have access to the object itself.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Feedback summary advancing the iteration coordinate at `level + 1` by one.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner),
                );

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Now render each of the rec bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // We need to ensure that the raw collection exists, but do not have enough information
                // here to cause that to happen.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                if let Some(limit) = limit {
                    // We swallow the results of the `max_iter`th iteration, because
                    // these results would go into the `max_iter + 1`th iteration.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration number, or if missing a zero (as trailing zeros are truncated).
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting from 0, so we need to add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        // Surface limit overruns as an error rather than returning silently.
                        err = err.concat(&VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Set err variable to the distinct elements of `err`.
                // Distinctness is important, as we otherwise might add the same error each iteration,
                // say if the limit of `oks` has an error. This would result in non-terminating rather
                // than a clean report of the error. The trade-off is that we lose information about
                // multiplicities of errors, but .. this seems to be the better call.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Close the feedback loops for this binding.
                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Now extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1003
1004impl<G> Context<G>
1005where
1006    G: Scope,
1007    G::Timestamp: RenderTimestamp,
1008{
1009    /// Renders a non-recursive plan to a differential dataflow, producing the collection of
1010    /// results.
1011    ///
1012    /// The return type reflects the uncertainty about the data representation, perhaps
1013    /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
1014    ///
1015    /// # Panics
1016    ///
1017    /// Panics if the given plan contains any [`RecBind`]s. Recursive plans must be rendered using
1018    /// `render_recursive_plan` instead.
1019    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
1020        let mut in_let = false;
1021        for BindStage { lets, recs } in plan.binds {
1022            assert!(recs.is_empty());
1023
1024            let mut let_iter = lets.into_iter().peekable();
1025            while let Some(LetBind { id, value }) = let_iter.next() {
1026                // if we encounter a single let, the body is in a let
1027                in_let = true;
1028                let bundle =
1029                    self.scope
1030                        .clone()
1031                        .region_named(&format!("Binding({:?})", id), |region| {
1032                            let depends = value.depends();
1033                            let last = let_iter.peek().is_none();
1034                            let binding = BindingInfo::Let { id, last };
1035                            self.enter_region(region, Some(&depends))
1036                                .render_letfree_plan(object_id, value, binding)
1037                                .leave_region()
1038                        });
1039                self.insert_id(Id::Local(id), bundle);
1040            }
1041        }
1042
1043        self.scope.clone().region_named("Main Body", |region| {
1044            let depends = plan.body.depends();
1045            self.enter_region(region, Some(&depends))
1046                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1047                .leave_region()
1048        })
1049    }
1050
    /// Renders a let-free plan to a differential dataflow, producing the collection of results.
    ///
    /// Nodes are rendered following the plan's topological order, so each node's inputs are
    /// present in `collections` before the node itself is rendered. When compute logging is
    /// enabled, an LIR mapping (a humanized operator description plus the range of timely
    /// operator identifiers each node occupies) is accumulated and logged for `object_id`.
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // Mappings to send along.
        // To save overhead, we'll only compute mappings when we need to,
        // which means things get gated behind options. Unfortunately, that means we
        // have several `Option<...>` types that are _all_ `Some` or `None` together,
        // but there's no convenient way to express the invariant.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            // TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
            // ActiveComputeState can't have a catalog reference, so we'll need to capture the names
            // in some other structure and have that structure impl ExprHumanizer
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // mark the last operator in topo order with any binding decoration
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Capture the next operator identifier before rendering; together with
                // the identifier observed after rendering, this bounds the timely
                // operators created for this node.
                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        // The root node's bundle is the result of the whole plan.
        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1140
    /// Renders a [`render_plan::Expr`], producing the collection of results.
    ///
    /// # Panics
    ///
    /// Panics if any of the expr's inputs is not found in `collections`.
    /// Callers must ensure that input nodes have been rendered previously.
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        // Look up an already-rendered input bundle by its `LirId`.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // We should advance times in constant collections to start from `as_of`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        // Drop updates at or beyond the `until` frontier.
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                // Errors are emitted at the minimum time, advanced to the `as_of`.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                // If `mfp` happens to be trivial, we can just return the collection.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of `keys` are present in `collection`.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those keys we want to import.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is non-trivial, we should apply it and produce a collection.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Dispatch to linear or delta join rendering per the plan.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only pass a non-identity `mfp_after` along to the reduce rendering.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                // Negate the ok rows; errors pass through unnegated.
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate the ok and err streams of all inputs separately.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }
1349
1350    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1351        if let Some(logger) = &self.compute_logger {
1352            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1353                dataflow_index,
1354                global_id,
1355            }));
1356        }
1357    }
1358
1359    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1360        if let Some(logger) = &self.compute_logger {
1361            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1362        }
1363    }
1364
1365    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
1366        // A `CollectionBundle` can contain more than one collection, which makes it not obvious to
1367        // which we should attach the logging operator.
1368        //
1369        // We could attach to each collection and track the lower bound of output frontiers.
1370        // However, that would be of limited use because we expect all collections to hydrate at
1371        // roughly the same time: The `ArrangeBy` operator is not fueled, so as soon as it sees the
1372        // frontier of the unarranged collection advance, it will perform all work necessary to
1373        // also advance its own frontier. We don't expect significant delays between frontier
1374        // advancements of the unarranged and arranged collections, so attaching the logging
1375        // operator to any one of them should produce accurate results.
1376        //
1377        // If the `CollectionBundle` contains both unarranged and arranged representations it is
1378        // beneficial to attach the logging operator to one of the arranged representation to avoid
1379        // unnecessary cloning of data. The unarranged collection feeds into the arrangements, so
1380        // if we attached the logging operator to it, we would introduce a fork in its output
1381        // stream, which would necessitate that all output data is cloned. In contrast, we can hope
1382        // that the output streams of the arrangements don't yet feed into anything else, so
1383        // attaching a (pass-through) logging operator does not introduce a fork.
1384
1385        match bundle.arranged.values_mut().next() {
1386            Some(arrangement) => {
1387                use ArrangementFlavor::*;
1388
1389                match arrangement {
1390                    Local(a, _) => {
1391                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
1392                    }
1393                    Trace(_, a, _) => {
1394                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
1395                    }
1396                }
1397            }
1398            None => {
1399                let (oks, _) = bundle
1400                    .collection
1401                    .as_mut()
1402                    .expect("CollectionBundle invariant");
1403                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
1404                *oks = stream.as_collection();
1405            }
1406        }
1407    }
1408
    /// Wraps `stream` in a pass-through operator that emits `OperatorHydration`
    /// events for the operator identified by `lir_id`: an initial "not
    /// hydrated" event per export at construction, then a "hydrated" event per
    /// export once the input frontier passes the dataflow as-of.
    ///
    /// Returns the input stream unchanged if hydration logging is disabled.
    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone(); // hydration logging disabled
        };

        let export_ids = self.export_ids.clone();

        // Convert the dataflow as-of into a frontier we can compare with input frontiers.
        //
        // We (somewhat arbitrarily) define operators in iterative scopes to be hydrated when their
        // frontier advances to an outer time that's greater than the `as_of`. Comparing
        // `refine(as_of) < input_frontier` would find the moment when the first iteration was
        // complete, which is not what we want. We want `refine(as_of + 1) <= input_frontier`
        // instead.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            // Announce the initial "not hydrated" state for each export, so
            // consumers see the operator before it hydrates.
            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                // Pass through inputs.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                // Events are emitted at most once; nothing to do after that.
                if hydrated {
                    return;
                }

                // Hydrated once the input frontier is at or beyond the
                // stepped-forward as-of computed above.
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
1469}
1470
#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
///
/// Such a timestamp refines `mz_repr::Timestamp` and exposes mutable access to
/// its system- and event-time components, plus delay summaries for each.
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp, as a mutable reference.
    ///
    /// This is useful for manipulating the system time, as when delaying
    /// updates for subsequent cancellation, as with monotonic reduction.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back so that logical compaction to the output will
    /// not conflate `self` with any historical times.
    fn step_back(&self) -> Self;
}
1491
// Trivial implementation: a plain `mz_repr::Timestamp` is both its own system
// and event time, and delays are expressed as the timestamp summary itself.
impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturating, so the minimum timestamp steps back to itself.
        self.saturating_sub(1)
    }
}
1512
1513impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1514    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1515        &mut self.outer
1516    }
1517    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1518        Product::new(delay, Default::default())
1519    }
1520    fn event_time(&self) -> mz_repr::Timestamp {
1521        self.outer
1522    }
1523    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1524        &mut self.outer
1525    }
1526    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1527        Product::new(delay, Default::default())
1528    }
1529    fn step_back(&self) -> Self {
1530        // It is necessary to step back both coordinates of a product,
1531        // and when one is a `PointStamp` that also means all coordinates
1532        // of the pointstamp.
1533        let inner = self.inner.clone();
1534        let mut vec = inner.into_vec();
1535        for item in vec.iter_mut() {
1536            *item = item.saturating_sub(1);
1537        }
1538        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1539    }
1540}
1541
/// A signal that can be awaited by operators to suspend them prior to startup.
///
/// Creating a signal also yields a token, dropping of which causes the signal to fire.
///
/// `StartSignal` is designed to be usable by both async and sync Timely operators.
///
///  * Async operators can simply `await` it.
///  * Sync operators should register an [`ActivateOnDrop`] value via [`StartSignal::drop_on_fire`]
///    and then check `StartSignal::has_fired()` on each activation.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no data is ever expected on this channel. Instead the
    /// signal is activated by dropping the corresponding `Sender`.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, to register drop-on-fire values.
    ///
    /// Weak so that holding a `StartSignal` does not itself keep the token
    /// alive; the signal is considered fired once all strong references to the
    /// token are gone.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1561
1562impl StartSignal {
1563    /// Create a new `StartSignal` and a corresponding token that activates the signal when
1564    /// dropped.
1565    pub fn new() -> (Self, Rc<dyn Any>) {
1566        let (tx, rx) = oneshot::channel::<Infallible>();
1567        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1568        let signal = Self {
1569            fut: rx.shared(),
1570            token_ref: Rc::downgrade(&token),
1571        };
1572        (signal, token)
1573    }
1574
1575    pub fn has_fired(&self) -> bool {
1576        self.token_ref.strong_count() == 0
1577    }
1578
1579    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1580        if let Some(token) = self.token_ref.upgrade() {
1581            let mut token = token.borrow_mut();
1582            let inner = std::mem::replace(&mut *token, Box::new(()));
1583            *token = Box::new((inner, to_drop));
1584        }
1585    }
1586}
1587
1588impl Future for StartSignal {
1589    type Output = ();
1590
1591    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1592        self.fut.poll_unpin(cx).map(|_| ())
1593    }
1594}
1595
/// Extension trait to attach a `StartSignal` to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Returns a value of the same type whose updates are released only once
    /// `signal` has fired.
    ///
    /// Note that this operator needs to buffer all incoming data, so it has some memory footprint,
    /// depending on the amount and shape of its inputs.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1604
1605impl<S, Tr> WithStartSignal for Arranged<S, Tr>
1606where
1607    S: Scope,
1608    S::Timestamp: RenderTimestamp,
1609    Tr: TraceReader + Clone,
1610{
1611    fn with_start_signal(self, signal: StartSignal) -> Self {
1612        Arranged {
1613            stream: self.stream.with_start_signal(signal),
1614            trace: self.trace,
1615        }
1616    }
1617}
1618
1619impl<S, D> WithStartSignal for Stream<S, D>
1620where
1621    S: Scope,
1622    D: timely::Data,
1623{
1624    fn with_start_signal(self, signal: StartSignal) -> Self {
1625        self.unary(Pipeline, "StartSignal", |_cap, info| {
1626            let token = Box::new(ActivateOnDrop::new(
1627                (),
1628                info.address,
1629                self.scope().activations(),
1630            ));
1631            signal.drop_on_fire(token);
1632
1633            let mut stash = Vec::new();
1634
1635            move |input, output| {
1636                // Stash incoming updates as long as the start signal has not fired.
1637                if !signal.has_fired() {
1638                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1639                    return;
1640                }
1641
1642                // Release any data we might still have stashed.
1643                for (cap, mut data) in std::mem::take(&mut stash) {
1644                    output.session(&cap).give_container(&mut data);
1645                }
1646
1647                // Pass through all remaining input data.
1648                input.for_each(|cap, data| {
1649                    output.session(&cap).give_container(data);
1650                });
1651            }
1652        })
1653    }
1654}
1655
/// Suppress progress messages for times before the given `as_of`.
///
/// This operator exists specifically to work around a memory spike we'd otherwise see when
/// hydrating arrangements (database-issues#6368). The memory spike happens because when the `arrange_core`
/// operator observes a frontier advancement without data it inserts an empty batch into the spine.
/// When it later inserts the snapshot batch into the spine, an empty batch is already there and
/// the spine initiates a merge of these batches, which requires allocating a new batch the size of
/// the snapshot batch.
///
/// The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
/// ensuring that the first frontier advancement downstream `arrange_core` operators observe is
/// beyond the `as_of`, so the snapshot data has already been collected.
///
/// To ensure this, this operator needs to take two measures:
///  * Keep around a minimum capability until the input announces progress beyond the `as_of`.
///  * Reclock all updates emitted at times not beyond the `as_of` to the minimum time.
///
/// The second measure requires elaboration: If we wouldn't reclock snapshot updates, they might
/// still be upstream of `arrange_core` operators when those get to know about us dropping the
/// minimum capability. The in-flight snapshot updates would hold back the input frontiers of
/// `arrange_core` operators to the `as_of`, which would cause them to insert empty batches.
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        // Capability at the minimum time, retained until the input frontier
        // has moved beyond the `as_of`.
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    // Data beyond the `as_of` passes through at its own time.
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    // Snapshot data (not beyond the `as_of`) is reclocked to
                    // the minimum time. Such data can only arrive while the
                    // input frontier is still <= `as_of`, i.e., while
                    // `early_cap` is still held, making the `expect` safe.
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            // Once the input frontier is beyond the `as_of`, release the
            // minimum capability so downstream progress can resume.
            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}
1710
/// Extension trait for [`StreamCore`] to selectively limit progress.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of the stream until its frontier reaches the given `upper` bound. Expects
    /// the implementation to observe times in data, and release capabilities based on the probe's
    /// frontier, after applying `slack` to round up timestamps.
    ///
    /// The implementation of this operator is subtle to avoid regressions in the rest of the
    /// system. Specifically joins hold back compaction on the other side of the join, so we need to
    /// make sure we release capabilities as soon as possible. This is why we only limit progress
    /// for times before the `upper`, which is the time until which the source can distinguish
    /// updates at the time of rendering. Once we make progress to the `upper`, we need to release
    /// our capability.
    ///
    /// This isn't perfect, and can result in regressions if one of the inputs lags behind. We could
    /// consider using the join of the uppers, i.e., use the lower bound of the uppers of all
    /// available inputs.
    ///
    /// Once the input frontier reaches `[]`, the implementation must release any capability to
    /// allow downstream operators to release resources.
    ///
    /// The implementation should limit the number of pending times to `limit` if it is `Some` to
    /// avoid unbounded memory usage.
    ///
    /// * `handle` is a probe installed on the dataflow's outputs as late as possible, but before
    ///   any timestamp rounding happens (c.f., `REFRESH EVERY` materialized views).
    /// * `slack_ms` is the number of milliseconds to round up timestamps to.
    /// * `name` is a human-readable name for the operator.
    /// * `limit` is the maximum number of pending times to keep around.
    /// * `upper` is the upper bound of the stream's frontier until which the implementation can
    ///   retain a capability.
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1749
// TODO: We could make this generic over a `T` that can be converted to and from a u64 millisecond
// number.
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times that we've observed on our input.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability for the lower bound of `pending_times`, if any.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                // Let the probe reschedule this operator so the retained
                // capability can be released as the probe's frontier advances,
                // even when no new input arrives.
                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // Record each data time shifted by `slack_ms` and
                        // rounded up to the next multiple of `slack_ms`.
                        // NOTE(review): the division below assumes
                        // `slack_ms > 0`; a zero slack would panic — confirm
                        // callers guarantee this.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            // Only track times not beyond `upper`; past it we
                            // must not hold back progress (see trait docs).
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        // Data itself passes through unchanged.
                        output.session(&cap).give_container(data);
                        // Retain this capability if we have none yet, or if it
                        // is no later than the currently retained one and not
                        // beyond `upper`.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    });

                    // Forget pending times that the probe frontier has passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the configured bound on pending times, dropping
                    // the earliest ones, to cap memory usage.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Hold the capability at the earliest pending time, or
                    // release it entirely if nothing is pending.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // Input is done; release everything so downstream
                    // operators can release their resources too.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}
1836
/// A formatter for an iterator of timestamps that displays the first element, and subsequently
/// the difference between consecutive timestamps.
///
/// Expects the timestamps in ascending order, since the differences are formed
/// by plain `u64` subtraction.
struct PendingTimesDisplay<T>(T);
1840
1841impl<T> std::fmt::Display for PendingTimesDisplay<T>
1842where
1843    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1844{
1845    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1846        let mut iter = self.0.clone().into_iter();
1847        write!(f, "[")?;
1848        if let Some(first) = iter.next() {
1849            write!(f, "{}", first)?;
1850            let mut last = u64::from(first);
1851            for time in iter {
1852                write!(f, ", +{}", u64::from(time) - last)?;
1853                last = u64::from(time);
1854            }
1855        }
1856        write!(f, "]")?;
1857        Ok(())
1858    }
1859}
1860
/// Helper to merge pairs of datum iterators into a row or split a datum iterator
/// into two rows, given the arity of the first component.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    // Number of datums that form the first component of a pair.
    split_arity: usize,
}
1867
1868impl Pairer {
1869    /// Creates a pairer with knowledge of the arity of first component in the pair.
1870    fn new(split_arity: usize) -> Self {
1871        Self { split_arity }
1872    }
1873
1874    /// Merges a pair of datum iterators creating a `Row` instance.
1875    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1876    where
1877        I1: IntoIterator<Item = Datum<'a>>,
1878        I2: IntoIterator<Item = Datum<'a>>,
1879    {
1880        SharedRow::pack(first.into_iter().chain(second))
1881    }
1882
1883    /// Splits a datum iterator into a pair of `Row` instances.
1884    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1885        let mut datum_iter = datum_iter.into_iter();
1886        let mut row_builder = SharedRow::get();
1887        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1888        let second = row_builder.pack_using(datum_iter);
1889        (first, second)
1890    }
1891}