Skip to main content

mz_compute_types/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18    CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row, Timestamp};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
36mod lowering;
37
38pub mod interpret;
39pub mod join;
40pub mod reduce;
41pub mod render_plan;
42pub mod threshold;
43pub mod top_k;
44pub mod transform;
45
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
///
/// Note: the derived `Default` value has `raw == false` and no arrangements, i.e.
/// it describes no available form at all. `arbitrary_arrangement` asserts that at
/// least one form exists and panics on such a value.
#[derive(
    Clone,
    Debug,
    Default,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead:
    /// the key expressions, the value column selection, and the permutation that
    /// restores the original row from key and value.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
}
76
77impl AvailableCollections {
78    /// Represent a collection that has no arrangements.
79    pub fn new_raw() -> Self {
80        Self {
81            raw: true,
82            arranged: Vec::new(),
83        }
84    }
85
86    /// Represent a collection that is arranged in the specified ways.
87    pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
88        assert!(
89            !arranged.is_empty(),
90            "Invariant violated: at least one collection must exist"
91        );
92        Self {
93            raw: false,
94            arranged,
95        }
96    }
97
98    /// Get some arrangement, if one exists.
99    pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
100        assert!(
101            self.raw || !self.arranged.is_empty(),
102            "Invariant violated: at least one collection must exist"
103        );
104        self.arranged.get(0)
105    }
106}
107
/// How to render the arrangements requested by an `ArrangeBy`.
///
/// Decided during LIR lowering and consumed by the renderer. The variant says what the
/// renderer will do, not what it knows about the input.
///
/// Besides `ArrangeBy`, the same type is used for the temporal bucketing fields of
/// `PlanNode::Reduce`, `PlanNode::TopK`, and `PlanNode::Union`.
#[derive(
    Clone,
    Copy,
    Debug,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub enum ArrangementStrategy {
    /// Form arrangements directly from the input collection.
    Direct,
    /// Insert temporal bucketing in front of the arrangement, to delay future-stamped
    /// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them.
    /// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like
    /// `Direct`.
    TemporalBucketing,
}
132
133impl std::fmt::Display for ArrangementStrategy {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        match self {
136            ArrangementStrategy::Direct => write!(f, "Direct"),
137            ArrangementStrategy::TemporalBucketing => write!(f, "TemporalBucketing"),
138        }
139    }
140}
141
/// An identifier for an LIR node.
///
/// A newtype around a `u64`. The wrapped value is private to this module and is
/// exposed through `From<LirId> for u64` and the `Display` implementation.
#[derive(
    Clone,
    Copy,
    Debug,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    Columnar
)]
pub struct LirId(u64);
156
157impl LirId {
158    fn as_u64(&self) -> u64 {
159        self.0
160    }
161}
162
163impl From<LirId> for u64 {
164    fn from(value: LirId) -> Self {
165        value.as_u64()
166    }
167}
168
169impl std::fmt::Display for LirId {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        write!(f, "{}", self.0)
172    }
173}
174
/// A rendering plan with as much conditional logic as possible removed.
///
/// A `Plan` pairs an operator (`node`, which owns its input plans) with the
/// identifier (`lir_id`) assigned to that operator.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode,
}
183
/// The actual AST node of the `Plan`.
///
/// Each variant describes one operator. Variants that consume other collections
/// own their inputs as nested `Plan`s; `PlanNode::children` enumerates them.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode {
    /// A collection containing a pre-determined collection.
    Constant {
        /// Explicit update triples for the collection, or the error that the
        /// collection evaluates to.
        rows: Result<Vec<(Row, Timestamp, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to the corresponding entries of `ids`.
        values: Vec<Plan>,
        /// Maximum number of iterations. See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein.
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
        /// Strategy for forming the internal input arrangement built by `Reduce`
        /// (materialized via `key_val_plan`).
        ///
        /// Set by the lowering from the input's `has_future_updates` flag. The
        /// renderer applies it to the keyed `(key, val)` stream feeding the
        /// reduce. See `render_reduce` for the rationale on why this is
        /// plumbed through `Reduce` rather than handled at the arrangement site.
        ///
        /// Note: unrelated to the hash buckets used by hierarchical reductions
        /// (e.g. `ReducePlan::Hierarchical`'s `buckets`), which are an internal
        /// sharding scheme for `min`/`max`-style aggregations. Here "bucketing"
        /// refers exclusively to temporal (time-domain) bucketing of
        /// future-stamped updates.
        temporal_bucketing_strategy: ArrangementStrategy,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan>,
        /// A plan for performing the Top-K.
        ///
        /// The Top-K implementation has several different strategies, one per
        /// `TopKPlan` arm. Please check out the documentation for this type for
        /// more detail.
        top_k_plan: TopKPlan,
        /// Strategy for bucketing the input collection ahead of the Top-K operator.
        ///
        /// Set by the lowering from the input's `has_future_updates` flag. The
        /// renderer applies it to the per-row input stream at the top of
        /// `render_topk`, covering all three `TopKPlan` arms uniformly. See
        /// `PlanNode::Reduce::temporal_bucketing_strategy` for the underlying
        /// convention.
        temporal_bucketing_strategy: ArrangementStrategy,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan>,
        /// A plan for performing the threshold.
        ///
        /// Please check out the documentation for `ThresholdPlan` for more detail
        /// on the available implementation strategies.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
        /// Per-input bucketing strategies. Lockstep with `inputs`: index `i` is the
        /// strategy applied to `inputs[i]` before concatenation.
        ///
        /// Set by the lowering from each input's `has_future_updates` flag. Only
        /// consolidating Unions (`consolidate_output: true`) carry non-`Direct`
        /// entries, because bucketing only pays off ahead of a consolidating
        /// downstream operator. See `PlanNode::Reduce::temporal_bucketing_strategy`
        /// for the underlying convention.
        temporal_bucketing_strategies: Vec<ArrangementStrategy>,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
        /// How the renderer should form the arrangements requested by `forms`.
        strategy: ArrangementStrategy,
    },
}
415
416impl PlanNode {
417    /// Iterates through references to child expressions.
418    pub fn children(&self) -> impl Iterator<Item = &Plan> {
419        let mut first = None;
420        let mut second = None;
421        let mut rest = None;
422        let mut last = None;
423
424        use PlanNode::*;
425        match self {
426            Constant { .. } | Get { .. } => (),
427            Let { value, body, .. } => {
428                first = Some(&**value);
429                second = Some(&**body);
430            }
431            LetRec { values, body, .. } => {
432                rest = Some(values);
433                last = Some(&**body);
434            }
435            Mfp { input, .. }
436            | FlatMap { input, .. }
437            | Reduce { input, .. }
438            | TopK { input, .. }
439            | Negate { input, .. }
440            | Threshold { input, .. }
441            | ArrangeBy { input, .. } => {
442                first = Some(&**input);
443            }
444            Join { inputs, .. } | Union { inputs, .. } => {
445                rest = Some(inputs);
446            }
447        }
448
449        first
450            .into_iter()
451            .chain(second)
452            .chain(rest.into_iter().flatten())
453            .chain(last)
454    }
455
456    /// Iterates through mutable references to child expressions.
457    pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan> {
458        let mut first = None;
459        let mut second = None;
460        let mut rest = None;
461        let mut last = None;
462
463        use PlanNode::*;
464        match self {
465            Constant { .. } | Get { .. } => (),
466            Let { value, body, .. } => {
467                first = Some(&mut **value);
468                second = Some(&mut **body);
469            }
470            LetRec { values, body, .. } => {
471                rest = Some(values);
472                last = Some(&mut **body);
473            }
474            Mfp { input, .. }
475            | FlatMap { input, .. }
476            | Reduce { input, .. }
477            | TopK { input, .. }
478            | Negate { input, .. }
479            | Threshold { input, .. }
480            | ArrangeBy { input, .. } => {
481                first = Some(&mut **input);
482            }
483            Join { inputs, .. } | Union { inputs, .. } => {
484                rest = Some(inputs);
485            }
486        }
487
488        first
489            .into_iter()
490            .chain(second)
491            .chain(rest.into_iter().flatten())
492            .chain(last)
493    }
494}
495
496impl PlanNode {
497    /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
498    pub fn as_plan(self, lir_id: LirId) -> Plan {
499        Plan { lir_id, node: self }
500    }
501}
502
503impl Plan {
504    /// Pretty-print this [Plan] to a string.
505    pub fn pretty(&self) -> String {
506        let config = ExplainConfig::default();
507        self.debug_explain(&config, None)
508    }
509
510    /// Pretty-print this [Plan] to a string using a custom
511    /// [ExplainConfig] and an optionally provided [ExprHumanizer].
512    /// This is intended for debugging and tests, not users.
513    pub fn debug_explain(
514        &self,
515        config: &ExplainConfig,
516        humanizer: Option<&dyn ExprHumanizer>,
517    ) -> String {
518        text_string_at(self, || PlanRenderingContext {
519            indent: Indent::default(),
520            humanizer: humanizer.unwrap_or(&DummyHumanizer),
521            annotations: BTreeMap::default(),
522            config,
523            ambiguous_ids: BTreeSet::default(),
524        })
525    }
526}
527
/// How a `Get` stage will be rendered.
///
/// Note that `Plan::refine_source_mfps` only expects `PassArrangements` and
/// `Collection` for `Get`s of imported sources, and panics on `Arrangement`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// The fields are, in order: the key of the arrangement to use, an optional
    /// row to seek to, and the MFP to apply.
    Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
