Skip to main content

mz_compute_types/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18    CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row, Timestamp};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
36mod lowering;
37
38pub mod interpret;
39pub mod join;
40pub mod reduce;
41pub mod render_plan;
42pub mod threshold;
43pub mod top_k;
44pub mod transform;
45
46/// The forms in which an operator's output is available.
47///
48/// These forms may include "raw", meaning as a streamed collection, but also any
49/// number of "arranged" representations.
50///
51/// Each arranged representation is described by a `KeyValRowMapping`, or rather
52/// at the moment by its three fields in a triple. These fields explain how to form
53/// a "key" by applying some expressions to each row, how to select "values" from
54/// columns not explicitly captured by the key, and how to return to the original
55/// row from the concatenation of key and value. Further explanation is available
56/// in the documentation for `KeyValRowMapping`.
57#[derive(
58    Clone,
59    Debug,
60    Default,
61    Deserialize,
62    Eq,
63    Ord,
64    PartialEq,
65    PartialOrd,
66    Serialize
67)]
68pub struct AvailableCollections {
69    /// Whether the collection exists in unarranged form.
70    pub raw: bool,
71    /// The list of available arrangements, presented as a `KeyValRowMapping`,
72    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
73    /// The documentation for `KeyValRowMapping` explains these fields better.
74    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
75}
76
77impl AvailableCollections {
78    /// Represent a collection that has no arrangements.
79    pub fn new_raw() -> Self {
80        Self {
81            raw: true,
82            arranged: Vec::new(),
83        }
84    }
85
86    /// Represent a collection that is arranged in the specified ways.
87    pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
88        assert!(
89            !arranged.is_empty(),
90            "Invariant violated: at least one collection must exist"
91        );
92        Self {
93            raw: false,
94            arranged,
95        }
96    }
97
98    /// Get some arrangement, if one exists.
99    pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
100        assert!(
101            self.raw || !self.arranged.is_empty(),
102            "Invariant violated: at least one collection must exist"
103        );
104        self.arranged.get(0)
105    }
106}
107
108/// How to render the arrangements requested by an `ArrangeBy`.
109///
110/// Decided during LIR lowering and consumed by the renderer. The variant says what the
111/// renderer will do, not what it knows about the input.
112#[derive(
113    Clone,
114    Copy,
115    Debug,
116    Deserialize,
117    Eq,
118    Ord,
119    PartialEq,
120    PartialOrd,
121    Serialize
122)]
123pub enum ArrangementStrategy {
124    /// Form arrangements directly from the input collection.
125    Direct,
126    /// Insert temporal bucketing in front of the arrangement, to delay future-stamped
127    /// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them.
128    /// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like
129    /// `Direct`.
130    TemporalBucketing,
131}
132
133/// An identifier for an LIR node.
134#[derive(
135    Clone,
136    Copy,
137    Debug,
138    Deserialize,
139    Eq,
140    Ord,
141    PartialEq,
142    PartialOrd,
143    Serialize,
144    Columnar
145)]
146pub struct LirId(u64);
147
148impl LirId {
149    fn as_u64(&self) -> u64 {
150        self.0
151    }
152}
153
154impl From<LirId> for u64 {
155    fn from(value: LirId) -> Self {
156        value.as_u64()
157    }
158}
159
160impl std::fmt::Display for LirId {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        write!(f, "{}", self.0)
163    }
164}
165
166/// A rendering plan with as much conditional logic as possible removed.
167#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
168pub struct Plan {
169    /// A dataflow-local identifier.
170    pub lir_id: LirId,
171    /// The underlying operator.
172    pub node: PlanNode,
173}
174
175/// The actual AST node of the `Plan`.
176#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
177pub enum PlanNode {
178    /// A collection containing a pre-determined collection.
179    Constant {
180        /// Explicit update triples for the collection.
181        rows: Result<Vec<(Row, Timestamp, Diff)>, EvalError>,
182    },
183    /// A reference to a bound collection.
184    ///
185    /// This is commonly either an external reference to an existing source or
186    /// maintained arrangement, or an internal reference to a `Let` identifier.
187    Get {
188        /// A global or local identifier naming the collection.
189        id: Id,
190        /// Arrangements that will be available.
191        ///
192        /// The collection will also be loaded if available, which it will
193        /// not be for imported data, but which it may be for locally defined
194        /// data.
195        // TODO: Be more explicit about whether a collection is available,
196        // although one can always produce it from an arrangement, and it
197        // seems generally advantageous to do that instead (to avoid cloning
198        // rows, by using `mfp` first on borrowed data).
199        keys: AvailableCollections,
200        /// The actions to take when introducing the collection.
201        plan: GetPlan,
202    },
203    /// Binds `value` to `id`, and then results in `body` with that binding.
204    ///
205    /// This stage has the effect of sharing `value` across multiple possible
206    /// uses in `body`, and is the only mechanism we have for sharing collection
207    /// information across parts of a dataflow.
208    ///
209    /// The binding is not available outside of `body`.
210    Let {
211        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
212        id: LocalId,
213        /// The collection that should be bound to `id`.
214        value: Box<Plan>,
215        /// The collection that results, which is allowed to contain `Get` stages
216        /// that reference `Id::Local(id)`.
217        body: Box<Plan>,
218    },
219    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
220    ///
221    /// All bindings are available to all bindings, and to `body`.
222    /// The contents of each binding are initially empty, and then updated through a sequence
223    /// of iterations in which each binding is updated in sequence, from the most recent values
224    /// of all bindings.
225    LetRec {
226        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
227        ids: Vec<LocalId>,
228        /// The collection that should be bound to `id`.
229        values: Vec<Plan>,
230        /// Maximum number of iterations. See further info on the MIR `LetRec`.
231        limits: Vec<Option<LetRecLimit>>,
232        /// The collection that results, which is allowed to contain `Get` stages
233        /// that reference `Id::Local(id)`.
234        body: Box<Plan>,
235    },
236    /// Map, Filter, and Project operators.
237    ///
238    /// This stage contains work that we would ideally like to fuse to other plan
239    /// stages, but for practical reasons cannot. For example: threshold, topk,
240    /// and sometimes reduce stages are not able to absorb this operator.
241    Mfp {
242        /// The input collection.
243        input: Box<Plan>,
244        /// Linear operator to apply to each record.
245        mfp: MapFilterProject,
246        /// Whether the input is from an arrangement, and if so,
247        /// whether we can seek to a specific value therein
248        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
249    },
250    /// A variable number of output records for each input record.
251    ///
252    /// This stage is a bit of a catch-all for logic that does not easily fit in
253    /// map stages. This includes table valued functions, but also functions of
254    /// multiple arguments, and functions that modify the sign of updates.
255    ///
256    /// This stage allows a `MapFilterProject` operator to be fused to its output,
257    /// and this can be very important as otherwise the output of `func` is just
258    /// appended to the input record, for as many outputs as it has. This has the
259    /// unpleasant default behavior of repeating potentially large records that
260    /// are being unpacked, producing quadratic output in those cases. Instead,
261    /// in these cases use a `mfp` member that projects away these large fields.
262    FlatMap {
263        /// The particular arrangement of the input we expect to use,
264        /// if any
265        input_key: Option<Vec<MirScalarExpr>>,
266        /// The input collection.
267        input: Box<Plan>,
268        /// Expressions that for each row prepare the arguments to `func`.
269        exprs: Vec<MirScalarExpr>,
270        /// The variable-record emitting function.
271        func: TableFunc,
272        /// Linear operator to apply to each record produced by `func`.
273        mfp_after: MapFilterProject,
274    },
275    /// A multiway relational equijoin, with fused map, filter, and projection.
276    ///
277    /// This stage performs a multiway join among `inputs`, using the equality
278    /// constraints expressed in `plan`. The plan also describes the implementation
279    /// strategy we will use, and any pushed down per-record work.
280    Join {
281        /// An ordered list of inputs that will be joined.
282        inputs: Vec<Plan>,
283        /// Detailed information about the implementation of the join.
284        ///
285        /// This includes information about the implementation strategy, but also
286        /// any map, filter, project work that we might follow the join with, but
287        /// potentially pushed down into the implementation of the join.
288        plan: JoinPlan,
289    },
290    /// Aggregation by key.
291    Reduce {
292        /// The particular arrangement of the input we expect to use,
293        /// if any
294        input_key: Option<Vec<MirScalarExpr>>,
295        /// The input collection.
296        input: Box<Plan>,
297        /// A plan for changing input records into key, value pairs.
298        key_val_plan: KeyValPlan,
299        /// A plan for performing the reduce.
300        ///
301        /// The implementation of reduction has several different strategies based
302        /// on the properties of the reduction, and the input itself. Please check
303        /// out the documentation for this type for more detail.
304        plan: ReducePlan,
305        /// An MFP that must be applied to results. The projection part of this
306        /// MFP must preserve the key for the reduction; otherwise, the results
307        /// become undefined. Additionally, the MFP must be free from temporal
308        /// predicates so that it can be readily evaluated.
309        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
310        mfp_after: MapFilterProject,
311    },
312    /// Key-based "Top K" operator, retaining the first K records in each group.
313    TopK {
314        /// The input collection.
315        input: Box<Plan>,
316        /// A plan for performing the Top-K.
317        ///
318        /// The implementation of reduction has several different strategies based
319        /// on the properties of the reduction, and the input itself. Please check
320        /// out the documentation for this type for more detail.
321        top_k_plan: TopKPlan,
322    },
323    /// Inverts the sign of each update.
324    Negate {
325        /// The input collection.
326        input: Box<Plan>,
327    },
328    /// Filters records that accumulate negatively.
329    ///
330    /// Although the operator suppresses updates, it is a stateful operator taking
331    /// resources proportional to the number of records with non-zero accumulation.
332    Threshold {
333        /// The input collection.
334        input: Box<Plan>,
335        /// A plan for performing the threshold.
336        ///
337        /// The implementation of reduction has several different strategies based
338        /// on the properties of the reduction, and the input itself. Please check
339        /// out the documentation for this type for more detail.
340        threshold_plan: ThresholdPlan,
341    },
342    /// Adds the contents of the input collections.
343    ///
344    /// Importantly, this is *multiset* union, so the multiplicities of records will
345    /// add. This is in contrast to *set* union, where the multiplicities would be
346    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
347    /// implementing the "distinct" operator.
348    Union {
349        /// The input collections
350        inputs: Vec<Plan>,
351        /// Whether to consolidate the output, e.g., cancel negated records.
352        consolidate_output: bool,
353    },
354    /// The `input` plan, but with additional arrangements.
355    ///
356    /// This operator does not change the logical contents of `input`, but ensures
357    /// that certain arrangements are available in the results. This operator can
358    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
359    /// or to cap a `Plan` so that indexes can be exported.
360    ArrangeBy {
361        /// The key that must be used to access the input.
362        input_key: Option<Vec<MirScalarExpr>>,
363        /// The input collection.
364        input: Box<Plan>,
365        /// The MFP that must be applied to the input.
366        input_mfp: MapFilterProject,
367        /// A list of arrangement keys, and possibly a raw collection,
368        /// that will be added to those of the input. Does not include
369        /// any other existing arrangements.
370        forms: AvailableCollections,
371        /// How the renderer should form the arrangements requested by `forms`.
372        strategy: ArrangementStrategy,
373    },
374}
375
376impl PlanNode {
377    /// Iterates through references to child expressions.
378    pub fn children(&self) -> impl Iterator<Item = &Plan> {
379        let mut first = None;
380        let mut second = None;
381        let mut rest = None;
382        let mut last = None;
383
384        use PlanNode::*;
385        match self {
386            Constant { .. } | Get { .. } => (),
387            Let { value, body, .. } => {
388                first = Some(&**value);
389                second = Some(&**body);
390            }
391            LetRec { values, body, .. } => {
392                rest = Some(values);
393                last = Some(&**body);
394            }
395            Mfp { input, .. }
396            | FlatMap { input, .. }
397            | Reduce { input, .. }
398            | TopK { input, .. }
399            | Negate { input, .. }
400            | Threshold { input, .. }
401            | ArrangeBy { input, .. } => {
402                first = Some(&**input);
403            }
404            Join { inputs, .. } | Union { inputs, .. } => {
405                rest = Some(inputs);
406            }
407        }
408
409        first
410            .into_iter()
411            .chain(second)
412            .chain(rest.into_iter().flatten())
413            .chain(last)
414    }
415
416    /// Iterates through mutable references to child expressions.
417    pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan> {
418        let mut first = None;
419        let mut second = None;
420        let mut rest = None;
421        let mut last = None;
422
423        use PlanNode::*;
424        match self {
425            Constant { .. } | Get { .. } => (),
426            Let { value, body, .. } => {
427                first = Some(&mut **value);
428                second = Some(&mut **body);
429            }
430            LetRec { values, body, .. } => {
431                rest = Some(values);
432                last = Some(&mut **body);
433            }
434            Mfp { input, .. }
435            | FlatMap { input, .. }
436            | Reduce { input, .. }
437            | TopK { input, .. }
438            | Negate { input, .. }
439            | Threshold { input, .. }
440            | ArrangeBy { input, .. } => {
441                first = Some(&mut **input);
442            }
443            Join { inputs, .. } | Union { inputs, .. } => {
444                rest = Some(inputs);
445            }
446        }
447
448        first
449            .into_iter()
450            .chain(second)
451            .chain(rest.into_iter().flatten())
452            .chain(last)
453    }
454}
455
456impl PlanNode {
457    /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
458    pub fn as_plan(self, lir_id: LirId) -> Plan {
459        Plan { lir_id, node: self }
460    }
461}
462
463impl Plan {
464    /// Pretty-print this [Plan] to a string.
465    pub fn pretty(&self) -> String {
466        let config = ExplainConfig::default();
467        self.debug_explain(&config, None)
468    }
469
470    /// Pretty-print this [Plan] to a string using a custom
471    /// [ExplainConfig] and an optionally provided [ExprHumanizer].
472    /// This is intended for debugging and tests, not users.
473    pub fn debug_explain(
474        &self,
475        config: &ExplainConfig,
476        humanizer: Option<&dyn ExprHumanizer>,
477    ) -> String {
478        text_string_at(self, || PlanRenderingContext {
479            indent: Indent::default(),
480            humanizer: humanizer.unwrap_or(&DummyHumanizer),
481            annotations: BTreeMap::default(),
482            config,
483            ambiguous_ids: BTreeSet::default(),
484        })
485    }
486}
487
488/// How a `Get` stage will be rendered.
489#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
490pub enum GetPlan {
491    /// Simply pass input arrangements on to the next stage.
492    PassArrangements,
493    /// Using the supplied key, optionally seek the row, and apply the MFP.
494    Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
495    /// Scan the input collection (unarranged) and apply the MFP.
496    Collection(MapFilterProject),
497}
498
499impl Plan {
500    /// Convert the dataflow description into one that uses render plans.
501    #[mz_ore::instrument(
502        target = "optimizer",
503        level = "debug",
504        fields(path.segment = "finalize_dataflow")
505    )]
506    pub fn finalize_dataflow(
507        desc: DataflowDescription<OptimizedMirRelationExpr>,
508        features: &OptimizerFeatures,
509    ) -> Result<DataflowDescription<Self>, String> {
510        // First, we lower the dataflow description from MIR to LIR.
511        let mut dataflow = Self::lower_dataflow(desc, features)?;
512
513        // Subsequently, we perform plan refinements for the dataflow.
514        Self::refine_source_mfps(&mut dataflow);
515
516        if features.enable_consolidate_after_union_negate {
517            Self::refine_union_negate_consolidation(&mut dataflow);
518        }
519
520        if dataflow.is_single_time() {
521            Self::refine_single_time_operator_selection(&mut dataflow);
522
523            // The relaxation of the `must_consolidate` flag performs an LIR-based
524            // analysis and transform under checked recursion. By a similar argument
525            // made in `from_mir`, we do not expect the recursion limit to be hit.
526            // However, if that happens, we propagate an error to the caller.
527            // To apply the transform, we first obtain monotonic source and index
528            // global IDs and add them to a `TransformConfig` instance.
529            let monotonic_ids = dataflow
530                .source_imports
531                .iter()
532                .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
533                .chain(
534                    dataflow
535                        .index_imports
536                        .iter()
537                        .filter_map(|(_id, index_import)| {
538                            if index_import.monotonic {
539                                Some(index_import.desc.on_id)
540                            } else {
541                                None
542                            }
543                        }),
544                )
545                .collect::<BTreeSet<_>>();
546
547            let config = TransformConfig { monotonic_ids };
548            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
549        }
550
551        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
552
553        mz_repr::explain::trace_plan(&dataflow);
554
555        Ok(dataflow)
556    }
557
558    /// Lowers the dataflow description from MIR to LIR. To this end, the
559    /// method collects all available arrangements and based on this information
560    /// creates plans for every object to be built for the dataflow.
561    #[mz_ore::instrument(
562        target = "optimizer",
563        level = "debug",
564        fields(path.segment ="mir_to_lir")
565    )]
566    fn lower_dataflow(
567        desc: DataflowDescription<OptimizedMirRelationExpr>,
568        features: &OptimizerFeatures,
569    ) -> Result<DataflowDescription<Self>, String> {
570        let context = lowering::Context::new(desc.debug_name.clone(), features);
571        let dataflow = context.lower(desc)?;
572
573        mz_repr::explain::trace_plan(&dataflow);
574
575        Ok(dataflow)
576    }
577
578    /// Refines the source instance descriptions for sources imported by `dataflow` to
579    /// push down common MFP expressions.
580    #[mz_ore::instrument(
581        target = "optimizer",
582        level = "debug",
583        fields(path.segment = "refine_source_mfps")
584    )]
585    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
586        // Extract MFPs from Get operators for sources, and extract what we can for the source.
587        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
588        for (source_id, source_import) in dataflow.source_imports.iter_mut() {
589            let source = &mut source_import.desc;
590            let mut identity_present = false;
591            let mut mfps = Vec::new();
592            for build_desc in dataflow.objects_to_build.iter_mut() {
593                let mut todo = vec![&mut build_desc.plan];
594                while let Some(expression) = todo.pop() {
595                    let node = &mut expression.node;
596                    if let PlanNode::Get { id, plan, .. } = node {
597                        if *id == mz_expr::Id::Global(*source_id) {
598                            match plan {
599                                GetPlan::Collection(mfp) => mfps.push(mfp),
600                                GetPlan::PassArrangements => {
601                                    identity_present = true;
602                                }
603                                GetPlan::Arrangement(..) => {
604                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
605                                }
606                            }
607                        }
608                    } else {
609                        todo.extend(node.children_mut());
610                    }
611                }
612            }
613
614            // Direct exports of sources are possible, and prevent pushdown.
615            identity_present |= dataflow
616                .index_exports
617                .values()
618                .any(|(x, _)| x.on_id == *source_id);
619            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
620
621            if !identity_present && !mfps.is_empty() {
622                // Extract a common prefix `MapFilterProject` from `mfps`.
623                let common = MapFilterProject::extract_common(&mut mfps[..]);
624                // Apply common expressions to the source's `MapFilterProject`.
625                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
626                    MapFilterProject::compose(mfp, common)
627                } else {
628                    common
629                };
630                mfp.optimize();
631                source.arguments.operators = Some(mfp);
632            }
633        }
634        mz_repr::explain::trace_plan(dataflow);
635    }
636
637    /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
638    #[mz_ore::instrument(
639        target = "optimizer",
640        level = "debug",
641        fields(path.segment = "refine_union_negate_consolidation")
642    )]
643    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
644        for build_desc in dataflow.objects_to_build.iter_mut() {
645            let mut todo = vec![&mut build_desc.plan];
646            while let Some(expression) = todo.pop() {
647                let node = &mut expression.node;
648                match node {
649                    PlanNode::Union {
650                        inputs,
651                        consolidate_output,
652                        ..
653                    } => {
654                        if inputs
655                            .iter()
656                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
657                        {
658                            *consolidate_output = true;
659                        }
660                    }
661                    _ => {}
662                }
663                todo.extend(node.children_mut());
664            }
665        }
666        mz_repr::explain::trace_plan(dataflow);
667    }
668
669    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
670    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
671    /// one-shot SELECT query.
672    #[mz_ore::instrument(
673        target = "optimizer",
674        level = "debug",
675        fields(path.segment = "refine_single_time_operator_selection")
676    )]
677    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
678        // We should only reach here if we have a one-shot SELECT query, i.e.,
679        // a single-time dataflow.
680        assert!(dataflow.is_single_time());
681
682        // Upgrade single-time plans to monotonic.
683        for build_desc in dataflow.objects_to_build.iter_mut() {
684            let mut todo = vec![&mut build_desc.plan];
685            while let Some(expression) = todo.pop() {
686                let node = &mut expression.node;
687                match node {
688                    PlanNode::Reduce { plan, .. } => {
689                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
690                        match plan {
691                            ReducePlan::Hierarchical(hierarchical) => {
692                                hierarchical.as_monotonic(true);
693                            }
694                            _ => {
695                                // Nothing to do for other plans, and doing nothing is safe for future variants.
696                            }
697                        }
698                        todo.extend(node.children_mut());
699                    }
700                    PlanNode::TopK { top_k_plan, .. } => {
701                        top_k_plan.as_monotonic(true);
702                        todo.extend(node.children_mut());
703                    }
704                    PlanNode::LetRec { body, .. } => {
705                        // Only the non-recursive `body` is restricted to a single time.
706                        todo.push(body);
707                    }
708                    _ => {
709                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
710                        todo.extend(node.children_mut());
711                    }
712                }
713            }
714        }
715        mz_repr::explain::trace_plan(dataflow);
716    }
717
718    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
719    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
720    /// whenever the input is deemed to be physically monotonic.
721    #[mz_ore::instrument(
722        target = "optimizer",
723        level = "debug",
724        fields(path.segment = "refine_single_time_consolidation")
725    )]
726    fn refine_single_time_consolidation(
727        dataflow: &mut DataflowDescription<Self>,
728        config: &TransformConfig,
729    ) -> Result<(), String> {
730        // We should only reach here if we have a one-shot SELECT query, i.e.,
731        // a single-time dataflow.
732        assert!(dataflow.is_single_time());
733
734        let transform = transform::RelaxMustConsolidate;
735        for build_desc in dataflow.objects_to_build.iter_mut() {
736            transform
737                .transform(config, &mut build_desc.plan)
738                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
739        }
740        mz_repr::explain::trace_plan(dataflow);
741        Ok(())
742    }
743}
744
745impl CollectionPlan for PlanNode {
746    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
747        match self {
748            PlanNode::Constant { rows: _ } => (),
749            PlanNode::Get {
750                id,
751                keys: _,
752                plan: _,
753            } => match id {
754                Id::Global(id) => {
755                    out.insert(*id);
756                }
757                Id::Local(_) => (),
758            },
759            PlanNode::Let { id: _, value, body } => {
760                value.depends_on_into(out);
761                body.depends_on_into(out);
762            }
763            PlanNode::LetRec {
764                ids: _,
765                values,
766                limits: _,
767                body,
768            } => {
769                for value in values.iter() {
770                    value.depends_on_into(out);
771                }
772                body.depends_on_into(out);
773            }
774            PlanNode::Join { inputs, plan: _ }
775            | PlanNode::Union {
776                inputs,
777                consolidate_output: _,
778            } => {
779                for input in inputs {
780                    input.depends_on_into(out);
781                }
782            }
783            PlanNode::Mfp {
784                input,
785                mfp: _,
786                input_key_val: _,
787            }
788            | PlanNode::FlatMap {
789                input_key: _,
790                input,
791                exprs: _,
792                func: _,
793                mfp_after: _,
794            }
795            | PlanNode::ArrangeBy {
796                input_key: _,
797                input,
798                input_mfp: _,
799                forms: _,
800                strategy: _,
801            }
802            | PlanNode::Reduce {
803                input_key: _,
804                input,
805                key_val_plan: _,
806                plan: _,
807                mfp_after: _,
808            }
809            | PlanNode::TopK {
810                input,
811                top_k_plan: _,
812            }
813            | PlanNode::Negate { input }
814            | PlanNode::Threshold {
815                input,
816                threshold_plan: _,
817            } => {
818                input.depends_on_into(out);
819            }
820        }
821    }
822}
823
824impl CollectionPlan for Plan {
825    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
826        self.node.depends_on_into(out);
827    }
828}
829
/// Returns bucket sizes, largest first, for the hierarchical decomposition of an operator.
///
/// The sizes depend on the expected number of rows that share a group key. They are the
/// powers of 16, starting at 16, that lie strictly below `expected_group_size`, or below
/// 4 billion when no expected size is given. The result is empty when that limit is 16 or
/// less.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;

    // Without a user-provided group size, plan for 4B records in the expected case.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Powers of the fan-in balance how many inputs each layer receives from the layer
    // below against the total number of layers.
    // Multiplication saturates at `u64::MAX`, which is never strictly below `limit`, so
    // `take_while` always terminates the sequence.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |&size| Some(size.saturating_mul(FAN_IN)))
            .take_while(|&size| size < limit)
            .collect();

    buckets.reverse();
    buckets
}