Skip to main content

mz_compute_types/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15
16use columnar::Columnar;
17use mz_expr::{
18    CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc,
20};
21use mz_ore::soft_assert_eq_no_log;
22use mz_ore::str::Indent;
23use mz_repr::explain::text::text_string_at;
24use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Diff, GlobalId, Row};
27use serde::{Deserialize, Serialize};
28
29use crate::dataflows::DataflowDescription;
30use crate::plan::join::JoinPlan;
31use crate::plan::reduce::{KeyValPlan, ReducePlan};
32use crate::plan::threshold::ThresholdPlan;
33use crate::plan::top_k::TopKPlan;
34use crate::plan::transform::{Transform, TransformConfig};
35
// Private submodule: provides `lowering::Context`, which `Plan::lower_dataflow`
// uses to turn a MIR dataflow description into an LIR one.
mod lowering;

// Public submodules. Several of them supply the per-operator plan types that
// are embedded in `PlanNode` (`join::JoinPlan`, `reduce::{KeyValPlan, ReducePlan}`,
// `threshold::ThresholdPlan`, `top_k::TopKPlan`); `transform` supplies the LIR
// transform applied while finalizing single-time dataflows.
pub mod interpret;
pub mod join;
pub mod reduce;
pub mod render_plan;
pub mod threshold;
pub mod top_k;
pub mod transform;
45
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
///
/// `arbitrary_arrangement` asserts that at least one form exists, i.e. that `raw`
/// is set or `arranged` is non-empty. Note that the derived `Default` value has
/// `raw == false` and an empty `arranged`, so it does not satisfy that invariant.
#[derive(
    Clone,
    Debug,
    Default,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
}
76
77impl AvailableCollections {
78    /// Represent a collection that has no arrangements.
79    pub fn new_raw() -> Self {
80        Self {
81            raw: true,
82            arranged: Vec::new(),
83        }
84    }
85
86    /// Represent a collection that is arranged in the specified ways.
87    pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
88        assert!(
89            !arranged.is_empty(),
90            "Invariant violated: at least one collection must exist"
91        );
92        Self {
93            raw: false,
94            arranged,
95        }
96    }
97
98    /// Get some arrangement, if one exists.
99    pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
100        assert!(
101            self.raw || !self.arranged.is_empty(),
102            "Invariant violated: at least one collection must exist"
103        );
104        self.arranged.get(0)
105    }
106}
107
/// An identifier for an LIR node.
///
/// This is a newtype around a `u64`. The wrapped value is private to this
/// module; use the `From<LirId> for u64` conversion or the `Display`
/// implementation to observe it.
#[derive(
    Clone,
    Copy,
    Debug,
    Deserialize,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    Columnar
)]
pub struct LirId(u64);
122
123impl LirId {
124    fn as_u64(&self) -> u64 {
125        self.0
126    }
127}
128
129impl From<LirId> for u64 {
130    fn from(value: LirId) -> Self {
131        value.as_u64()
132    }
133}
134
135impl std::fmt::Display for LirId {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        write!(f, "{}", self.0)
138    }
139}
140
/// A rendering plan with as much conditional logic as possible removed.
///
/// A `Plan` pairs an operator (`node`) with the identifier of that operator
/// (`lir_id`). Plans nest: the children of a plan are themselves `Plan`s stored
/// inside `node`.
///
/// The timestamp type `T` defaults to `mz_repr::Timestamp`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}
149
/// The actual AST node of the `Plan`.
///
/// Each variant is one kind of operator. Variants that consume other
/// collections hold their inputs as nested `Plan`s (boxed for a single input,
/// in a `Vec` for a variable number of inputs).
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A collection containing a pre-determined collection.
    Constant {
        /// Explicit update triples for the collection, or the error that the
        /// collection evaluates to.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to the identifiers in `ids`
        /// (presumably matched up by position; confirm against the lowering code).
        values: Vec<Plan<T>>,
        /// Maximum number of iterations. See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// Please check out the documentation for [`TopKPlan`] for the
        /// implementation strategies that are available.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// Please check out the documentation for [`ThresholdPlan`] for the
        /// implementation strategies that are available.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
    },
}
348
349impl<T> PlanNode<T> {
350    /// Iterates through references to child expressions.
351    pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
352        let mut first = None;
353        let mut second = None;
354        let mut rest = None;
355        let mut last = None;
356
357        use PlanNode::*;
358        match self {
359            Constant { .. } | Get { .. } => (),
360            Let { value, body, .. } => {
361                first = Some(&**value);
362                second = Some(&**body);
363            }
364            LetRec { values, body, .. } => {
365                rest = Some(values);
366                last = Some(&**body);
367            }
368            Mfp { input, .. }
369            | FlatMap { input, .. }
370            | Reduce { input, .. }
371            | TopK { input, .. }
372            | Negate { input, .. }
373            | Threshold { input, .. }
374            | ArrangeBy { input, .. } => {
375                first = Some(&**input);
376            }
377            Join { inputs, .. } | Union { inputs, .. } => {
378                rest = Some(inputs);
379            }
380        }
381
382        first
383            .into_iter()
384            .chain(second)
385            .chain(rest.into_iter().flatten())
386            .chain(last)
387    }
388
389    /// Iterates through mutable references to child expressions.
390    pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
391        let mut first = None;
392        let mut second = None;
393        let mut rest = None;
394        let mut last = None;
395
396        use PlanNode::*;
397        match self {
398            Constant { .. } | Get { .. } => (),
399            Let { value, body, .. } => {
400                first = Some(&mut **value);
401                second = Some(&mut **body);
402            }
403            LetRec { values, body, .. } => {
404                rest = Some(values);
405                last = Some(&mut **body);
406            }
407            Mfp { input, .. }
408            | FlatMap { input, .. }
409            | Reduce { input, .. }
410            | TopK { input, .. }
411            | Negate { input, .. }
412            | Threshold { input, .. }
413            | ArrangeBy { input, .. } => {
414                first = Some(&mut **input);
415            }
416            Join { inputs, .. } | Union { inputs, .. } => {
417                rest = Some(inputs);
418            }
419        }
420
421        first
422            .into_iter()
423            .chain(second)
424            .chain(rest.into_iter().flatten())
425            .chain(last)
426    }
427}
428
429impl<T> PlanNode<T> {
430    /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
431    pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
432        Plan { lir_id, node: self }
433    }
434}
435
436impl Plan {
437    /// Pretty-print this [Plan] to a string.
438    pub fn pretty(&self) -> String {
439        let config = ExplainConfig::default();
440        self.debug_explain(&config, None)
441    }
442
443    /// Pretty-print this [Plan] to a string using a custom
444    /// [ExplainConfig] and an optionally provided [ExprHumanizer].
445    /// This is intended for debugging and tests, not users.
446    pub fn debug_explain(
447        &self,
448        config: &ExplainConfig,
449        humanizer: Option<&dyn ExprHumanizer>,
450    ) -> String {
451        text_string_at(self, || PlanRenderingContext {
452            indent: Indent::default(),
453            humanizer: humanizer.unwrap_or(&DummyHumanizer),
454            annotations: BTreeMap::default(),
455            config,
456            ambiguous_ids: BTreeSet::default(),
457        })
458    }
459}
460
/// How a `Get` stage will be rendered.
///
/// Note that `Plan::refine_source_mfps` only expects `PassArrangements` or
/// `Collection` on a `Get` of an imported source, and panics on `Arrangement`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// The fields are, in order: the arrangement key, the optional row to seek
    /// to, and the MFP to apply.
    Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