538
539impl Plan {
540    /// Convert the dataflow description into one that uses render plans.
541    #[mz_ore::instrument(
542        target = "optimizer",
543        level = "debug",
544        fields(path.segment = "finalize_dataflow")
545    )]
546    pub fn finalize_dataflow(
547        desc: DataflowDescription<OptimizedMirRelationExpr>,
548        features: &OptimizerFeatures,
549    ) -> Result<DataflowDescription<Self>, String> {
550        // First, we lower the dataflow description from MIR to LIR.
551        let mut dataflow = Self::lower_dataflow(desc, features)?;
552
553        // Subsequently, we perform plan refinements for the dataflow.
554        Self::refine_source_mfps(&mut dataflow);
555
556        // Note: `consolidate_output` for `Union` and per-input
557        // `temporal_bucketing_strategies` are decided at lowering time (see the
558        // `Union` arm of `lower_mir_expr_stack_safe`). The pre-existing
559        // `refine_union_negate_consolidation` pass — which used to flip
560        // `consolidate_output` to `true` for Unions with a `Negate` child — has
561        // been folded into the lowering, since lowering is the only point where
562        // the bucketing decision (which depends on `has_future_updates`) is
563        // available.
564
565        if dataflow.is_single_time() {
566            Self::refine_single_time_operator_selection(&mut dataflow);
567
568            // The relaxation of the `must_consolidate` flag performs an LIR-based
569            // analysis and transform under checked recursion. By a similar argument
570            // made in `from_mir`, we do not expect the recursion limit to be hit.
571            // However, if that happens, we propagate an error to the caller.
572            // To apply the transform, we first obtain monotonic source and index
573            // global IDs and add them to a `TransformConfig` instance.
574            let monotonic_ids = dataflow
575                .source_imports
576                .iter()
577                .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
578                .chain(
579                    dataflow
580                        .index_imports
581                        .iter()
582                        .filter_map(|(_id, index_import)| {
583                            if index_import.monotonic {
584                                Some(index_import.desc.on_id)
585                            } else {
586                                None
587                            }
588                        }),
589                )
590                .collect::<BTreeSet<_>>();
591
592            let config = TransformConfig { monotonic_ids };
593            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
594        }
595
596        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
597
598        mz_repr::explain::trace_plan(&dataflow);
599
600        Ok(dataflow)
601    }
602
603    /// Lowers the dataflow description from MIR to LIR. To this end, the
604    /// method collects all available arrangements and based on this information
605    /// creates plans for every object to be built for the dataflow.
606    #[mz_ore::instrument(
607        target = "optimizer",
608        level = "debug",
609        fields(path.segment ="mir_to_lir")
610    )]
611    fn lower_dataflow(
612        desc: DataflowDescription<OptimizedMirRelationExpr>,
613        features: &OptimizerFeatures,
614    ) -> Result<DataflowDescription<Self>, String> {
615        let context = lowering::Context::new(desc.debug_name.clone(), features);
616        let dataflow = context.lower(desc)?;
617
618        mz_repr::explain::trace_plan(&dataflow);
619
620        Ok(dataflow)
621    }
622
623    /// Refines the source instance descriptions for sources imported by `dataflow` to
624    /// push down common MFP expressions.
625    #[mz_ore::instrument(
626        target = "optimizer",
627        level = "debug",
628        fields(path.segment = "refine_source_mfps")
629    )]
630    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
631        // Extract MFPs from Get operators for sources, and extract what we can for the source.
632        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
633        for (source_id, source_import) in dataflow.source_imports.iter_mut() {
634            let source = &mut source_import.desc;
635            let mut identity_present = false;
636            let mut mfps = Vec::new();
637            for build_desc in dataflow.objects_to_build.iter_mut() {
638                let mut todo = vec![&mut build_desc.plan];
639                while let Some(expression) = todo.pop() {
640                    let node = &mut expression.node;
641                    if let PlanNode::Get { id, plan, .. } = node {
642                        if *id == mz_expr::Id::Global(*source_id) {
643                            match plan {
644                                GetPlan::Collection(mfp) => mfps.push(mfp),
645                                GetPlan::PassArrangements => {
646                                    identity_present = true;
647                                }
648                                GetPlan::Arrangement(..) => {
649                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
650                                }
651                            }
652                        }
653                    } else {
654                        todo.extend(node.children_mut());
655                    }
656                }
657            }
658
659            // Direct exports of sources are possible, and prevent pushdown.
660            identity_present |= dataflow
661                .index_exports
662                .values()
663                .any(|(x, _)| x.on_id == *source_id);
664            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
665
666            if !identity_present && !mfps.is_empty() {
667                // Extract a common prefix `MapFilterProject` from `mfps`.
668                let common = MapFilterProject::extract_common(&mut mfps[..]);
669                // Apply common expressions to the source's `MapFilterProject`.
670                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
671                    MapFilterProject::compose(mfp, common)
672                } else {
673                    common
674                };
675                mfp.optimize();
676                source.arguments.operators = Some(mfp);
677            }
678        }
679        mz_repr::explain::trace_plan(dataflow);
680    }
681
682    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
683    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
684    /// one-shot SELECT query.
685    #[mz_ore::instrument(
686        target = "optimizer",
687        level = "debug",
688        fields(path.segment = "refine_single_time_operator_selection")
689    )]
690    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
691        // We should only reach here if we have a one-shot SELECT query, i.e.,
692        // a single-time dataflow.
693        assert!(dataflow.is_single_time());
694
695        // Upgrade single-time plans to monotonic.
696        for build_desc in dataflow.objects_to_build.iter_mut() {
697            let mut todo = vec![&mut build_desc.plan];
698            while let Some(expression) = todo.pop() {
699                let node = &mut expression.node;
700                match node {
701                    PlanNode::Reduce { plan, .. } => {
702                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
703                        match plan {
704                            ReducePlan::Hierarchical(hierarchical) => {
705                                hierarchical.as_monotonic(true);
706                            }
707                            _ => {
708                                // Nothing to do for other plans, and doing nothing is safe for future variants.
709                            }
710                        }
711                        todo.extend(node.children_mut());
712                    }
713                    PlanNode::TopK { top_k_plan, .. } => {
714                        top_k_plan.as_monotonic(true);
715                        todo.extend(node.children_mut());
716                    }
717                    PlanNode::LetRec { body, .. } => {
718                        // Only the non-recursive `body` is restricted to a single time.
719                        todo.push(body);
720                    }
721                    _ => {
722                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
723                        todo.extend(node.children_mut());
724                    }
725                }
726            }
727        }
728        mz_repr::explain::trace_plan(dataflow);
729    }
730
731    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
732    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
733    /// whenever the input is deemed to be physically monotonic.
734    #[mz_ore::instrument(
735        target = "optimizer",
736        level = "debug",
737        fields(path.segment = "refine_single_time_consolidation")
738    )]
739    fn refine_single_time_consolidation(
740        dataflow: &mut DataflowDescription<Self>,
741        config: &TransformConfig,
742    ) -> Result<(), String> {
743        // We should only reach here if we have a one-shot SELECT query, i.e.,
744        // a single-time dataflow.
745        assert!(dataflow.is_single_time());
746
747        let transform = transform::RelaxMustConsolidate;
748        for build_desc in dataflow.objects_to_build.iter_mut() {
749            transform
750                .transform(config, &mut build_desc.plan)
751                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
752        }
753        mz_repr::explain::trace_plan(dataflow);
754        Ok(())
755    }
756}
757
758impl CollectionPlan for PlanNode {
759    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
760        match self {
761            PlanNode::Constant { rows: _ } => (),
762            PlanNode::Get {
763                id,
764                keys: _,
765                plan: _,
766            } => match id {
767                Id::Global(id) => {
768                    out.insert(*id);
769                }
770                Id::Local(_) => (),
771            },
772            PlanNode::Let { id: _, value, body } => {
773                value.depends_on_into(out);
774                body.depends_on_into(out);
775            }
776            PlanNode::LetRec {
777                ids: _,
778                values,
779                limits: _,
780                body,
781            } => {
782                for value in values.iter() {
783                    value.depends_on_into(out);
784                }
785                body.depends_on_into(out);
786            }
787            PlanNode::Join { inputs, plan: _ }
788            | PlanNode::Union {
789                inputs,
790                consolidate_output: _,
791                temporal_bucketing_strategies: _,
792            } => {
793                for input in inputs {
794                    input.depends_on_into(out);
795                }
796            }
797            PlanNode::Mfp {
798                input,
799                mfp: _,
800                input_key_val: _,
801            }
802            | PlanNode::FlatMap {
803                input_key: _,
804                input,
805                exprs: _,
806                func: _,
807                mfp_after: _,
808            }
809            | PlanNode::ArrangeBy {
810                input_key: _,
811                input,
812                input_mfp: _,
813                forms: _,
814                strategy: _,
815            }
816            | PlanNode::Reduce {
817                input_key: _,
818                input,
819                key_val_plan: _,
820                plan: _,
821                mfp_after: _,
822                temporal_bucketing_strategy: _,
823            }
824            | PlanNode::TopK {
825                input,
826                top_k_plan: _,
827                temporal_bucketing_strategy: _,
828            }
829            | PlanNode::Negate { input }
830            | PlanNode::Threshold {
831                input,
832                threshold_plan: _,
833            } => {
834                input.depends_on_into(out);
835            }
836        }
837    }
838}
839
840impl CollectionPlan for Plan {
841    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
842        self.node.depends_on_into(out);
843    }
844}
845
/// Computes the bucket sizes, largest first, to use for the hierarchical decomposition of an
/// operator, given the expected number of rows sharing the same group key.
///
/// The sizes are the powers of 16 that are strictly smaller than `expected_group_size`; when no
/// group size is provided, 4 billion rows are assumed. The result is empty when the expected
/// group size is at most 16.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;

    // Without a user-provided group size, plan for 4B records in the expected case.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Powers of the fan-in balance how many inputs each layer receives from the preceding one
    // against the total number of layers. The multiplication saturates at `u64::MAX`, which is
    // never below `limit`, so the sequence is always cut off.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |size| Some(size.saturating_mul(FAN_IN)))
            .take_while(|size| *size < limit)
            .collect();

    // Sizes were generated in ascending order; callers expect them descending.
    buckets.reverse();
    buckets
}