mz_compute_types/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU64;
16
17use columnar::Columnar;
18use mz_expr::{
19    CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
20    OptimizedMirRelationExpr, TableFunc,
21};
22use mz_ore::soft_assert_eq_no_log;
23use mz_ore::str::Indent;
24use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
25use mz_repr::explain::text::text_string_at;
26use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
27use mz_repr::optimize::OptimizerFeatures;
28use mz_repr::{Diff, GlobalId, Row};
29use proptest::arbitrary::Arbitrary;
30use proptest::prelude::*;
31use proptest::strategy::Strategy;
32use proptest_derive::Arbitrary;
33use serde::{Deserialize, Serialize};
34
35use crate::dataflows::DataflowDescription;
36use crate::plan::join::JoinPlan;
37use crate::plan::reduce::{KeyValPlan, ReducePlan};
38use crate::plan::threshold::ThresholdPlan;
39use crate::plan::top_k::TopKPlan;
40use crate::plan::transform::{Transform, TransformConfig};
41
42mod lowering;
43
44pub mod interpret;
45pub mod join;
46pub mod reduce;
47pub mod render_plan;
48pub mod threshold;
49pub mod top_k;
50pub mod transform;
51
52include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.rs"));
53
54/// The forms in which an operator's output is available.
55///
56/// These forms may include "raw", meaning as a streamed collection, but also any
57/// number of "arranged" representations.
58///
59/// Each arranged representation is described by a `KeyValRowMapping`, or rather
60/// at the moment by its three fields in a triple. These fields explain how to form
61/// a "key" by applying some expressions to each row, how to select "values" from
62/// columns not explicitly captured by the key, and how to return to the original
63/// row from the concatenation of key and value. Further explanation is available
64/// in the documentation for `KeyValRowMapping`.
65#[derive(
66    Arbitrary, Clone, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize,
67)]
68pub struct AvailableCollections {
69    /// Whether the collection exists in unarranged form.
70    pub raw: bool,
71    /// The list of available arrangements, presented as a `KeyValRowMapping`,
72    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
73    /// The documentation for `KeyValRowMapping` explains these fields better.
74    #[proptest(strategy = "prop::collection::vec(any_arranged_thin(), 0..3)")]
75    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
76}
77
78/// A strategy that produces arrangements that are thinner than the default. That is
79/// the number of direct children is limited to a maximum of 3.
80pub(crate) fn any_arranged_thin()
81-> impl Strategy<Value = (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
82    (
83        prop::collection::vec(MirScalarExpr::arbitrary(), 0..3),
84        Vec::<usize>::arbitrary(),
85        Vec::<usize>::arbitrary(),
86    )
87}
88
89impl RustType<ProtoAvailableCollections> for AvailableCollections {
90    fn into_proto(&self) -> ProtoAvailableCollections {
91        ProtoAvailableCollections {
92            raw: self.raw,
93            arranged: self.arranged.into_proto(),
94        }
95    }
96
97    fn from_proto(x: ProtoAvailableCollections) -> Result<Self, TryFromProtoError> {
98        Ok({
99            Self {
100                raw: x.raw,
101                arranged: x.arranged.into_rust()?,
102            }
103        })
104    }
105}
106
107impl AvailableCollections {
108    /// Represent a collection that has no arrangements.
109    pub fn new_raw() -> Self {
110        Self {
111            raw: true,
112            arranged: Vec::new(),
113        }
114    }
115
116    /// Represent a collection that is arranged in the specified ways.
117    pub fn new_arranged(arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>) -> Self {
118        assert!(
119            !arranged.is_empty(),
120            "Invariant violated: at least one collection must exist"
121        );
122        Self {
123            raw: false,
124            arranged,
125        }
126    }
127
128    /// Get some arrangement, if one exists.
129    pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
130        assert!(
131            self.raw || !self.arranged.is_empty(),
132            "Invariant violated: at least one collection must exist"
133        );
134        self.arranged.get(0)
135    }
136}
137
138/// An identifier for an LIR node.
139#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
140pub struct LirId(u64);
141
142impl LirId {
143    fn as_u64(&self) -> u64 {
144        self.0
145    }
146}
147
148impl From<LirId> for u64 {
149    fn from(value: LirId) -> Self {
150        value.as_u64()
151    }
152}
153
154impl std::fmt::Display for LirId {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        write!(f, "{}", self.0)
157    }
158}
159
160impl RustType<u64> for LirId {
161    fn into_proto(&self) -> u64 {
162        self.0
163    }
164
165    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
166        Ok(Self(proto))
167    }
168}
169
170/// A rendering plan with as much conditional logic as possible removed.
171#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
172pub struct Plan<T = mz_repr::Timestamp> {
173    /// A dataflow-local identifier.
174    pub lir_id: LirId,
175    /// The underlying operator.
176    pub node: PlanNode<T>,
177}
178
179/// The actual AST node of the `Plan`.
180#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
181pub enum PlanNode<T = mz_repr::Timestamp> {
182    /// A collection containing a pre-determined collection.
183    Constant {
184        /// Explicit update triples for the collection.
185        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
186    },
187    /// A reference to a bound collection.
188    ///
189    /// This is commonly either an external reference to an existing source or
190    /// maintained arrangement, or an internal reference to a `Let` identifier.
191    Get {
192        /// A global or local identifier naming the collection.
193        id: Id,
194        /// Arrangements that will be available.
195        ///
196        /// The collection will also be loaded if available, which it will
197        /// not be for imported data, but which it may be for locally defined
198        /// data.
199        // TODO: Be more explicit about whether a collection is available,
200        // although one can always produce it from an arrangement, and it
201        // seems generally advantageous to do that instead (to avoid cloning
202        // rows, by using `mfp` first on borrowed data).
203        keys: AvailableCollections,
204        /// The actions to take when introducing the collection.
205        plan: GetPlan,
206    },
207    /// Binds `value` to `id`, and then results in `body` with that binding.
208    ///
209    /// This stage has the effect of sharing `value` across multiple possible
210    /// uses in `body`, and is the only mechanism we have for sharing collection
211    /// information across parts of a dataflow.
212    ///
213    /// The binding is not available outside of `body`.
214    Let {
215        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
216        id: LocalId,
217        /// The collection that should be bound to `id`.
218        value: Box<Plan<T>>,
219        /// The collection that results, which is allowed to contain `Get` stages
220        /// that reference `Id::Local(id)`.
221        body: Box<Plan<T>>,
222    },
223    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
224    ///
225    /// All bindings are available to all bindings, and to `body`.
226    /// The contents of each binding are initially empty, and then updated through a sequence
227    /// of iterations in which each binding is updated in sequence, from the most recent values
228    /// of all bindings.
229    LetRec {
230        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
231        ids: Vec<LocalId>,
232        /// The collection that should be bound to `id`.
233        values: Vec<Plan<T>>,
234        /// Maximum number of iterations. See further info on the MIR `LetRec`.
235        limits: Vec<Option<LetRecLimit>>,
236        /// The collection that results, which is allowed to contain `Get` stages
237        /// that reference `Id::Local(id)`.
238        body: Box<Plan<T>>,
239    },
240    /// Map, Filter, and Project operators.
241    ///
242    /// This stage contains work that we would ideally like to fuse to other plan
243    /// stages, but for practical reasons cannot. For example: threshold, topk,
244    /// and sometimes reduce stages are not able to absorb this operator.
245    Mfp {
246        /// The input collection.
247        input: Box<Plan<T>>,
248        /// Linear operator to apply to each record.
249        mfp: MapFilterProject,
250        /// Whether the input is from an arrangement, and if so,
251        /// whether we can seek to a specific value therein
252        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
253    },
254    /// A variable number of output records for each input record.
255    ///
256    /// This stage is a bit of a catch-all for logic that does not easily fit in
257    /// map stages. This includes table valued functions, but also functions of
258    /// multiple arguments, and functions that modify the sign of updates.
259    ///
260    /// This stage allows a `MapFilterProject` operator to be fused to its output,
261    /// and this can be very important as otherwise the output of `func` is just
262    /// appended to the input record, for as many outputs as it has. This has the
263    /// unpleasant default behavior of repeating potentially large records that
264    /// are being unpacked, producing quadratic output in those cases. Instead,
265    /// in these cases use a `mfp` member that projects away these large fields.
266    FlatMap {
267        /// The particular arrangement of the input we expect to use,
268        /// if any
269        input_key: Option<Vec<MirScalarExpr>>,
270        /// The input collection.
271        input: Box<Plan<T>>,
272        /// Expressions that for each row prepare the arguments to `func`.
273        exprs: Vec<MirScalarExpr>,
274        /// The variable-record emitting function.
275        func: TableFunc,
276        /// Linear operator to apply to each record produced by `func`.
277        mfp_after: MapFilterProject,
278    },
279    /// A multiway relational equijoin, with fused map, filter, and projection.
280    ///
281    /// This stage performs a multiway join among `inputs`, using the equality
282    /// constraints expressed in `plan`. The plan also describes the implementation
283    /// strategy we will use, and any pushed down per-record work.
284    Join {
285        /// An ordered list of inputs that will be joined.
286        inputs: Vec<Plan<T>>,
287        /// Detailed information about the implementation of the join.
288        ///
289        /// This includes information about the implementation strategy, but also
290        /// any map, filter, project work that we might follow the join with, but
291        /// potentially pushed down into the implementation of the join.
292        plan: JoinPlan,
293    },
294    /// Aggregation by key.
295    Reduce {
296        /// The particular arrangement of the input we expect to use,
297        /// if any
298        input_key: Option<Vec<MirScalarExpr>>,
299        /// The input collection.
300        input: Box<Plan<T>>,
301        /// A plan for changing input records into key, value pairs.
302        key_val_plan: KeyValPlan,
303        /// A plan for performing the reduce.
304        ///
305        /// The implementation of reduction has several different strategies based
306        /// on the properties of the reduction, and the input itself. Please check
307        /// out the documentation for this type for more detail.
308        plan: ReducePlan,
309        /// An MFP that must be applied to results. The projection part of this
310        /// MFP must preserve the key for the reduction; otherwise, the results
311        /// become undefined. Additionally, the MFP must be free from temporal
312        /// predicates so that it can be readily evaluated.
313        /// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
314        mfp_after: MapFilterProject,
315    },
316    /// Key-based "Top K" operator, retaining the first K records in each group.
317    TopK {
318        /// The input collection.
319        input: Box<Plan<T>>,
320        /// A plan for performing the Top-K.
321        ///
322        /// The implementation of reduction has several different strategies based
323        /// on the properties of the reduction, and the input itself. Please check
324        /// out the documentation for this type for more detail.
325        top_k_plan: TopKPlan,
326    },
327    /// Inverts the sign of each update.
328    Negate {
329        /// The input collection.
330        input: Box<Plan<T>>,
331    },
332    /// Filters records that accumulate negatively.
333    ///
334    /// Although the operator suppresses updates, it is a stateful operator taking
335    /// resources proportional to the number of records with non-zero accumulation.
336    Threshold {
337        /// The input collection.
338        input: Box<Plan<T>>,
339        /// A plan for performing the threshold.
340        ///
341        /// The implementation of reduction has several different strategies based
342        /// on the properties of the reduction, and the input itself. Please check
343        /// out the documentation for this type for more detail.
344        threshold_plan: ThresholdPlan,
345    },
346    /// Adds the contents of the input collections.
347    ///
348    /// Importantly, this is *multiset* union, so the multiplicities of records will
349    /// add. This is in contrast to *set* union, where the multiplicities would be
350    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
351    /// implementing the "distinct" operator.
352    Union {
353        /// The input collections
354        inputs: Vec<Plan<T>>,
355        /// Whether to consolidate the output, e.g., cancel negated records.
356        consolidate_output: bool,
357    },
358    /// The `input` plan, but with additional arrangements.
359    ///
360    /// This operator does not change the logical contents of `input`, but ensures
361    /// that certain arrangements are available in the results. This operator can
362    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
363    /// or to cap a `Plan` so that indexes can be exported.
364    ArrangeBy {
365        /// The key that must be used to access the input.
366        input_key: Option<Vec<MirScalarExpr>>,
367        /// The input collection.
368        input: Box<Plan<T>>,
369        /// The MFP that must be applied to the input.
370        input_mfp: MapFilterProject,
371        /// A list of arrangement keys, and possibly a raw collection,
372        /// that will be added to those of the input. Does not include
373        /// any other existing arrangements.
374        forms: AvailableCollections,
375    },
376}
377
378impl<T> PlanNode<T> {
379    /// Iterates through references to child expressions.
380    pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
381        let mut first = None;
382        let mut second = None;
383        let mut rest = None;
384        let mut last = None;
385
386        use PlanNode::*;
387        match self {
388            Constant { .. } | Get { .. } => (),
389            Let { value, body, .. } => {
390                first = Some(&**value);
391                second = Some(&**body);
392            }
393            LetRec { values, body, .. } => {
394                rest = Some(values);
395                last = Some(&**body);
396            }
397            Mfp { input, .. }
398            | FlatMap { input, .. }
399            | Reduce { input, .. }
400            | TopK { input, .. }
401            | Negate { input, .. }
402            | Threshold { input, .. }
403            | ArrangeBy { input, .. } => {
404                first = Some(&**input);
405            }
406            Join { inputs, .. } | Union { inputs, .. } => {
407                rest = Some(inputs);
408            }
409        }
410
411        first
412            .into_iter()
413            .chain(second)
414            .chain(rest.into_iter().flatten())
415            .chain(last)
416    }
417
418    /// Iterates through mutable references to child expressions.
419    pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
420        let mut first = None;
421        let mut second = None;
422        let mut rest = None;
423        let mut last = None;
424
425        use PlanNode::*;
426        match self {
427            Constant { .. } | Get { .. } => (),
428            Let { value, body, .. } => {
429                first = Some(&mut **value);
430                second = Some(&mut **body);
431            }
432            LetRec { values, body, .. } => {
433                rest = Some(values);
434                last = Some(&mut **body);
435            }
436            Mfp { input, .. }
437            | FlatMap { input, .. }
438            | Reduce { input, .. }
439            | TopK { input, .. }
440            | Negate { input, .. }
441            | Threshold { input, .. }
442            | ArrangeBy { input, .. } => {
443                first = Some(&mut **input);
444            }
445            Join { inputs, .. } | Union { inputs, .. } => {
446                rest = Some(inputs);
447            }
448        }
449
450        first
451            .into_iter()
452            .chain(second)
453            .chain(rest.into_iter().flatten())
454            .chain(last)
455    }
456}
457
458impl<T> PlanNode<T> {
459    /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
460    pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
461        Plan { lir_id, node: self }
462    }
463}
464
465impl Plan {
466    /// Pretty-print this [Plan] to a string.
467    pub fn pretty(&self) -> String {
468        let config = ExplainConfig::default();
469        self.explain(&config, None)
470    }
471
472    /// Pretty-print this [Plan] to a string using a custom
473    /// [ExplainConfig] and an optionally provided [ExprHumanizer].
474    pub fn explain(&self, config: &ExplainConfig, humanizer: Option<&dyn ExprHumanizer>) -> String {
475        text_string_at(self, || PlanRenderingContext {
476            indent: Indent::default(),
477            humanizer: humanizer.unwrap_or(&DummyHumanizer),
478            annotations: BTreeMap::default(),
479            config,
480        })
481    }
482}
483
484impl Arbitrary for LirId {
485    type Strategy = BoxedStrategy<LirId>;
486    type Parameters = ();
487
488    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
489        let lir_id = u64::arbitrary();
490        lir_id.prop_map(LirId).boxed()
491    }
492}
493
494impl Arbitrary for Plan {
495    type Strategy = BoxedStrategy<Plan>;
496    type Parameters = ();
497
498    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
499        let row_diff = prop::collection::vec(
500            (
501                Row::arbitrary_with((1..5).into()),
502                mz_repr::Timestamp::arbitrary(),
503                Diff::arbitrary(),
504            ),
505            0..2,
506        );
507        let rows = prop::result::maybe_ok(row_diff, EvalError::arbitrary());
508        let constant = (rows, any::<LirId>()).prop_map(|(rows, lir_id)| {
509            PlanNode::<mz_repr::Timestamp>::Constant { rows }.as_plan(lir_id)
510        });
511
512        let get = (
513            any::<GlobalId>(),
514            any::<AvailableCollections>(),
515            any::<GetPlan>(),
516            any::<LirId>(),
517        )
518            .prop_map(|(id, keys, plan, lir_id)| {
519                PlanNode::<mz_repr::Timestamp>::Get {
520                    id: Id::Global(id),
521                    keys,
522                    plan,
523                }
524                .as_plan(lir_id)
525            });
526
527        let leaf = prop::strategy::Union::new(vec![constant.boxed(), get.boxed()]).boxed();
528
529        leaf.prop_recursive(2, 4, 5, |inner| {
530            prop::strategy::Union::new(vec![
531                //Plan::Let
532                (
533                    any::<LocalId>(),
534                    inner.clone(),
535                    inner.clone(),
536                    any::<LirId>(),
537                )
538                    .prop_map(|(id, value, body, lir_id)| {
539                        PlanNode::<mz_repr::Timestamp>::Let {
540                            id,
541                            value: value.into(),
542                            body: body.into(),
543                        }
544                        .as_plan(lir_id)
545                    })
546                    .boxed(),
547                //Plan::Mfp
548                (
549                    inner.clone(),
550                    any::<MapFilterProject>(),
551                    any::<Option<(Vec<MirScalarExpr>, Option<Row>)>>(),
552                    any::<LirId>(),
553                )
554                    .prop_map(|(input, mfp, input_key_val, lir_id)| {
555                        PlanNode::Mfp {
556                            input: input.into(),
557                            mfp,
558                            input_key_val,
559                        }
560                        .as_plan(lir_id)
561                    })
562                    .boxed(),
563                //Plan::FlatMap
564                (
565                    inner.clone(),
566                    any::<TableFunc>(),
567                    any::<Vec<MirScalarExpr>>(),
568                    any::<MapFilterProject>(),
569                    any::<Option<Vec<MirScalarExpr>>>(),
570                    any::<LirId>(),
571                )
572                    .prop_map(|(input, func, exprs, mfp, input_key, lir_id)| {
573                        PlanNode::FlatMap {
574                            input_key,
575                            input: input.into(),
576                            exprs,
577                            func,
578                            mfp_after: mfp,
579                        }
580                        .as_plan(lir_id)
581                    })
582                    .boxed(),
583                //Plan::Join
584                (
585                    prop::collection::vec(inner.clone(), 0..2),
586                    any::<JoinPlan>(),
587                    any::<LirId>(),
588                )
589                    .prop_map(|(inputs, plan, lir_id)| {
590                        PlanNode::Join { inputs, plan }.as_plan(lir_id)
591                    })
592                    .boxed(),
593                //Plan::Reduce
594                (
595                    inner.clone(),
596                    any::<KeyValPlan>(),
597                    any::<ReducePlan>(),
598                    any::<Option<Vec<MirScalarExpr>>>(),
599                    any::<MapFilterProject>(),
600                    any::<LirId>(),
601                )
602                    .prop_map(
603                        |(input, key_val_plan, plan, input_key, mfp_after, lir_id)| {
604                            PlanNode::Reduce {
605                                input_key,
606                                input: input.into(),
607                                key_val_plan,
608                                plan,
609                                mfp_after,
610                            }
611                            .as_plan(lir_id)
612                        },
613                    )
614                    .boxed(),
615                //Plan::TopK
616                (inner.clone(), any::<TopKPlan>(), any::<LirId>())
617                    .prop_map(|(input, top_k_plan, lir_id)| {
618                        PlanNode::TopK {
619                            input: input.into(),
620                            top_k_plan,
621                        }
622                        .as_plan(lir_id)
623                    })
624                    .boxed(),
625                //Plan::Negate
626                (inner.clone(), any::<LirId>())
627                    .prop_map(|(x, lir_id)| PlanNode::Negate { input: x.into() }.as_plan(lir_id))
628                    .boxed(),
629                //Plan::Threshold
630                (inner.clone(), any::<ThresholdPlan>(), any::<LirId>())
631                    .prop_map(|(input, threshold_plan, lir_id)| {
632                        PlanNode::Threshold {
633                            input: input.into(),
634                            threshold_plan,
635                        }
636                        .as_plan(lir_id)
637                    })
638                    .boxed(),
639                // Plan::Union
640                (
641                    prop::collection::vec(inner.clone(), 0..2),
642                    any::<bool>(),
643                    any::<LirId>(),
644                )
645                    .prop_map(|(x, b, lir_id)| {
646                        PlanNode::Union {
647                            inputs: x,
648                            consolidate_output: b,
649                        }
650                        .as_plan(lir_id)
651                    })
652                    .boxed(),
653                //Plan::ArrangeBy
654                (
655                    inner,
656                    any::<AvailableCollections>(),
657                    any::<Option<Vec<MirScalarExpr>>>(),
658                    any::<MapFilterProject>(),
659                    any::<LirId>(),
660                )
661                    .prop_map(|(input, forms, input_key, input_mfp, lir_id)| {
662                        PlanNode::ArrangeBy {
663                            input_key,
664                            input: input.into(),
665                            input_mfp,
666                            forms,
667                        }
668                        .as_plan(lir_id)
669                    })
670                    .boxed(),
671            ])
672        })
673        .boxed()
674    }
675}
676
677/// How a `Get` stage will be rendered.
678#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
679pub enum GetPlan {
680    /// Simply pass input arrangements on to the next stage.
681    PassArrangements,
682    /// Using the supplied key, optionally seek the row, and apply the MFP.
683    Arrangement(
684        #[proptest(strategy = "prop::collection::vec(MirScalarExpr::arbitrary(), 0..3)")]
685        Vec<MirScalarExpr>,
686        Option<Row>,
687        MapFilterProject,
688    ),
689    /// Scan the input collection (unarranged) and apply the MFP.
690    Collection(MapFilterProject),
691}
692
693impl RustType<ProtoGetPlan> for GetPlan {
694    fn into_proto(&self) -> ProtoGetPlan {
695        use proto_get_plan::Kind::*;
696
697        ProtoGetPlan {
698            kind: Some(match self {
699                GetPlan::PassArrangements => PassArrangements(()),
700                GetPlan::Arrangement(k, s, m) => {
701                    Arrangement(proto_get_plan::ProtoGetPlanArrangement {
702                        key: k.into_proto(),
703                        seek: s.into_proto(),
704                        mfp: Some(m.into_proto()),
705                    })
706                }
707                GetPlan::Collection(mfp) => Collection(mfp.into_proto()),
708            }),
709        }
710    }
711
712    fn from_proto(proto: ProtoGetPlan) -> Result<Self, TryFromProtoError> {
713        use proto_get_plan::Kind::*;
714        use proto_get_plan::ProtoGetPlanArrangement;
715        match proto.kind {
716            Some(PassArrangements(())) => Ok(GetPlan::PassArrangements),
717            Some(Arrangement(ProtoGetPlanArrangement { key, seek, mfp })) => {
718                Ok(GetPlan::Arrangement(
719                    key.into_rust()?,
720                    seek.into_rust()?,
721                    mfp.into_rust_if_some("ProtoGetPlanArrangement::mfp")?,
722                ))
723            }
724            Some(Collection(mfp)) => Ok(GetPlan::Collection(mfp.into_rust()?)),
725            None => Err(TryFromProtoError::missing_field("ProtoGetPlan::kind")),
726        }
727    }
728}
729
730impl RustType<ProtoLetRecLimit> for LetRecLimit {
731    fn into_proto(&self) -> ProtoLetRecLimit {
732        ProtoLetRecLimit {
733            max_iters: self.max_iters.get(),
734            return_at_limit: self.return_at_limit,
735        }
736    }
737
738    fn from_proto(proto: ProtoLetRecLimit) -> Result<Self, TryFromProtoError> {
739        Ok(LetRecLimit {
740            max_iters: NonZeroU64::new(proto.max_iters).expect("max_iters > 0"),
741            return_at_limit: proto.return_at_limit,
742        })
743    }
744}
745
746impl<T: timely::progress::Timestamp> Plan<T> {
747    /// Convert the dataflow description into one that uses render plans.
748    #[mz_ore::instrument(
749        target = "optimizer",
750        level = "debug",
751        fields(path.segment = "finalize_dataflow")
752    )]
753    pub fn finalize_dataflow(
754        desc: DataflowDescription<OptimizedMirRelationExpr>,
755        features: &OptimizerFeatures,
756    ) -> Result<DataflowDescription<Self>, String> {
757        // First, we lower the dataflow description from MIR to LIR.
758        let mut dataflow = Self::lower_dataflow(desc, features)?;
759
760        // Subsequently, we perform plan refinements for the dataflow.
761        Self::refine_source_mfps(&mut dataflow);
762
763        if features.enable_consolidate_after_union_negate {
764            Self::refine_union_negate_consolidation(&mut dataflow);
765        }
766
767        if dataflow.is_single_time() {
768            Self::refine_single_time_operator_selection(&mut dataflow);
769
770            // The relaxation of the `must_consolidate` flag performs an LIR-based
771            // analysis and transform under checked recursion. By a similar argument
772            // made in `from_mir`, we do not expect the recursion limit to be hit.
773            // However, if that happens, we propagate an error to the caller.
774            // To apply the transform, we first obtain monotonic source and index
775            // global IDs and add them to a `TransformConfig` instance.
776            let monotonic_ids = dataflow
777                .source_imports
778                .iter()
779                .filter_map(|(id, (_, monotonic, _upper))| if *monotonic { Some(id) } else { None })
780                .chain(
781                    dataflow
782                        .index_imports
783                        .iter()
784                        .filter_map(|(id, index_import)| {
785                            if index_import.monotonic {
786                                Some(id)
787                            } else {
788                                None
789                            }
790                        }),
791                )
792                .cloned()
793                .collect::<BTreeSet<_>>();
794
795            let config = TransformConfig { monotonic_ids };
796            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
797        }
798
799        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
800
801        mz_repr::explain::trace_plan(&dataflow);
802
803        Ok(dataflow)
804    }
805
806    /// Lowers the dataflow description from MIR to LIR. To this end, the
807    /// method collects all available arrangements and based on this information
808    /// creates plans for every object to be built for the dataflow.
809    #[mz_ore::instrument(
810        target = "optimizer",
811        level = "debug",
812        fields(path.segment ="mir_to_lir")
813    )]
814    fn lower_dataflow(
815        desc: DataflowDescription<OptimizedMirRelationExpr>,
816        features: &OptimizerFeatures,
817    ) -> Result<DataflowDescription<Self>, String> {
818        let context = lowering::Context::new(desc.debug_name.clone(), features);
819        let dataflow = context.lower(desc)?;
820
821        mz_repr::explain::trace_plan(&dataflow);
822
823        Ok(dataflow)
824    }
825
826    /// Refines the source instance descriptions for sources imported by `dataflow` to
827    /// push down common MFP expressions.
828    #[mz_ore::instrument(
829        target = "optimizer",
830        level = "debug",
831        fields(path.segment = "refine_source_mfps")
832    )]
833    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
834        // Extract MFPs from Get operators for sources, and extract what we can for the source.
835        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
836        for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
837            let mut identity_present = false;
838            let mut mfps = Vec::new();
839            for build_desc in dataflow.objects_to_build.iter_mut() {
840                let mut todo = vec![&mut build_desc.plan];
841                while let Some(expression) = todo.pop() {
842                    let node = &mut expression.node;
843                    if let PlanNode::Get { id, plan, .. } = node {
844                        if *id == mz_expr::Id::Global(*source_id) {
845                            match plan {
846                                GetPlan::Collection(mfp) => mfps.push(mfp),
847                                GetPlan::PassArrangements => {
848                                    identity_present = true;
849                                }
850                                GetPlan::Arrangement(..) => {
851                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
852                                }
853                            }
854                        }
855                    } else {
856                        todo.extend(node.children_mut());
857                    }
858                }
859            }
860
861            // Direct exports of sources are possible, and prevent pushdown.
862            identity_present |= dataflow
863                .index_exports
864                .values()
865                .any(|(x, _)| x.on_id == *source_id);
866            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
867
868            if !identity_present && !mfps.is_empty() {
869                // Extract a common prefix `MapFilterProject` from `mfps`.
870                let common = MapFilterProject::extract_common(&mut mfps[..]);
871                // Apply common expressions to the source's `MapFilterProject`.
872                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
873                    MapFilterProject::compose(mfp, common)
874                } else {
875                    common
876                };
877                mfp.optimize();
878                source.arguments.operators = Some(mfp);
879            }
880        }
881        mz_repr::explain::trace_plan(dataflow);
882    }
883
884    /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
885    #[mz_ore::instrument(
886        target = "optimizer",
887        level = "debug",
888        fields(path.segment = "refine_union_negate_consolidation")
889    )]
890    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
891        for build_desc in dataflow.objects_to_build.iter_mut() {
892            let mut todo = vec![&mut build_desc.plan];
893            while let Some(expression) = todo.pop() {
894                let node = &mut expression.node;
895                match node {
896                    PlanNode::Union {
897                        inputs,
898                        consolidate_output,
899                        ..
900                    } => {
901                        if inputs
902                            .iter()
903                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
904                        {
905                            *consolidate_output = true;
906                        }
907                    }
908                    _ => {}
909                }
910                todo.extend(node.children_mut());
911            }
912        }
913        mz_repr::explain::trace_plan(dataflow);
914    }
915
916    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
917    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
918    /// one-shot SELECT query.
919    #[mz_ore::instrument(
920        target = "optimizer",
921        level = "debug",
922        fields(path.segment = "refine_single_time_operator_selection")
923    )]
924    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
925        // We should only reach here if we have a one-shot SELECT query, i.e.,
926        // a single-time dataflow.
927        assert!(dataflow.is_single_time());
928
929        // Upgrade single-time plans to monotonic.
930        for build_desc in dataflow.objects_to_build.iter_mut() {
931            let mut todo = vec![&mut build_desc.plan];
932            while let Some(expression) = todo.pop() {
933                let node = &mut expression.node;
934                match node {
935                    PlanNode::Reduce { plan, .. } => {
936                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
937                        match plan {
938                            ReducePlan::Collation(collation) => {
939                                collation.as_monotonic(true);
940                            }
941                            ReducePlan::Hierarchical(hierarchical) => {
942                                hierarchical.as_monotonic(true);
943                            }
944                            _ => {
945                                // Nothing to do for other plans, and doing nothing is safe for future variants.
946                            }
947                        }
948                        todo.extend(node.children_mut());
949                    }
950                    PlanNode::TopK { top_k_plan, .. } => {
951                        top_k_plan.as_monotonic(true);
952                        todo.extend(node.children_mut());
953                    }
954                    PlanNode::LetRec { body, .. } => {
955                        // Only the non-recursive `body` is restricted to a single time.
956                        todo.push(body);
957                    }
958                    _ => {
959                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
960                        todo.extend(node.children_mut());
961                    }
962                }
963            }
964        }
965        mz_repr::explain::trace_plan(dataflow);
966    }
967
968    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
969    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
970    /// whenever the input is deemed to be physically monotonic.
971    #[mz_ore::instrument(
972        target = "optimizer",
973        level = "debug",
974        fields(path.segment = "refine_single_time_consolidation")
975    )]
976    fn refine_single_time_consolidation(
977        dataflow: &mut DataflowDescription<Self>,
978        config: &TransformConfig,
979    ) -> Result<(), String> {
980        // We should only reach here if we have a one-shot SELECT query, i.e.,
981        // a single-time dataflow.
982        assert!(dataflow.is_single_time());
983
984        let transform = transform::RelaxMustConsolidate::<T>::new();
985        for build_desc in dataflow.objects_to_build.iter_mut() {
986            transform
987                .transform(config, &mut build_desc.plan)
988                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
989        }
990        mz_repr::explain::trace_plan(dataflow);
991        Ok(())
992    }
993}
994
995impl<T> CollectionPlan for PlanNode<T> {
996    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
997        match self {
998            PlanNode::Constant { rows: _ } => (),
999            PlanNode::Get {
1000                id,
1001                keys: _,
1002                plan: _,
1003            } => match id {
1004                Id::Global(id) => {
1005                    out.insert(*id);
1006                }
1007                Id::Local(_) => (),
1008            },
1009            PlanNode::Let { id: _, value, body } => {
1010                value.depends_on_into(out);
1011                body.depends_on_into(out);
1012            }
1013            PlanNode::LetRec {
1014                ids: _,
1015                values,
1016                limits: _,
1017                body,
1018            } => {
1019                for value in values.iter() {
1020                    value.depends_on_into(out);
1021                }
1022                body.depends_on_into(out);
1023            }
1024            PlanNode::Join { inputs, plan: _ }
1025            | PlanNode::Union {
1026                inputs,
1027                consolidate_output: _,
1028            } => {
1029                for input in inputs {
1030                    input.depends_on_into(out);
1031                }
1032            }
1033            PlanNode::Mfp {
1034                input,
1035                mfp: _,
1036                input_key_val: _,
1037            }
1038            | PlanNode::FlatMap {
1039                input_key: _,
1040                input,
1041                exprs: _,
1042                func: _,
1043                mfp_after: _,
1044            }
1045            | PlanNode::ArrangeBy {
1046                input_key: _,
1047                input,
1048                input_mfp: _,
1049                forms: _,
1050            }
1051            | PlanNode::Reduce {
1052                input_key: _,
1053                input,
1054                key_val_plan: _,
1055                plan: _,
1056                mfp_after: _,
1057            }
1058            | PlanNode::TopK {
1059                input,
1060                top_k_plan: _,
1061            }
1062            | PlanNode::Negate { input }
1063            | PlanNode::Threshold {
1064                input,
1065                threshold_plan: _,
1066            } => {
1067                input.depends_on_into(out);
1068            }
1069        }
1070    }
1071}
1072
1073impl<T> CollectionPlan for Plan<T> {
1074    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
1075        self.node.depends_on_into(out);
1076    }
1077}
1078
/// Computes the bucket sizes used to decompose an operator hierarchically, given the
/// number of rows that are expected to share a group key.
///
/// The result holds the powers of 16 that are strictly smaller than the expected group
/// size, largest first. Without an expected group size, 4 billion rows are assumed.
/// An expected group size of at most 16 yields no buckets at all.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in
    // view mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;
    // Plan for 4B records in the expected case if the user didn't specify a group size.
    const DEFAULT_GROUP_SIZE: u64 = 4_000_000_000;

    let limit = expected_group_size.unwrap_or(DEFAULT_GROUP_SIZE);

    // Growing by powers of the fan-in balances how many inputs each layer receives from
    // the preceding one against the total number of layers. The multiplication
    // saturates, so the sequence cannot wrap around; a saturated value is never below
    // `limit`, which ends the sequence.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |size| Some(size.saturating_mul(FAN_IN)))
            .take_while(|size| *size < limit)
            .collect();

    // Callers expect the largest bucket first.
    buckets.reverse();
    buckets
}
1100
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    // Both properties run with the same configuration, so they share one `proptest!`
    // block.
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(10))]

        // An `AvailableCollections` value must survive a protobuf round trip unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn available_collections_protobuf_roundtrip(expect in any::<AvailableCollections>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoAvailableCollections>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        // A `GetPlan` value must survive a protobuf round trip unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn get_plan_protobuf_roundtrip(expect in any::<GetPlan>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoGetPlan>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}