471
472impl<T: timely::progress::Timestamp> Plan<T> {
473    /// Convert the dataflow description into one that uses render plans.
474    #[mz_ore::instrument(
475        target = "optimizer",
476        level = "debug",
477        fields(path.segment = "finalize_dataflow")
478    )]
479    pub fn finalize_dataflow(
480        desc: DataflowDescription<OptimizedMirRelationExpr>,
481        features: &OptimizerFeatures,
482    ) -> Result<DataflowDescription<Self>, String> {
483        // First, we lower the dataflow description from MIR to LIR.
484        let mut dataflow = Self::lower_dataflow(desc, features)?;
485
486        // Subsequently, we perform plan refinements for the dataflow.
487        Self::refine_source_mfps(&mut dataflow);
488
489        if features.enable_consolidate_after_union_negate {
490            Self::refine_union_negate_consolidation(&mut dataflow);
491        }
492
493        if dataflow.is_single_time() {
494            Self::refine_single_time_operator_selection(&mut dataflow);
495
496            // The relaxation of the `must_consolidate` flag performs an LIR-based
497            // analysis and transform under checked recursion. By a similar argument
498            // made in `from_mir`, we do not expect the recursion limit to be hit.
499            // However, if that happens, we propagate an error to the caller.
500            // To apply the transform, we first obtain monotonic source and index
501            // global IDs and add them to a `TransformConfig` instance.
502            let monotonic_ids = dataflow
503                .source_imports
504                .iter()
505                .filter_map(|(id, source_import)| source_import.monotonic.then_some(*id))
506                .chain(
507                    dataflow
508                        .index_imports
509                        .iter()
510                        .filter_map(|(_id, index_import)| {
511                            if index_import.monotonic {
512                                Some(index_import.desc.on_id)
513                            } else {
514                                None
515                            }
516                        }),
517                )
518                .collect::<BTreeSet<_>>();
519
520            let config = TransformConfig { monotonic_ids };
521            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
522        }
523
524        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
525
526        mz_repr::explain::trace_plan(&dataflow);
527
528        Ok(dataflow)
529    }
530
531    /// Lowers the dataflow description from MIR to LIR. To this end, the
532    /// method collects all available arrangements and based on this information
533    /// creates plans for every object to be built for the dataflow.
534    #[mz_ore::instrument(
535        target = "optimizer",
536        level = "debug",
537        fields(path.segment ="mir_to_lir")
538    )]
539    fn lower_dataflow(
540        desc: DataflowDescription<OptimizedMirRelationExpr>,
541        features: &OptimizerFeatures,
542    ) -> Result<DataflowDescription<Self>, String> {
543        let context = lowering::Context::new(desc.debug_name.clone(), features);
544        let dataflow = context.lower(desc)?;
545
546        mz_repr::explain::trace_plan(&dataflow);
547
548        Ok(dataflow)
549    }
550
    /// Refines the source instance descriptions for sources imported by `dataflow` to
    /// push down common MFP expressions.
    ///
    /// For every imported source, the `MapFilterProject`s of all `Get` stages that
    /// read the source as a raw collection are gathered. Unless some use needs the
    /// source unchanged, their common part is extracted and composed onto the
    /// operators already attached to the source description.
    ///
    /// # Panics
    ///
    /// Panics if a `Get` of an imported source uses `GetPlan::Arrangement`.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_source_mfps")
    )]
    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
        // Extract MFPs from Get operators for sources, and extract what we can for the source.
        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
        for (source_id, source_import) in dataflow.source_imports.iter_mut() {
            let source = &mut source_import.desc;
            // Set when some use of the source requires it as-is, which rules out pushdown.
            let mut identity_present = false;
            // Mutable references to the MFP of every `Get` that scans this source.
            let mut mfps = Vec::new();
            for build_desc in dataflow.objects_to_build.iter_mut() {
                // Explicit stack for an iterative traversal of the plan tree.
                let mut todo = vec![&mut build_desc.plan];
                while let Some(expression) = todo.pop() {
                    let node = &mut expression.node;
                    if let PlanNode::Get { id, plan, .. } = node {
                        if *id == mz_expr::Id::Global(*source_id) {
                            match plan {
                                GetPlan::Collection(mfp) => mfps.push(mfp),
                                GetPlan::PassArrangements => {
                                    identity_present = true;
                                }
                                GetPlan::Arrangement(..) => {
                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
                                }
                            }
                        }
                    } else {
                        // `Get` has no children, so only non-`Get` nodes are descended into.
                        todo.extend(node.children_mut());
                    }
                }
            }

            // Direct exports of sources are possible, and prevent pushdown.
            identity_present |= dataflow
                .index_exports
                .values()
                .any(|(x, _)| x.on_id == *source_id);
            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);

            if !identity_present && !mfps.is_empty() {
                // Extract a common prefix `MapFilterProject` from `mfps`.
                // The MFPs are passed mutably, so they can be rewritten in place.
                let common = MapFilterProject::extract_common(&mut mfps[..]);
                // Apply common expressions to the source's `MapFilterProject`.
                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
                    MapFilterProject::compose(mfp, common)
                } else {
                    common
                };
                mfp.optimize();
                source.arguments.operators = Some(mfp);
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }
609
610    /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
611    #[mz_ore::instrument(
612        target = "optimizer",
613        level = "debug",
614        fields(path.segment = "refine_union_negate_consolidation")
615    )]
616    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
617        for build_desc in dataflow.objects_to_build.iter_mut() {
618            let mut todo = vec![&mut build_desc.plan];
619            while let Some(expression) = todo.pop() {
620                let node = &mut expression.node;
621                match node {
622                    PlanNode::Union {
623                        inputs,
624                        consolidate_output,
625                        ..
626                    } => {
627                        if inputs
628                            .iter()
629                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
630                        {
631                            *consolidate_output = true;
632                        }
633                    }
634                    _ => {}
635                }
636                todo.extend(node.children_mut());
637            }
638        }
639        mz_repr::explain::trace_plan(dataflow);
640    }
641
642    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
643    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
644    /// one-shot SELECT query.
645    #[mz_ore::instrument(
646        target = "optimizer",
647        level = "debug",
648        fields(path.segment = "refine_single_time_operator_selection")
649    )]
650    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
651        // We should only reach here if we have a one-shot SELECT query, i.e.,
652        // a single-time dataflow.
653        assert!(dataflow.is_single_time());
654
655        // Upgrade single-time plans to monotonic.
656        for build_desc in dataflow.objects_to_build.iter_mut() {
657            let mut todo = vec![&mut build_desc.plan];
658            while let Some(expression) = todo.pop() {
659                let node = &mut expression.node;
660                match node {
661                    PlanNode::Reduce { plan, .. } => {
662                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
663                        match plan {
664                            ReducePlan::Hierarchical(hierarchical) => {
665                                hierarchical.as_monotonic(true);
666                            }
667                            _ => {
668                                // Nothing to do for other plans, and doing nothing is safe for future variants.
669                            }
670                        }
671                        todo.extend(node.children_mut());
672                    }
673                    PlanNode::TopK { top_k_plan, .. } => {
674                        top_k_plan.as_monotonic(true);
675                        todo.extend(node.children_mut());
676                    }
677                    PlanNode::LetRec { body, .. } => {
678                        // Only the non-recursive `body` is restricted to a single time.
679                        todo.push(body);
680                    }
681                    _ => {
682                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
683                        todo.extend(node.children_mut());
684                    }
685                }
686            }
687        }
688        mz_repr::explain::trace_plan(dataflow);
689    }
690
691    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
692    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
693    /// whenever the input is deemed to be physically monotonic.
694    #[mz_ore::instrument(
695        target = "optimizer",
696        level = "debug",
697        fields(path.segment = "refine_single_time_consolidation")
698    )]
699    fn refine_single_time_consolidation(
700        dataflow: &mut DataflowDescription<Self>,
701        config: &TransformConfig,
702    ) -> Result<(), String> {
703        // We should only reach here if we have a one-shot SELECT query, i.e.,
704        // a single-time dataflow.
705        assert!(dataflow.is_single_time());
706
707        let transform = transform::RelaxMustConsolidate::<T>::new();
708        for build_desc in dataflow.objects_to_build.iter_mut() {
709            transform
710                .transform(config, &mut build_desc.plan)
711                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
712        }
713        mz_repr::explain::trace_plan(dataflow);
714        Ok(())
715    }
716}
717
718impl<T> CollectionPlan for PlanNode<T> {
719    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
720        match self {
721            PlanNode::Constant { rows: _ } => (),
722            PlanNode::Get {
723                id,
724                keys: _,
725                plan: _,
726            } => match id {
727                Id::Global(id) => {
728                    out.insert(*id);
729                }
730                Id::Local(_) => (),
731            },
732            PlanNode::Let { id: _, value, body } => {
733                value.depends_on_into(out);
734                body.depends_on_into(out);
735            }
736            PlanNode::LetRec {
737                ids: _,
738                values,
739                limits: _,
740                body,
741            } => {
742                for value in values.iter() {
743                    value.depends_on_into(out);
744                }
745                body.depends_on_into(out);
746            }
747            PlanNode::Join { inputs, plan: _ }
748            | PlanNode::Union {
749                inputs,
750                consolidate_output: _,
751            } => {
752                for input in inputs {
753                    input.depends_on_into(out);
754                }
755            }
756            PlanNode::Mfp {
757                input,
758                mfp: _,
759                input_key_val: _,
760            }
761            | PlanNode::FlatMap {
762                input_key: _,
763                input,
764                exprs: _,
765                func: _,
766                mfp_after: _,
767            }
768            | PlanNode::ArrangeBy {
769                input_key: _,
770                input,
771                input_mfp: _,
772                forms: _,
773            }
774            | PlanNode::Reduce {
775                input_key: _,
776                input,
777                key_val_plan: _,
778                plan: _,
779                mfp_after: _,
780            }
781            | PlanNode::TopK {
782                input,
783                top_k_plan: _,
784            }
785            | PlanNode::Negate { input }
786            | PlanNode::Threshold {
787                input,
788                threshold_plan: _,
789            } => {
790                input.depends_on_into(out);
791            }
792        }
793    }
794}
795
796impl<T> CollectionPlan for Plan<T> {
797    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
798        self.node.depends_on_into(out);
799    }
800}
801
/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
/// on the expected number of rows that will have the same group key.
///
/// The buckets are the powers of 16 that are strictly smaller than the expected group size,
/// largest first. When no group size is given, 4 billion is assumed. The result is empty when
/// the group size is at most 16.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;

    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
    // each layer gets from the preceding layer, while also limiting the number of layers.
    // The multiplication saturates at `u64::MAX`, which is never below `limit`, so the sequence
    // is always cut off after finitely many steps.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |size| Some(size.saturating_mul(FAN_IN)))
            .take_while(|size| *size < limit)
            .collect();

    buckets.reverse();
    buckets
}