mz_compute_types/
plan.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An explicit representation of a rendering plan for provided dataflows.

#![warn(missing_debug_implementations)]

use std::collections::{BTreeMap, BTreeSet};

use columnar::Columnar;
use mz_expr::{
    CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
    OptimizedMirRelationExpr, TableFunc,
};
use mz_ore::soft_assert_eq_no_log;
use mz_ore::str::Indent;
use mz_repr::explain::text::text_string_at;
use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::{Diff, GlobalId, Row};
use serde::{Deserialize, Serialize};

use crate::dataflows::DataflowDescription;
use crate::plan::join::JoinPlan;
use crate::plan::reduce::{KeyValPlan, ReducePlan};
use crate::plan::threshold::ThresholdPlan;
use crate::plan::top_k::TopKPlan;
use crate::plan::transform::{Transform, TransformConfig};

mod lowering;

pub mod interpret;
pub mod join;
pub mod reduce;
pub mod render_plan;
pub mod threshold;
pub mod top_k;
pub mod transform;

/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
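///
/// As an illustrative sketch only (not compiled as a doctest, and assuming
/// `MirScalarExpr::column` constructs a column reference), a collection of
/// three-column rows arranged by its first column could be described as:
///
/// ```ignore
/// let arrangement = (
///     vec![MirScalarExpr::column(0)], // to_key: expressions forming the key
///     vec![1, 2],                     // to_val: columns not captured by the key
///     vec![0, 1, 2],                  // to_row: reassemble the row from key ++ val
/// );
/// let forms = AvailableCollections::new_arranged(vec![arrangement]);
/// assert!(!forms.raw && forms.arbitrary_arrangement().is_some());
/// ```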
#[derive(Clone, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
}

impl AvailableCollections {
    /// Represent a collection that has no arrangements.
    pub fn new_raw() -> Self {
        Self {
            raw: true,
            arranged: Vec::new(),
        }
    }

    /// Represent a collection that is arranged in the specified ways.
    pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
        assert!(
            !arranged.is_empty(),
            "Invariant violated: at least one collection must exist"
        );
        Self {
            raw: false,
            arranged,
        }
    }

    /// Get some arrangement, if one exists.
    pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
        assert!(
            self.raw || !self.arranged.is_empty(),
            "Invariant violated: at least one collection must exist"
        );
        self.arranged.get(0)
    }
}

/// An identifier for an LIR node.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
pub struct LirId(u64);

impl LirId {
    fn as_u64(&self) -> u64 {
        self.0
    }
}

impl From<LirId> for u64 {
    fn from(value: LirId) -> Self {
        value.as_u64()
    }
}

impl std::fmt::Display for LirId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// A rendering plan with as much conditional logic as possible removed.
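///
/// A `Plan` pairs a dataflow-local [`LirId`] with the [`PlanNode`] it identifies.
/// A minimal construction sketch (not compiled as a doctest; `next_lir_id()` is a
/// hypothetical stand-in for however the caller mints fresh identifiers):
///
/// ```ignore
/// // An empty constant collection, wrapped into a complete `Plan`.
/// let constant = PlanNode::Constant { rows: Ok(Vec::new()) }.as_plan(next_lir_id());
/// // Feed it through a no-op map/filter/project over zero columns.
/// let plan: Plan = PlanNode::Mfp {
///     input: Box::new(constant),
///     mfp: MapFilterProject::new(0),
///     input_key_val: None,
/// }
/// .as_plan(next_lir_id());
/// ```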
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}

/// The actual AST node of the `Plan`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A constant collection with pre-determined contents.
    Constant {
        /// Explicit update triples for the collection.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to the corresponding `ids`.
        values: Vec<Plan<T>>,
        /// Maximum number of iterations. See further info on the MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein.
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table-valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use an `mfp` member that projects away these large fields.
    FlatMap {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// The implementation of Top-K has several different strategies based
        /// on the properties of the Top-K computation and the input itself. Please
        /// check out the documentation for this type for more detail.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// The implementation of thresholding has several different strategies based
        /// on the properties of the threshold and the input itself. Please check
        /// out the documentation for this type for more detail.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important, e.g., for the `Join` stage, which benefits from multiple
    /// arrangements, or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The input collection.
        input: Box<Plan<T>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input. Does not include
        /// any other existing arrangements.
        forms: AvailableCollections,
    },
}

impl<T> PlanNode<T> {
    /// Iterates through references to child expressions.
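    ///
    /// This supports the worklist-style traversals used throughout this module.
    /// A minimal sketch (not compiled as a doctest) that counts the nodes of a
    /// `plan: Plan<T>`:
    ///
    /// ```ignore
    /// let mut count = 0;
    /// let mut todo = vec![&plan];
    /// while let Some(expr) = todo.pop() {
    ///     count += 1;
    ///     todo.extend(expr.node.children());
    /// }
    /// ```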
    pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
        let mut first = None;
        let mut second = None;
        let mut rest = None;
        let mut last = None;

        use PlanNode::*;
        match self {
            Constant { .. } | Get { .. } => (),
            Let { value, body, .. } => {
                first = Some(&**value);
                second = Some(&**body);
            }
            LetRec { values, body, .. } => {
                rest = Some(values);
                last = Some(&**body);
            }
            Mfp { input, .. }
            | FlatMap { input, .. }
            | Reduce { input, .. }
            | TopK { input, .. }
            | Negate { input, .. }
            | Threshold { input, .. }
            | ArrangeBy { input, .. } => {
                first = Some(&**input);
            }
            Join { inputs, .. } | Union { inputs, .. } => {
                rest = Some(inputs);
            }
        }

        first
            .into_iter()
            .chain(second)
            .chain(rest.into_iter().flatten())
            .chain(last)
    }

    /// Iterates through mutable references to child expressions.
    pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
        let mut first = None;
        let mut second = None;
        let mut rest = None;
        let mut last = None;

        use PlanNode::*;
        match self {
            Constant { .. } | Get { .. } => (),
            Let { value, body, .. } => {
                first = Some(&mut **value);
                second = Some(&mut **body);
            }
            LetRec { values, body, .. } => {
                rest = Some(values);
                last = Some(&mut **body);
            }
            Mfp { input, .. }
            | FlatMap { input, .. }
            | Reduce { input, .. }
            | TopK { input, .. }
            | Negate { input, .. }
            | Threshold { input, .. }
            | ArrangeBy { input, .. } => {
                first = Some(&mut **input);
            }
            Join { inputs, .. } | Union { inputs, .. } => {
                rest = Some(inputs);
            }
        }

        first
            .into_iter()
            .chain(second)
            .chain(rest.into_iter().flatten())
            .chain(last)
    }
}

impl<T> PlanNode<T> {
    /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
    pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
        Plan { lir_id, node: self }
    }
}

impl Plan {
    /// Pretty-print this [Plan] to a string.
    pub fn pretty(&self) -> String {
        let config = ExplainConfig::default();
        self.explain(&config, None)
    }

    /// Pretty-print this [Plan] to a string using a custom
    /// [ExplainConfig] and an optionally provided [ExprHumanizer].
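    ///
    /// For example (not compiled as a doctest), rendering with the default
    /// configuration and humanizer:
    ///
    /// ```ignore
    /// let text = plan.explain(&ExplainConfig::default(), None);
    /// println!("{text}");
    /// ```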
    pub fn explain(&self, config: &ExplainConfig, humanizer: Option<&dyn ExprHumanizer>) -> String {
        text_string_at(self, || PlanRenderingContext {
            indent: Indent::default(),
            humanizer: humanizer.unwrap_or(&DummyHumanizer),
            annotations: BTreeMap::default(),
            config,
        })
    }
}

/// How a `Get` stage will be rendered.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    Arrangement(Vec<MirScalarExpr>, Option<Row>, MapFilterProject),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}

impl<T: timely::progress::Timestamp> Plan<T> {
    /// Convert the dataflow description into one that uses render plans.
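    ///
    /// Usage sketch (not compiled as a doctest), assuming an optimized MIR dataflow
    /// description `desc` and an `OptimizerFeatures` value `features` are in scope:
    ///
    /// ```ignore
    /// let lir_dataflow: DataflowDescription<Plan> = Plan::finalize_dataflow(desc, &features)?;
    /// ```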
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "finalize_dataflow")
    )]
    pub fn finalize_dataflow(
        desc: DataflowDescription<OptimizedMirRelationExpr>,
        features: &OptimizerFeatures,
    ) -> Result<DataflowDescription<Self>, String> {
        // First, we lower the dataflow description from MIR to LIR.
        let mut dataflow = Self::lower_dataflow(desc, features)?;

        // Subsequently, we perform plan refinements for the dataflow.
        Self::refine_source_mfps(&mut dataflow);

        if features.enable_consolidate_after_union_negate {
            Self::refine_union_negate_consolidation(&mut dataflow);
        }

        if dataflow.is_single_time() {
            Self::refine_single_time_operator_selection(&mut dataflow);

            // The relaxation of the `must_consolidate` flag performs an LIR-based
            // analysis and transform under checked recursion. By a similar argument
            // made in `from_mir`, we do not expect the recursion limit to be hit.
            // However, if that happens, we propagate an error to the caller.
            // To apply the transform, we first obtain monotonic source and index
            // global IDs and add them to a `TransformConfig` instance.
            let monotonic_ids = dataflow
                .source_imports
                .iter()
                .filter_map(
                    |(id, (_, monotonic, _upper))| if *monotonic { Some(*id) } else { None },
                )
                .chain(
                    dataflow
                        .index_imports
                        .iter()
                        .filter_map(|(_id, index_import)| {
                            if index_import.monotonic {
                                Some(index_import.desc.on_id)
                            } else {
                                None
                            }
                        }),
                )
                .collect::<BTreeSet<_>>();

            let config = TransformConfig { monotonic_ids };
            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
        }

        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));

        mz_repr::explain::trace_plan(&dataflow);

        Ok(dataflow)
    }

    /// Lowers the dataflow description from MIR to LIR. To this end, the
    /// method collects all available arrangements and, based on this information,
    /// creates plans for every object to be built for the dataflow.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "mir_to_lir")
    )]
    fn lower_dataflow(
        desc: DataflowDescription<OptimizedMirRelationExpr>,
        features: &OptimizerFeatures,
    ) -> Result<DataflowDescription<Self>, String> {
        let context = lowering::Context::new(desc.debug_name.clone(), features);
        let dataflow = context.lower(desc)?;

        mz_repr::explain::trace_plan(&dataflow);

        Ok(dataflow)
    }

    /// Refines the source instance descriptions for sources imported by `dataflow` to
    /// push down common MFP expressions.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_source_mfps")
    )]
    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
        // Extract MFPs from Get operators for sources, and extract what we can for the source.
        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
        for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
            let mut identity_present = false;
            let mut mfps = Vec::new();
            for build_desc in dataflow.objects_to_build.iter_mut() {
                let mut todo = vec![&mut build_desc.plan];
                while let Some(expression) = todo.pop() {
                    let node = &mut expression.node;
                    if let PlanNode::Get { id, plan, .. } = node {
                        if *id == mz_expr::Id::Global(*source_id) {
                            match plan {
                                GetPlan::Collection(mfp) => mfps.push(mfp),
                                GetPlan::PassArrangements => {
                                    identity_present = true;
                                }
                                GetPlan::Arrangement(..) => {
                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
                                }
                            }
                        }
                    } else {
                        todo.extend(node.children_mut());
                    }
                }
            }

            // Direct exports of sources are possible, and prevent pushdown.
            identity_present |= dataflow
                .index_exports
                .values()
                .any(|(x, _)| x.on_id == *source_id);
            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);

            if !identity_present && !mfps.is_empty() {
                // Extract a common prefix `MapFilterProject` from `mfps`.
                let common = MapFilterProject::extract_common(&mut mfps[..]);
                // Apply common expressions to the source's `MapFilterProject`.
                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
                    MapFilterProject::compose(mfp, common)
                } else {
                    common
                };
                mfp.optimize();
                source.arguments.operators = Some(mfp);
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Sets the `consolidate_output` flag of `Union` operators that have at least one `Negate` input.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_union_negate_consolidation")
    )]
    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
        for build_desc in dataflow.objects_to_build.iter_mut() {
            let mut todo = vec![&mut build_desc.plan];
            while let Some(expression) = todo.pop() {
                let node = &mut expression.node;
                match node {
                    PlanNode::Union {
                        inputs,
                        consolidate_output,
                        ..
                    } => {
                        if inputs
                            .iter()
                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
                        {
                            *consolidate_output = true;
                        }
                    }
                    _ => {}
                }
                todo.extend(node.children_mut());
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
    /// of monotonic operators if the dataflow refers to a single time, i.e., is for a
    /// one-shot SELECT query.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_single_time_operator_selection")
    )]
    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
        // We should only reach here if we have a one-shot SELECT query, i.e.,
        // a single-time dataflow.
        assert!(dataflow.is_single_time());

        // Upgrade single-time plans to monotonic.
        for build_desc in dataflow.objects_to_build.iter_mut() {
            let mut todo = vec![&mut build_desc.plan];
            while let Some(expression) = todo.pop() {
                let node = &mut expression.node;
                match node {
                    PlanNode::Reduce { plan, .. } => {
                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
                        match plan {
                            ReducePlan::Collation(collation) => {
                                collation.as_monotonic(true);
                            }
                            ReducePlan::Hierarchical(hierarchical) => {
                                hierarchical.as_monotonic(true);
                            }
                            _ => {
                                // Nothing to do for other plans, and doing nothing is safe for future variants.
                            }
                        }
                        todo.extend(node.children_mut());
                    }
                    PlanNode::TopK { top_k_plan, .. } => {
                        top_k_plan.as_monotonic(true);
                        todo.extend(node.children_mut());
                    }
                    PlanNode::LetRec { body, .. } => {
                        // Only the non-recursive `body` is restricted to a single time.
                        todo.push(body);
                    }
                    _ => {
                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
                        todo.extend(node.children_mut());
                    }
                }
            }
        }
        mz_repr::explain::trace_plan(dataflow);
    }

    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
    /// whenever the input is deemed to be physically monotonic.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "refine_single_time_consolidation")
    )]
    fn refine_single_time_consolidation(
        dataflow: &mut DataflowDescription<Self>,
        config: &TransformConfig,
    ) -> Result<(), String> {
        // We should only reach here if we have a one-shot SELECT query, i.e.,
        // a single-time dataflow.
        assert!(dataflow.is_single_time());

        let transform = transform::RelaxMustConsolidate::<T>::new();
        for build_desc in dataflow.objects_to_build.iter_mut() {
            transform
                .transform(config, &mut build_desc.plan)
                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
        }
        mz_repr::explain::trace_plan(dataflow);
        Ok(())
    }
}

impl<T> CollectionPlan for PlanNode<T> {
    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
        match self {
            PlanNode::Constant { rows: _ } => (),
            PlanNode::Get {
                id,
                keys: _,
                plan: _,
            } => match id {
                Id::Global(id) => {
                    out.insert(*id);
                }
                Id::Local(_) => (),
            },
            PlanNode::Let { id: _, value, body } => {
                value.depends_on_into(out);
                body.depends_on_into(out);
            }
            PlanNode::LetRec {
                ids: _,
                values,
                limits: _,
                body,
            } => {
                for value in values.iter() {
                    value.depends_on_into(out);
                }
                body.depends_on_into(out);
            }
            PlanNode::Join { inputs, plan: _ }
            | PlanNode::Union {
                inputs,
                consolidate_output: _,
            } => {
                for input in inputs {
                    input.depends_on_into(out);
                }
            }
            PlanNode::Mfp {
                input,
                mfp: _,
                input_key_val: _,
            }
            | PlanNode::FlatMap {
                input_key: _,
                input,
                exprs: _,
                func: _,
                mfp_after: _,
            }
            | PlanNode::ArrangeBy {
                input_key: _,
                input,
                input_mfp: _,
                forms: _,
            }
            | PlanNode::Reduce {
                input_key: _,
                input,
                key_val_plan: _,
                plan: _,
                mfp_after: _,
            }
            | PlanNode::TopK {
                input,
                top_k_plan: _,
            }
            | PlanNode::Negate { input }
            | PlanNode::Threshold {
                input,
                threshold_plan: _,
            } => {
                input.depends_on_into(out);
            }
        }
    }
}

impl<T> CollectionPlan for Plan<T> {
    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
        self.node.depends_on_into(out);
    }
}

/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
/// on the expected number of rows that will have the same group key.
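///
/// For example (not compiled as a doctest), an expected group size of 1,000 yields
/// the two buckets `[256, 16]`, while the 4-billion default yields seven buckets,
/// from `16^7` down to `16`:
///
/// ```ignore
/// assert_eq!(bucketing_of_expected_group_size(Some(1_000)), vec![256, 16]);
/// assert_eq!(bucketing_of_expected_group_size(None).len(), 7);
/// ```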
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    let mut buckets = vec![];
    let mut current = 16;

    // Plan for 4B records in the expected case if the user didn't specify a group size.
    let limit = expected_group_size.unwrap_or(4_000_000_000);

    // Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
    // each layer gets from the preceding layer, while also limiting the number of layers.
    while current < limit {
        buckets.push(current);
        current = current.saturating_mul(16);
    }

    buckets.reverse();
    buckets
}