mz_compute_types/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An explicit representation of a rendering plan for provided dataflows.
11
12#![warn(missing_debug_implementations)]
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU64;
16
17use columnar::Columnar;
18use mz_expr::{
19    CollectionPlan, EvalError, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr,
20    OptimizedMirRelationExpr, TableFunc,
21};
22use mz_ore::soft_assert_eq_no_log;
23use mz_ore::str::Indent;
24use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
25use mz_repr::explain::text::text_string_at;
26use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext};
27use mz_repr::optimize::OptimizerFeatures;
28use mz_repr::{ColumnType, Diff, GlobalId, Row};
29use proptest::arbitrary::Arbitrary;
30use proptest::prelude::*;
31use proptest::strategy::Strategy;
32use proptest_derive::Arbitrary;
33use serde::{Deserialize, Serialize};
34
35use crate::dataflows::DataflowDescription;
36use crate::plan::join::JoinPlan;
37use crate::plan::proto_available_collections::ProtoColumnTypes;
38use crate::plan::reduce::{KeyValPlan, ReducePlan};
39use crate::plan::threshold::ThresholdPlan;
40use crate::plan::top_k::TopKPlan;
41use crate::plan::transform::{Transform, TransformConfig};
42
43mod lowering;
44
45pub mod interpret;
46pub mod join;
47pub mod reduce;
48pub mod render_plan;
49pub mod threshold;
50pub mod top_k;
51pub mod transform;
52
53include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.rs"));
54
/// The forms in which an operator's output is available.
///
/// These forms may include "raw", meaning as a streamed collection, but also any
/// number of "arranged" representations.
///
/// Each arranged representation is described by a `KeyValRowMapping`, or rather
/// at the moment by its three fields in a triple. These fields explain how to form
/// a "key" by applying some expressions to each row, how to select "values" from
/// columns not explicitly captured by the key, and how to return to the original
/// row from the concatenation of key and value. Further explanation is available
/// in the documentation for `KeyValRowMapping`.
#[derive(
    Arbitrary, Clone, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize,
)]
pub struct AvailableCollections {
    /// Whether the collection exists in unarranged form.
    pub raw: bool,
    /// The list of available arrangements, presented as a `KeyValRowMapping`,
    /// but here represented by a triple `(to_key, to_val, to_row)` instead.
    /// The documentation for `KeyValRowMapping` explains these fields better.
    // Property tests generate this field with `any_arranged_thin` rather than the
    // default strategy for the tuple type.
    #[proptest(strategy = "prop::collection::vec(any_arranged_thin(), 0..3)")]
    pub arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
    /// The types of the columns in the raw form of the collection, if known. We
    /// only capture types when necessary to support arrangement specialization,
    /// so this is only done for specific LIR operators during lowering.
    pub types: Option<Vec<ColumnType>>,
}
82
83/// A strategy that produces arrangements that are thinner than the default. That is
84/// the number of direct children is limited to a maximum of 3.
85pub(crate) fn any_arranged_thin()
86-> impl Strategy<Value = (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
87    (
88        prop::collection::vec(MirScalarExpr::arbitrary(), 0..3),
89        Vec::<usize>::arbitrary(),
90        Vec::<usize>::arbitrary(),
91    )
92}
93
94impl RustType<ProtoColumnTypes> for Vec<ColumnType> {
95    fn into_proto(&self) -> ProtoColumnTypes {
96        ProtoColumnTypes {
97            types: self.into_proto(),
98        }
99    }
100
101    fn from_proto(proto: ProtoColumnTypes) -> Result<Self, TryFromProtoError> {
102        proto.types.into_rust()
103    }
104}
105
106impl RustType<ProtoAvailableCollections> for AvailableCollections {
107    fn into_proto(&self) -> ProtoAvailableCollections {
108        ProtoAvailableCollections {
109            raw: self.raw,
110            arranged: self.arranged.into_proto(),
111            types: self.types.into_proto(),
112        }
113    }
114
115    fn from_proto(x: ProtoAvailableCollections) -> Result<Self, TryFromProtoError> {
116        Ok({
117            Self {
118                raw: x.raw,
119                arranged: x.arranged.into_rust()?,
120                types: x.types.into_rust()?,
121            }
122        })
123    }
124}
125
126impl AvailableCollections {
127    /// Represent a collection that has no arrangements.
128    pub fn new_raw() -> Self {
129        Self {
130            raw: true,
131            arranged: Vec::new(),
132            types: None,
133        }
134    }
135
136    /// Represent a collection that is arranged in the
137    /// specified ways, with optionally given types describing
138    /// the rows that would be in the raw form of the collection.
139    pub fn new_arranged(
140        arranged: Vec<(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)>,
141        types: Option<Vec<ColumnType>>,
142    ) -> Self {
143        assert!(
144            !arranged.is_empty(),
145            "Invariant violated: at least one collection must exist"
146        );
147        Self {
148            raw: false,
149            arranged,
150            types,
151        }
152    }
153
154    /// Get some arrangement, if one exists.
155    pub fn arbitrary_arrangement(&self) -> Option<&(Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> {
156        assert!(
157            self.raw || !self.arranged.is_empty(),
158            "Invariant violated: at least one collection must exist"
159        );
160        self.arranged.get(0)
161    }
162}
163
/// An identifier for an LIR node.
///
/// A thin wrapper around a `u64`; the wrapped field is not public.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, Columnar)]
pub struct LirId(u64);
167
168impl LirId {
169    fn as_u64(&self) -> u64 {
170        self.0
171    }
172}
173
174impl From<LirId> for u64 {
175    fn from(value: LirId) -> Self {
176        value.as_u64()
177    }
178}
179
180impl std::fmt::Display for LirId {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        write!(f, "{}", self.0)
183    }
184}
185
186impl RustType<u64> for LirId {
187    fn into_proto(&self) -> u64 {
188        self.0
189    }
190
191    fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
192        Ok(Self(proto))
193    }
194}
195
/// A rendering plan with as much conditional logic as possible removed.
///
/// A `Plan` pairs an operator (`node`) with the identifier of that operator
/// (`lir_id`). The timestamp type `T` defaults to `mz_repr::Timestamp`.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct Plan<T = mz_repr::Timestamp> {
    /// A dataflow-local identifier.
    pub lir_id: LirId,
    /// The underlying operator.
    pub node: PlanNode<T>,
}
204
/// The actual AST node of the `Plan`.
///
/// Child plans are stored inline (`Box<Plan<T>>` or `Vec<Plan<T>>`), so a
/// `PlanNode` together with its descendants forms a tree.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub enum PlanNode<T = mz_repr::Timestamp> {
    /// A collection with pre-determined contents.
    Constant {
        /// Explicit update triples for the collection, or the error to
        /// produce instead.
        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
    },
    /// A reference to a bound collection.
    ///
    /// This is commonly either an external reference to an existing source or
    /// maintained arrangement, or an internal reference to a `Let` identifier.
    Get {
        /// A global or local identifier naming the collection.
        id: Id,
        /// Arrangements that will be available.
        ///
        /// The collection will also be loaded if available, which it will
        /// not be for imported data, but which it may be for locally defined
        /// data.
        // TODO: Be more explicit about whether a collection is available,
        // although one can always produce it from an arrangement, and it
        // seems generally advantageous to do that instead (to avoid cloning
        // rows, by using `mfp` first on borrowed data).
        keys: AvailableCollections,
        /// The actions to take when introducing the collection.
        plan: GetPlan,
    },
    /// Binds `value` to `id`, and then results in `body` with that binding.
    ///
    /// This stage has the effect of sharing `value` across multiple possible
    /// uses in `body`, and is the only mechanism we have for sharing collection
    /// information across parts of a dataflow.
    ///
    /// The binding is not available outside of `body`.
    Let {
        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
        id: LocalId,
        /// The collection that should be bound to `id`.
        value: Box<Plan<T>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`.
    ///
    /// All bindings are available to all bindings, and to `body`.
    /// The contents of each binding are initially empty, and then updated through a sequence
    /// of iterations in which each binding is updated in sequence, from the most recent values
    /// of all bindings.
    LetRec {
        /// The local identifiers to be used, available to `body` as `Id::Local(id)`.
        ids: Vec<LocalId>,
        /// The collections that should be bound to the corresponding `ids`.
        values: Vec<Plan<T>>,
        /// Optional limits on the number of iterations. See further info on the
        /// MIR `LetRec`.
        limits: Vec<Option<LetRecLimit>>,
        /// The collection that results, which is allowed to contain `Get` stages
        /// that reference `Id::Local(id)`.
        body: Box<Plan<T>>,
    },
    /// Map, Filter, and Project operators.
    ///
    /// This stage contains work that we would ideally like to fuse to other plan
    /// stages, but for practical reasons cannot. For example: threshold, topk,
    /// and sometimes reduce stages are not able to absorb this operator.
    Mfp {
        /// The input collection.
        input: Box<Plan<T>>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
        /// Whether the input is from an arrangement, and if so,
        /// whether we can seek to a specific value therein.
        input_key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
    },
    /// A variable number of output records for each input record.
    ///
    /// This stage is a bit of a catch-all for logic that does not easily fit in
    /// map stages. This includes table valued functions, but also functions of
    /// multiple arguments, and functions that modify the sign of updates.
    ///
    /// This stage allows a `MapFilterProject` operator to be fused to its output,
    /// and this can be very important as otherwise the output of `func` is just
    /// appended to the input record, for as many outputs as it has. This has the
    /// unpleasant default behavior of repeating potentially large records that
    /// are being unpacked, producing quadratic output in those cases. Instead,
    /// in these cases use a `mfp` member that projects away these large fields.
    FlatMap {
        /// The input collection.
        input: Box<Plan<T>>,
        /// The variable-record emitting function.
        func: TableFunc,
        /// Expressions that for each row prepare the arguments to `func`.
        exprs: Vec<MirScalarExpr>,
        /// Linear operator to apply to each record produced by `func`.
        mfp_after: MapFilterProject,
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<Plan<T>>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Aggregation by key.
    Reduce {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself. Please check
        /// out the documentation for this type for more detail.
        plan: ReducePlan,
        /// The particular arrangement of the input we expect to use,
        /// if any.
        input_key: Option<Vec<MirScalarExpr>>,
        /// An MFP that must be applied to results. The projection part of this
        /// MFP must preserve the key for the reduction; otherwise, the results
        /// become undefined. Additionally, the MFP must be free from temporal
        /// predicates so that it can be readily evaluated.
        mfp_after: MapFilterProject,
    },
    /// Key-based "Top K" operator, retaining the first K records in each group.
    TopK {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the Top-K.
        ///
        /// Please check out the documentation for `TopKPlan` for more detail
        /// on how the operator is implemented.
        top_k_plan: TopKPlan,
    },
    /// Inverts the sign of each update.
    Negate {
        /// The input collection.
        input: Box<Plan<T>>,
    },
    /// Filters records that accumulate negatively.
    ///
    /// Although the operator suppresses updates, it is a stateful operator taking
    /// resources proportional to the number of records with non-zero accumulation.
    Threshold {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A plan for performing the threshold.
        ///
        /// Please check out the documentation for `ThresholdPlan` for more
        /// detail on how the operator is implemented.
        threshold_plan: ThresholdPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<Plan<T>>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
    /// The `input` plan, but with additional arrangements.
    ///
    /// This operator does not change the logical contents of `input`, but ensures
    /// that certain arrangements are available in the results. This operator can
    /// be important for e.g. the `Join` stage which benefits from multiple arrangements
    /// or to cap a `Plan` so that indexes can be exported.
    ArrangeBy {
        /// The input collection.
        input: Box<Plan<T>>,
        /// A list of arrangement keys, and possibly a raw collection,
        /// that will be added to those of the input.
        ///
        /// If any of these collection forms are already present in the input, they have no effect.
        forms: AvailableCollections,
        /// The key that must be used to access the input.
        input_key: Option<Vec<MirScalarExpr>>,
        /// The MFP that must be applied to the input.
        input_mfp: MapFilterProject,
    },
}
403
404impl<T> PlanNode<T> {
405    /// Iterates through references to child expressions.
406    pub fn children(&self) -> impl Iterator<Item = &Plan<T>> {
407        let mut first = None;
408        let mut second = None;
409        let mut rest = None;
410        let mut last = None;
411
412        use PlanNode::*;
413        match self {
414            Constant { .. } | Get { .. } => (),
415            Let { value, body, .. } => {
416                first = Some(&**value);
417                second = Some(&**body);
418            }
419            LetRec { values, body, .. } => {
420                rest = Some(values);
421                last = Some(&**body);
422            }
423            Mfp { input, .. }
424            | FlatMap { input, .. }
425            | Reduce { input, .. }
426            | TopK { input, .. }
427            | Negate { input, .. }
428            | Threshold { input, .. }
429            | ArrangeBy { input, .. } => {
430                first = Some(&**input);
431            }
432            Join { inputs, .. } | Union { inputs, .. } => {
433                rest = Some(inputs);
434            }
435        }
436
437        first
438            .into_iter()
439            .chain(second)
440            .chain(rest.into_iter().flatten())
441            .chain(last)
442    }
443
444    /// Iterates through mutable references to child expressions.
445    pub fn children_mut(&mut self) -> impl Iterator<Item = &mut Plan<T>> {
446        let mut first = None;
447        let mut second = None;
448        let mut rest = None;
449        let mut last = None;
450
451        use PlanNode::*;
452        match self {
453            Constant { .. } | Get { .. } => (),
454            Let { value, body, .. } => {
455                first = Some(&mut **value);
456                second = Some(&mut **body);
457            }
458            LetRec { values, body, .. } => {
459                rest = Some(values);
460                last = Some(&mut **body);
461            }
462            Mfp { input, .. }
463            | FlatMap { input, .. }
464            | Reduce { input, .. }
465            | TopK { input, .. }
466            | Negate { input, .. }
467            | Threshold { input, .. }
468            | ArrangeBy { input, .. } => {
469                first = Some(&mut **input);
470            }
471            Join { inputs, .. } | Union { inputs, .. } => {
472                rest = Some(inputs);
473            }
474        }
475
476        first
477            .into_iter()
478            .chain(second)
479            .chain(rest.into_iter().flatten())
480            .chain(last)
481    }
482}
483
484impl<T> PlanNode<T> {
485    /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`.
486    pub fn as_plan(self, lir_id: LirId) -> Plan<T> {
487        Plan { lir_id, node: self }
488    }
489}
490
491impl Plan {
492    /// Pretty-print this [Plan] to a string.
493    pub fn pretty(&self) -> String {
494        let config = ExplainConfig::default();
495        self.explain(&config, None)
496    }
497
498    /// Pretty-print this [Plan] to a string using a custom
499    /// [ExplainConfig] and an optionally provided [ExprHumanizer].
500    pub fn explain(&self, config: &ExplainConfig, humanizer: Option<&dyn ExprHumanizer>) -> String {
501        text_string_at(self, || PlanRenderingContext {
502            indent: Indent::default(),
503            humanizer: humanizer.unwrap_or(&DummyHumanizer),
504            annotations: BTreeMap::default(),
505            config,
506        })
507    }
508}
509
510impl Arbitrary for LirId {
511    type Strategy = BoxedStrategy<LirId>;
512    type Parameters = ();
513
514    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
515        let lir_id = u64::arbitrary();
516        lir_id.prop_map(LirId).boxed()
517    }
518}
519
520impl Arbitrary for Plan {
521    type Strategy = BoxedStrategy<Plan>;
522    type Parameters = ();
523
524    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
525        let row_diff = prop::collection::vec(
526            (
527                Row::arbitrary_with((1..5).into()),
528                mz_repr::Timestamp::arbitrary(),
529                Diff::arbitrary(),
530            ),
531            0..2,
532        );
533        let rows = prop::result::maybe_ok(row_diff, EvalError::arbitrary());
534        let constant = (rows, any::<LirId>()).prop_map(|(rows, lir_id)| {
535            PlanNode::<mz_repr::Timestamp>::Constant { rows }.as_plan(lir_id)
536        });
537
538        let get = (
539            any::<GlobalId>(),
540            any::<AvailableCollections>(),
541            any::<GetPlan>(),
542            any::<LirId>(),
543        )
544            .prop_map(|(id, keys, plan, lir_id)| {
545                PlanNode::<mz_repr::Timestamp>::Get {
546                    id: Id::Global(id),
547                    keys,
548                    plan,
549                }
550                .as_plan(lir_id)
551            });
552
553        let leaf = prop::strategy::Union::new(vec![constant.boxed(), get.boxed()]).boxed();
554
555        leaf.prop_recursive(2, 4, 5, |inner| {
556            prop::strategy::Union::new(vec![
557                //Plan::Let
558                (
559                    any::<LocalId>(),
560                    inner.clone(),
561                    inner.clone(),
562                    any::<LirId>(),
563                )
564                    .prop_map(|(id, value, body, lir_id)| {
565                        PlanNode::<mz_repr::Timestamp>::Let {
566                            id,
567                            value: value.into(),
568                            body: body.into(),
569                        }
570                        .as_plan(lir_id)
571                    })
572                    .boxed(),
573                //Plan::Mfp
574                (
575                    inner.clone(),
576                    any::<MapFilterProject>(),
577                    any::<Option<(Vec<MirScalarExpr>, Option<Row>)>>(),
578                    any::<LirId>(),
579                )
580                    .prop_map(|(input, mfp, input_key_val, lir_id)| {
581                        PlanNode::Mfp {
582                            input: input.into(),
583                            mfp,
584                            input_key_val,
585                        }
586                        .as_plan(lir_id)
587                    })
588                    .boxed(),
589                //Plan::FlatMap
590                (
591                    inner.clone(),
592                    any::<TableFunc>(),
593                    any::<Vec<MirScalarExpr>>(),
594                    any::<MapFilterProject>(),
595                    any::<Option<Vec<MirScalarExpr>>>(),
596                    any::<LirId>(),
597                )
598                    .prop_map(|(input, func, exprs, mfp, input_key, lir_id)| {
599                        PlanNode::FlatMap {
600                            input: input.into(),
601                            func,
602                            exprs,
603                            mfp_after: mfp,
604                            input_key,
605                        }
606                        .as_plan(lir_id)
607                    })
608                    .boxed(),
609                //Plan::Join
610                (
611                    prop::collection::vec(inner.clone(), 0..2),
612                    any::<JoinPlan>(),
613                    any::<LirId>(),
614                )
615                    .prop_map(|(inputs, plan, lir_id)| {
616                        PlanNode::Join { inputs, plan }.as_plan(lir_id)
617                    })
618                    .boxed(),
619                //Plan::Reduce
620                (
621                    inner.clone(),
622                    any::<KeyValPlan>(),
623                    any::<ReducePlan>(),
624                    any::<Option<Vec<MirScalarExpr>>>(),
625                    any::<MapFilterProject>(),
626                    any::<LirId>(),
627                )
628                    .prop_map(
629                        |(input, key_val_plan, plan, input_key, mfp_after, lir_id)| {
630                            PlanNode::Reduce {
631                                input: input.into(),
632                                key_val_plan,
633                                plan,
634                                input_key,
635                                mfp_after,
636                            }
637                            .as_plan(lir_id)
638                        },
639                    )
640                    .boxed(),
641                //Plan::TopK
642                (inner.clone(), any::<TopKPlan>(), any::<LirId>())
643                    .prop_map(|(input, top_k_plan, lir_id)| {
644                        PlanNode::TopK {
645                            input: input.into(),
646                            top_k_plan,
647                        }
648                        .as_plan(lir_id)
649                    })
650                    .boxed(),
651                //Plan::Negate
652                (inner.clone(), any::<LirId>())
653                    .prop_map(|(x, lir_id)| PlanNode::Negate { input: x.into() }.as_plan(lir_id))
654                    .boxed(),
655                //Plan::Threshold
656                (inner.clone(), any::<ThresholdPlan>(), any::<LirId>())
657                    .prop_map(|(input, threshold_plan, lir_id)| {
658                        PlanNode::Threshold {
659                            input: input.into(),
660                            threshold_plan,
661                        }
662                        .as_plan(lir_id)
663                    })
664                    .boxed(),
665                // Plan::Union
666                (
667                    prop::collection::vec(inner.clone(), 0..2),
668                    any::<bool>(),
669                    any::<LirId>(),
670                )
671                    .prop_map(|(x, b, lir_id)| {
672                        PlanNode::Union {
673                            inputs: x,
674                            consolidate_output: b,
675                        }
676                        .as_plan(lir_id)
677                    })
678                    .boxed(),
679                //Plan::ArrangeBy
680                (
681                    inner,
682                    any::<AvailableCollections>(),
683                    any::<Option<Vec<MirScalarExpr>>>(),
684                    any::<MapFilterProject>(),
685                    any::<LirId>(),
686                )
687                    .prop_map(|(input, forms, input_key, input_mfp, lir_id)| {
688                        PlanNode::ArrangeBy {
689                            input: input.into(),
690                            forms,
691                            input_key,
692                            input_mfp,
693                        }
694                        .as_plan(lir_id)
695                    })
696                    .boxed(),
697            ])
698        })
699        .boxed()
700    }
701}
702
/// How a `Get` stage will be rendered.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum GetPlan {
    /// Simply pass input arrangements on to the next stage.
    PassArrangements,
    /// Using the supplied key, optionally seek the row, and apply the MFP.
    ///
    /// The fields are, in order: the arrangement key, the optional row to seek
    /// to, and the MFP to apply.
    Arrangement(
        // Property tests draw the key with the length range `0..3`.
        #[proptest(strategy = "prop::collection::vec(MirScalarExpr::arbitrary(), 0..3)")]
        Vec<MirScalarExpr>,
        Option<Row>,
        MapFilterProject,
    ),
    /// Scan the input collection (unarranged) and apply the MFP.
    Collection(MapFilterProject),
}
718
719impl RustType<ProtoGetPlan> for GetPlan {
720    fn into_proto(&self) -> ProtoGetPlan {
721        use proto_get_plan::Kind::*;
722
723        ProtoGetPlan {
724            kind: Some(match self {
725                GetPlan::PassArrangements => PassArrangements(()),
726                GetPlan::Arrangement(k, s, m) => {
727                    Arrangement(proto_get_plan::ProtoGetPlanArrangement {
728                        key: k.into_proto(),
729                        seek: s.into_proto(),
730                        mfp: Some(m.into_proto()),
731                    })
732                }
733                GetPlan::Collection(mfp) => Collection(mfp.into_proto()),
734            }),
735        }
736    }
737
738    fn from_proto(proto: ProtoGetPlan) -> Result<Self, TryFromProtoError> {
739        use proto_get_plan::Kind::*;
740        use proto_get_plan::ProtoGetPlanArrangement;
741        match proto.kind {
742            Some(PassArrangements(())) => Ok(GetPlan::PassArrangements),
743            Some(Arrangement(ProtoGetPlanArrangement { key, seek, mfp })) => {
744                Ok(GetPlan::Arrangement(
745                    key.into_rust()?,
746                    seek.into_rust()?,
747                    mfp.into_rust_if_some("ProtoGetPlanArrangement::mfp")?,
748                ))
749            }
750            Some(Collection(mfp)) => Ok(GetPlan::Collection(mfp.into_rust()?)),
751            None => Err(TryFromProtoError::missing_field("ProtoGetPlan::kind")),
752        }
753    }
754}
755
756impl RustType<ProtoLetRecLimit> for LetRecLimit {
757    fn into_proto(&self) -> ProtoLetRecLimit {
758        ProtoLetRecLimit {
759            max_iters: self.max_iters.get(),
760            return_at_limit: self.return_at_limit,
761        }
762    }
763
764    fn from_proto(proto: ProtoLetRecLimit) -> Result<Self, TryFromProtoError> {
765        Ok(LetRecLimit {
766            max_iters: NonZeroU64::new(proto.max_iters).expect("max_iters > 0"),
767            return_at_limit: proto.return_at_limit,
768        })
769    }
770}
771
772impl<T: timely::progress::Timestamp> Plan<T> {
773    /// Convert the dataflow description into one that uses render plans.
774    #[mz_ore::instrument(
775        target = "optimizer",
776        level = "debug",
777        fields(path.segment = "finalize_dataflow")
778    )]
779    pub fn finalize_dataflow(
780        desc: DataflowDescription<OptimizedMirRelationExpr>,
781        features: &OptimizerFeatures,
782    ) -> Result<DataflowDescription<Self>, String> {
783        // First, we lower the dataflow description from MIR to LIR.
784        let mut dataflow = Self::lower_dataflow(desc, features)?;
785
786        // Subsequently, we perform plan refinements for the dataflow.
787        Self::refine_source_mfps(&mut dataflow);
788
789        if features.enable_consolidate_after_union_negate {
790            Self::refine_union_negate_consolidation(&mut dataflow);
791        }
792
793        if dataflow.is_single_time() {
794            Self::refine_single_time_operator_selection(&mut dataflow);
795
796            // The relaxation of the `must_consolidate` flag performs an LIR-based
797            // analysis and transform under checked recursion. By a similar argument
798            // made in `from_mir`, we do not expect the recursion limit to be hit.
799            // However, if that happens, we propagate an error to the caller.
800            // To apply the transform, we first obtain monotonic source and index
801            // global IDs and add them to a `TransformConfig` instance.
802            let monotonic_ids = dataflow
803                .source_imports
804                .iter()
805                .filter_map(|(id, (_, monotonic, _upper))| if *monotonic { Some(id) } else { None })
806                .chain(
807                    dataflow
808                        .index_imports
809                        .iter()
810                        .filter_map(|(id, index_import)| {
811                            if index_import.monotonic {
812                                Some(id)
813                            } else {
814                                None
815                            }
816                        }),
817                )
818                .cloned()
819                .collect::<BTreeSet<_>>();
820
821            let config = TransformConfig { monotonic_ids };
822            Self::refine_single_time_consolidation(&mut dataflow, &config)?;
823        }
824
825        soft_assert_eq_no_log!(dataflow.check_invariants(), Ok(()));
826
827        mz_repr::explain::trace_plan(&dataflow);
828
829        Ok(dataflow)
830    }
831
832    /// Lowers the dataflow description from MIR to LIR. To this end, the
833    /// method collects all available arrangements and based on this information
834    /// creates plans for every object to be built for the dataflow.
835    #[mz_ore::instrument(
836        target = "optimizer",
837        level = "debug",
838        fields(path.segment ="mir_to_lir")
839    )]
840    fn lower_dataflow(
841        desc: DataflowDescription<OptimizedMirRelationExpr>,
842        features: &OptimizerFeatures,
843    ) -> Result<DataflowDescription<Self>, String> {
844        let context = lowering::Context::new(desc.debug_name.clone(), features);
845        let dataflow = context.lower(desc)?;
846
847        mz_repr::explain::trace_plan(&dataflow);
848
849        Ok(dataflow)
850    }
851
852    /// Refines the source instance descriptions for sources imported by `dataflow` to
853    /// push down common MFP expressions.
854    #[mz_ore::instrument(
855        target = "optimizer",
856        level = "debug",
857        fields(path.segment = "refine_source_mfps")
858    )]
859    fn refine_source_mfps(dataflow: &mut DataflowDescription<Self>) {
860        // Extract MFPs from Get operators for sources, and extract what we can for the source.
861        // For each source, we want to find `&mut MapFilterProject` for each `Get` expression.
862        for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
863            let mut identity_present = false;
864            let mut mfps = Vec::new();
865            for build_desc in dataflow.objects_to_build.iter_mut() {
866                let mut todo = vec![&mut build_desc.plan];
867                while let Some(expression) = todo.pop() {
868                    let node = &mut expression.node;
869                    if let PlanNode::Get { id, plan, .. } = node {
870                        if *id == mz_expr::Id::Global(*source_id) {
871                            match plan {
872                                GetPlan::Collection(mfp) => mfps.push(mfp),
873                                GetPlan::PassArrangements => {
874                                    identity_present = true;
875                                }
876                                GetPlan::Arrangement(..) => {
877                                    panic!("Surprising `GetPlan` for imported source: {:?}", plan);
878                                }
879                            }
880                        }
881                    } else {
882                        todo.extend(node.children_mut());
883                    }
884                }
885            }
886
887            // Direct exports of sources are possible, and prevent pushdown.
888            identity_present |= dataflow
889                .index_exports
890                .values()
891                .any(|(x, _)| x.on_id == *source_id);
892            identity_present |= dataflow.sink_exports.values().any(|x| x.from == *source_id);
893
894            if !identity_present && !mfps.is_empty() {
895                // Extract a common prefix `MapFilterProject` from `mfps`.
896                let common = MapFilterProject::extract_common(&mut mfps[..]);
897                // Apply common expressions to the source's `MapFilterProject`.
898                let mut mfp = if let Some(mfp) = source.arguments.operators.take() {
899                    MapFilterProject::compose(mfp, common)
900                } else {
901                    common
902                };
903                mfp.optimize();
904                source.arguments.operators = Some(mfp);
905            }
906        }
907        mz_repr::explain::trace_plan(dataflow);
908    }
909
910    /// Changes the `consolidate_output` flag of such Unions that have at least one Negated input.
911    #[mz_ore::instrument(
912        target = "optimizer",
913        level = "debug",
914        fields(path.segment = "refine_union_negate_consolidation")
915    )]
916    fn refine_union_negate_consolidation(dataflow: &mut DataflowDescription<Self>) {
917        for build_desc in dataflow.objects_to_build.iter_mut() {
918            let mut todo = vec![&mut build_desc.plan];
919            while let Some(expression) = todo.pop() {
920                let node = &mut expression.node;
921                match node {
922                    PlanNode::Union {
923                        inputs,
924                        consolidate_output,
925                        ..
926                    } => {
927                        if inputs
928                            .iter()
929                            .any(|input| matches!(input.node, PlanNode::Negate { .. }))
930                        {
931                            *consolidate_output = true;
932                        }
933                    }
934                    _ => {}
935                }
936                todo.extend(node.children_mut());
937            }
938        }
939        mz_repr::explain::trace_plan(dataflow);
940    }
941
942    /// Refines the plans of objects to be built as part of `dataflow` to take advantage
943    /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
944    /// one-shot SELECT query.
945    #[mz_ore::instrument(
946        target = "optimizer",
947        level = "debug",
948        fields(path.segment = "refine_single_time_operator_selection")
949    )]
950    fn refine_single_time_operator_selection(dataflow: &mut DataflowDescription<Self>) {
951        // We should only reach here if we have a one-shot SELECT query, i.e.,
952        // a single-time dataflow.
953        assert!(dataflow.is_single_time());
954
955        // Upgrade single-time plans to monotonic.
956        for build_desc in dataflow.objects_to_build.iter_mut() {
957            let mut todo = vec![&mut build_desc.plan];
958            while let Some(expression) = todo.pop() {
959                let node = &mut expression.node;
960                match node {
961                    PlanNode::Reduce { plan, .. } => {
962                        // Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
963                        match plan {
964                            ReducePlan::Collation(collation) => {
965                                collation.as_monotonic(true);
966                            }
967                            ReducePlan::Hierarchical(hierarchical) => {
968                                hierarchical.as_monotonic(true);
969                            }
970                            _ => {
971                                // Nothing to do for other plans, and doing nothing is safe for future variants.
972                            }
973                        }
974                        todo.extend(node.children_mut());
975                    }
976                    PlanNode::TopK { top_k_plan, .. } => {
977                        top_k_plan.as_monotonic(true);
978                        todo.extend(node.children_mut());
979                    }
980                    PlanNode::LetRec { body, .. } => {
981                        // Only the non-recursive `body` is restricted to a single time.
982                        todo.push(body);
983                    }
984                    _ => {
985                        // Nothing to do for other expressions, and doing nothing is safe for future expressions.
986                        todo.extend(node.children_mut());
987                    }
988                }
989            }
990        }
991        mz_repr::explain::trace_plan(dataflow);
992    }
993
994    /// Refines the plans of objects to be built as part of a single-time `dataflow` to relax
995    /// the setting of the `must_consolidate` attribute of monotonic operators, if necessary,
996    /// whenever the input is deemed to be physically monotonic.
997    #[mz_ore::instrument(
998        target = "optimizer",
999        level = "debug",
1000        fields(path.segment = "refine_single_time_consolidation")
1001    )]
1002    fn refine_single_time_consolidation(
1003        dataflow: &mut DataflowDescription<Self>,
1004        config: &TransformConfig,
1005    ) -> Result<(), String> {
1006        // We should only reach here if we have a one-shot SELECT query, i.e.,
1007        // a single-time dataflow.
1008        assert!(dataflow.is_single_time());
1009
1010        let transform = transform::RelaxMustConsolidate::<T>::new();
1011        for build_desc in dataflow.objects_to_build.iter_mut() {
1012            transform
1013                .transform(config, &mut build_desc.plan)
1014                .map_err(|_| "Maximum recursion limit error in consolidation relaxation.")?;
1015        }
1016        mz_repr::explain::trace_plan(dataflow);
1017        Ok(())
1018    }
1019}
1020
1021impl<T> CollectionPlan for PlanNode<T> {
1022    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
1023        match self {
1024            PlanNode::Constant { rows: _ } => (),
1025            PlanNode::Get {
1026                id,
1027                keys: _,
1028                plan: _,
1029            } => match id {
1030                Id::Global(id) => {
1031                    out.insert(*id);
1032                }
1033                Id::Local(_) => (),
1034            },
1035            PlanNode::Let { id: _, value, body } => {
1036                value.depends_on_into(out);
1037                body.depends_on_into(out);
1038            }
1039            PlanNode::LetRec {
1040                ids: _,
1041                values,
1042                limits: _,
1043                body,
1044            } => {
1045                for value in values.iter() {
1046                    value.depends_on_into(out);
1047                }
1048                body.depends_on_into(out);
1049            }
1050            PlanNode::Join { inputs, plan: _ }
1051            | PlanNode::Union {
1052                inputs,
1053                consolidate_output: _,
1054            } => {
1055                for input in inputs {
1056                    input.depends_on_into(out);
1057                }
1058            }
1059            PlanNode::Mfp {
1060                input,
1061                mfp: _,
1062                input_key_val: _,
1063            }
1064            | PlanNode::FlatMap {
1065                input,
1066                func: _,
1067                exprs: _,
1068                mfp_after: _,
1069                input_key: _,
1070            }
1071            | PlanNode::ArrangeBy {
1072                input,
1073                forms: _,
1074                input_key: _,
1075                input_mfp: _,
1076            }
1077            | PlanNode::Reduce {
1078                input,
1079                key_val_plan: _,
1080                plan: _,
1081                input_key: _,
1082                mfp_after: _,
1083            }
1084            | PlanNode::TopK {
1085                input,
1086                top_k_plan: _,
1087            }
1088            | PlanNode::Negate { input }
1089            | PlanNode::Threshold {
1090                input,
1091                threshold_plan: _,
1092            } => {
1093                input.depends_on_into(out);
1094            }
1095        }
1096    }
1097}
1098
1099impl<T> CollectionPlan for Plan<T> {
1100    fn depends_on_into(&self, out: &mut BTreeSet<GlobalId>) {
1101        self.node.depends_on_into(out);
1102    }
1103}
1104
/// Computes the bucket sizes used to decompose an operator hierarchically, given how
/// many rows are expected to share a group key.
///
/// The result lists every power of 16 that is strictly smaller than the expected group
/// size, largest first. When no expectation is given, 4B rows are assumed. An expected
/// size of 16 or less yields no buckets.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
    // NOTE(vmarcos): The fan-in of 16 defined below is used in the tuning advice built-in view
    // mz_introspection.mz_expected_group_size_advice.
    const FAN_IN: u64 = 16;
    // Group size planned for when the user didn't specify one.
    const DEFAULT_GROUP_SIZE: u64 = 4_000_000_000;

    let limit = expected_group_size.unwrap_or(DEFAULT_GROUP_SIZE);

    // Powers of the fan-in strike a balance between how many inputs each layer receives
    // from the preceding one and how many layers there are. The saturating multiply pins
    // the sequence at `u64::MAX`, which is never below `limit`, so the scan always ends.
    let mut buckets: Vec<u64> =
        std::iter::successors(Some(FAN_IN), |size| Some(size.saturating_mul(FAN_IN)))
            .take_while(|&size| size < limit)
            .collect();

    buckets.reverse();
    buckets
}
1126
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    // Both properties check that encoding a value to its protobuf form and decoding it
    // again succeeds and reproduces the original value.
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(10))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn available_collections_protobuf_roundtrip(expect in any::<AvailableCollections>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoAvailableCollections>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn get_plan_protobuf_roundtrip(expect in any::<GetPlan>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoGetPlan>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}