mz_transform/
analysis.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Traits and types for reusable expression analysis
11
12pub mod equivalences;
13pub mod monotonic;
14
15use mz_expr::MirRelationExpr;
16
17pub use arity::Arity;
18pub use cardinality::Cardinality;
19pub use column_names::{ColumnName, ColumnNames};
20pub use common::{Derived, DerivedBuilder, DerivedView};
21pub use explain::annotate_plan;
22pub use non_negative::NonNegative;
23pub use subtree::SubtreeSize;
24pub use types::SqlRelationType;
25pub use unique_keys::UniqueKeys;
26
27/// An analysis that can be applied bottom-up to a `MirRelationExpr`.
28pub trait Analysis: 'static {
29    /// The type of value this analysis associates with an expression.
30    type Value: std::fmt::Debug;
31    /// Announce any dependencies this analysis has on other analyses.
32    ///
33    /// The method should invoke `builder.require::<Foo>()` for each other
34    /// analysis `Foo` this analysis depends upon.
35    fn announce_dependencies(_builder: &mut DerivedBuilder) {}
36    /// The analysis value derived for an expression, given other analysis results.
37    ///
38    /// The other analysis results include the results of this analysis for all children,
39    /// in `results`, and the results of other analyses this analysis has expressed a
40    /// dependence upon, in `depends`, for children and the expression itself.
41    /// The analysis results for `Self` can only be found in `results`, and are not
42    /// available in `depends`.
43    ///
44    /// Implementors of this method must defensively check references into `results`, as
45    /// it may be invoked on `LetRec` bindings that have not yet been populated. It is up
46    /// to the analysis what to do in that case, but conservative behavior is recommended.
47    ///
48    /// The `index` indicates the post-order index for the expression, for use in finding
49    /// the corresponding information in `results` and `depends`.
50    ///
51    /// The returned result will be associated with this expression for this analysis,
52    /// and the analyses will continue.
53    fn derive(
54        &self,
55        expr: &MirRelationExpr,
56        index: usize,
57        results: &[Self::Value],
58        depends: &Derived,
59    ) -> Self::Value;
60
61    /// When available, provide a lattice interface to allow optimistic recursion.
62    ///
63    /// Providing a non-`None` output indicates that the analysis intends re-iteration.
64    fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
65        None
66    }
67}
68
69/// Lattice methods for repeated analysis
70pub trait Lattice<T> {
71    /// An element greater than all other elements.
72    fn top(&self) -> T;
73    /// Set `a` to the greatest lower bound of `a` and `b`, and indicate if `a` changed as a result.
74    fn meet_assign(&self, a: &mut T, b: T) -> bool;
75}
76
77/// Types common across multiple analyses
78pub mod common {
79
80    use std::any::{Any, TypeId};
81    use std::collections::BTreeMap;
82
83    use mz_expr::LocalId;
84    use mz_expr::MirRelationExpr;
85    use mz_ore::assert_none;
86    use mz_repr::optimize::OptimizerFeatures;
87
88    use super::Analysis;
89    use super::subtree::SubtreeSize;
90
91    /// Container for analysis state and binding context.
92    #[derive(Default)]
93    #[allow(missing_debug_implementations)]
94    pub struct Derived {
95        /// A record of active analyses and their results, indexed by their type id.
96        analyses: BTreeMap<TypeId, Box<dyn AnalysisBundle>>,
97        /// Analyses ordered where each depends only on strictly prior analyses.
98        order: Vec<TypeId>,
99        /// Map from local identifier to result offset for analysis values.
100        bindings: BTreeMap<LocalId, usize>,
101    }
102
103    impl Derived {
104        /// Return the analysis results derived so far.
105        ///
106        /// # Panics
107        ///
108        /// This method panics if `A` was not installed as a required analysis.
109        pub fn results<A: Analysis>(&self) -> &[A::Value] {
110            let type_id = TypeId::of::<Bundle<A>>();
111            if let Some(bundle) = self.analyses.get(&type_id) {
112                if let Some(bundle) = bundle.as_any().downcast_ref::<Bundle<A>>() {
113                    return &bundle.results[..];
114                }
115            }
116            panic!("Analysis {:?} missing", std::any::type_name::<A>());
117        }
118        /// Bindings from local identifiers to result offsets for analysis values.
119        pub fn bindings(&self) -> &BTreeMap<LocalId, usize> {
120            &self.bindings
121        }
122        /// Result offsets for the state of a various number of children of the current expression.
123        ///
124        /// The integers are the zero-offset locations in the `SubtreeSize` analysis. The order of
125        /// the children is descending, from last child to first, because of how the information is
126        /// laid out, and the non-reversibility of the look-ups.
127        ///
128        /// It is an error to call this method with more children than expression has.
129        pub fn children_of_rev<'a>(
130            &'a self,
131            start: usize,
132            count: usize,
133        ) -> impl Iterator<Item = usize> + 'a {
134            let sizes = self.results::<SubtreeSize>();
135            let offset = 1;
136            (0..count).scan(offset, move |offset, _| {
137                let result = start - *offset;
138                *offset += sizes[result];
139                Some(result)
140            })
141        }
142
143        /// Recast the derived data as a view that can be subdivided into views over child state.
144        pub fn as_view<'a>(&'a self) -> DerivedView<'a> {
145            DerivedView {
146                derived: self,
147                lower: 0,
148                upper: self.results::<SubtreeSize>().len(),
149            }
150        }
151    }
152
153    /// The subset of a `Derived` corresponding to an expression and its children.
154    ///
155    /// Specifically, bounds an interval `[lower, upper)` that ends with the state
156    /// of an expression, at `upper-1`, and is preceded by the state of descendents.
157    ///
158    /// This is best thought of as a node in a tree rather
159    #[allow(missing_debug_implementations)]
160    #[derive(Copy, Clone)]
161    pub struct DerivedView<'a> {
162        derived: &'a Derived,
163        lower: usize,
164        upper: usize,
165    }
166
167    impl<'a> DerivedView<'a> {
168        /// The value associated with the expression.
169        pub fn value<A: Analysis>(&self) -> Option<&'a A::Value> {
170            self.results::<A>().last()
171        }
172
173        /// The post-order traversal index for the expression.
174        ///
175        /// This can be used to index into the full set of results, as provided by an
176        /// instance of `Derived`.
177        pub fn index(&self) -> usize {
178            self.upper - 1
179        }
180
181        /// The value bound to an identifier, if it has been derived.
182        ///
183        /// There are several reasons the value could not be derived, and this method
184        /// does not distinguish between them.
185        pub fn bound<A: Analysis>(&self, id: LocalId) -> Option<&'a A::Value> {
186            self.derived
187                .bindings
188                .get(&id)
189                .and_then(|index| self.derived.results::<A>().get(*index))
190        }
191
192        /// The results for expression and its children.
193        ///
194        /// The results for the expression itself will be the last element.
195        ///
196        /// # Panics
197        ///
198        /// This method panics if `A` was not installed as a required analysis.
199        pub fn results<A: Analysis>(&self) -> &'a [A::Value] {
200            &self.derived.results::<A>()[self.lower..self.upper]
201        }
202
203        /// Bindings from local identifiers to result offsets for analysis values.
204        ///
205        /// This method returns all bindings, which may include bindings not in scope for
206        /// the expression and its children; they should be ignored.
207        pub fn bindings(&self) -> &'a BTreeMap<LocalId, usize> {
208            self.derived.bindings()
209        }
210
211        /// Subviews over `self` corresponding to the children of the expression, in reverse order.
212        ///
213        /// These views should disjointly cover the same interval as `self`, except for the last element
214        /// which corresponds to the expression itself.
215        ///
216        /// The number of produced items should exactly match the number of children, which need not
217        /// be provided as an argument. This relies on the well-formedness of the view, which should
218        /// exhaust itself just as it enumerates its last (the first) child view.
219        pub fn children_rev(&self) -> impl Iterator<Item = DerivedView<'a>> + 'a {
220            // This logic is copy/paste from `Derived::children_of_rev` but it was annoying to layer
221            // it over the output of that function, and perhaps clearer to rewrite in any case.
222
223            // Discard the last element (the size of the expression's subtree).
224            // Repeatedly read out the last element, then peel off that many elements.
225            // Each extracted slice corresponds to a child of the current expression.
226            // We should end cleanly with an empty slice, otherwise there is an issue.
227            let sizes = self.results::<SubtreeSize>();
228            let sizes = &sizes[..sizes.len() - 1];
229
230            let offset = self.lower;
231            let derived = self.derived;
232            (0..).scan(sizes, move |sizes, _| {
233                if let Some(size) = sizes.last() {
234                    *sizes = &sizes[..sizes.len() - size];
235                    Some(Self {
236                        derived,
237                        lower: offset + sizes.len(),
238                        upper: offset + sizes.len() + size,
239                    })
240                } else {
241                    None
242                }
243            })
244        }
245
246        /// A convenience method for the view over the expressions last child.
247        ///
248        /// This method is appropriate to call on expressions with multiple children,
249        /// and in particular for `Let` and `LetRecv` variants where the body is the
250        /// last child.
251        ///
252        /// It is an error to call this on a view for an expression with no children.
253        pub fn last_child(&self) -> DerivedView<'a> {
254            self.children_rev().next().unwrap()
255        }
256    }
257
258    /// A builder wrapper to accumulate announced dependencies and construct default state.
259    #[allow(missing_debug_implementations)]
260    pub struct DerivedBuilder<'a> {
261        result: Derived,
262        features: &'a OptimizerFeatures,
263    }
264
265    impl<'a> DerivedBuilder<'a> {
266        /// Create a new [`DerivedBuilder`] parameterized by [`OptimizerFeatures`].
267        pub fn new(features: &'a OptimizerFeatures) -> Self {
268            // The default builder should include `SubtreeSize` to facilitate navigation.
269            let mut builder = DerivedBuilder {
270                result: Derived::default(),
271                features,
272            };
273            builder.require(SubtreeSize);
274            builder
275        }
276    }
277
278    impl<'a> DerivedBuilder<'a> {
279        /// Announces a dependence on an analysis `A`.
280        ///
281        /// This ensures that `A` will be performed, and before any analysis that
282        /// invokes this method.
283        pub fn require<A: Analysis>(&mut self, analysis: A) {
284            // The method recursively descends through required analyses, first
285            // installing each in `result.analyses` and second in `result.order`.
286            // The first is an obligation, and serves as an indication that we have
287            // found a cycle in dependencies.
288            let type_id = TypeId::of::<Bundle<A>>();
289            if !self.result.order.contains(&type_id) {
290                // If we have not sequenced `type_id` but have a bundle, it means
291                // we are in the process of fulfilling its requirements: a cycle.
292                if self.result.analyses.contains_key(&type_id) {
293                    panic!("Cyclic dependency detected: {}", std::any::type_name::<A>());
294                }
295                // Insert the analysis bundle first, so that we can detect cycles.
296                self.result.analyses.insert(
297                    type_id,
298                    Box::new(Bundle::<A> {
299                        analysis,
300                        results: Vec::new(),
301                        fuel: 100,
302                        allow_optimistic: self.features.enable_letrec_fixpoint_analysis,
303                    }),
304                );
305                A::announce_dependencies(self);
306                // All dependencies are successfully sequenced; sequence `type_id`.
307                self.result.order.push(type_id);
308            }
309        }
310        /// Complete the building: perform analyses and return the resulting `Derivation`.
311        pub fn visit(mut self, expr: &MirRelationExpr) -> Derived {
312            // A stack of expressions to process (`Ok`) and let bindings to fill (`Err`).
313            let mut todo = vec![Ok(expr)];
314            // Expressions in reverse post-order: each expression, followed by its children in reverse order.
315            // We will reverse this to get the post order, but must form it in reverse.
316            let mut rev_post_order = Vec::new();
317            while let Some(command) = todo.pop() {
318                match command {
319                    // An expression to visit.
320                    Ok(expr) => {
321                        match expr {
322                            MirRelationExpr::Let { id, value, body } => {
323                                todo.push(Ok(value));
324                                todo.push(Err(*id));
325                                todo.push(Ok(body));
326                            }
327                            MirRelationExpr::LetRec {
328                                ids, values, body, ..
329                            } => {
330                                for (id, value) in ids.iter().zip(values) {
331                                    todo.push(Ok(value));
332                                    todo.push(Err(*id));
333                                }
334                                todo.push(Ok(body));
335                            }
336                            _ => {
337                                todo.extend(expr.children().map(Ok));
338                            }
339                        }
340                        rev_post_order.push(expr);
341                    }
342                    // A local id to install
343                    Err(local_id) => {
344                        // Capture the *remaining* work, which we'll need to flip around.
345                        let prior = self.result.bindings.insert(local_id, rev_post_order.len());
346                        assert_none!(prior, "Shadowing not allowed");
347                    }
348                }
349            }
350            // Flip the offsets now that we know a length.
351            for value in self.result.bindings.values_mut() {
352                *value = rev_post_order.len() - *value - 1;
353            }
354            // Visit the pre-order in reverse order: post-order.
355            rev_post_order.reverse();
356
357            // Apply each analysis to `expr` in order.
358            for id in self.result.order.iter() {
359                if let Some(mut bundle) = self.result.analyses.remove(id) {
360                    bundle.analyse(&rev_post_order[..], &self.result);
361                    self.result.analyses.insert(*id, bundle);
362                }
363            }
364
365            self.result
366        }
367    }
368
369    /// An abstraction for an analysis and associated state.
370    trait AnalysisBundle: Any {
371        /// Populates internal state for all of `exprs`.
372        ///
373        /// Result indicates whether new information was produced for `exprs.last()`.
374        fn analyse(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool;
375        /// Upcasts `self` to a `&dyn Any`.
376        ///
377        /// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed
378        fn as_any(&self) -> &dyn std::any::Any;
379    }
380
381    /// A wrapper for analysis state.
382    struct Bundle<A: Analysis> {
383        /// The algorithm instance used to derive the results.
384        analysis: A,
385        /// A vector of results.
386        results: Vec<A::Value>,
387        /// Counts down with each `LetRec` re-iteration, to avoid unbounded effort.
388        /// Should it reach zero, the analysis should discard its results and restart as if pessimistic.
389        fuel: usize,
390        /// Allow optimistic analysis for `A` (otherwise we always do pesimistic
391        /// analysis, even if a [`crate::analysis::Lattice`] is available for `A`).
392        allow_optimistic: bool,
393    }
394
395    impl<A: Analysis> AnalysisBundle for Bundle<A> {
396        fn analyse(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool {
397            self.results.clear();
398            // Attempt optimistic analysis, and if that fails go pessimistic.
399            let update = A::lattice()
400                .filter(|_| self.allow_optimistic)
401                .and_then(|lattice| {
402                    for _ in exprs.iter() {
403                        self.results.push(lattice.top());
404                    }
405                    self.analyse_optimistic(exprs, 0, exprs.len(), depends, &*lattice)
406                        .ok()
407                })
408                .unwrap_or_else(|| {
409                    self.results.clear();
410                    self.analyse_pessimistic(exprs, depends)
411                });
412            assert_eq!(self.results.len(), exprs.len());
413            update
414        }
415        fn as_any(&self) -> &dyn std::any::Any {
416            self
417        }
418    }
419
420    impl<A: Analysis> Bundle<A> {
421        /// Analysis that starts optimistically but is only correct at a fixed point.
422        ///
423        /// Will fail out to `analyse_pessimistic` if the lattice is missing, or `self.fuel` is exhausted.
424        /// When successful, the result indicates whether new information was produced for `exprs.last()`.
425        fn analyse_optimistic(
426            &mut self,
427            exprs: &[&MirRelationExpr],
428            lower: usize,
429            upper: usize,
430            depends: &Derived,
431            lattice: &dyn crate::analysis::Lattice<A::Value>,
432        ) -> Result<bool, ()> {
433            // Repeatedly re-evaluate the whole tree bottom up, until no changes of fuel spent.
434            let mut changed = true;
435            while changed {
436                changed = false;
437
438                // Bail out if we have done too many `LetRec` passes in this analysis.
439                self.fuel -= 1;
440                if self.fuel == 0 {
441                    return Err(());
442                }
443
444                // Track if repetitions may be required, to avoid iteration if they are not.
445                let mut is_recursive = false;
446                // Update each derived value and track if any have changed.
447                for index in lower..upper {
448                    let value = self.derive(exprs[index], index, depends);
449                    changed = lattice.meet_assign(&mut self.results[index], value) || changed;
450                    if let MirRelationExpr::LetRec { .. } = &exprs[index] {
451                        is_recursive = true;
452                    }
453                }
454
455                // Un-set the potential loop if there was no recursion.
456                if !is_recursive {
457                    changed = false;
458                }
459            }
460            Ok(true)
461        }
462
463        /// Analysis that starts conservatively and can be stopped at any point.
464        ///
465        /// Result indicates whether new information was produced for `exprs.last()`.
466        fn analyse_pessimistic(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool {
467            // TODO: consider making iterative, from some `bottom()` up using `join_assign()`.
468            self.results.clear();
469            for (index, expr) in exprs.iter().enumerate() {
470                self.results.push(self.derive(expr, index, depends));
471            }
472            true
473        }
474
475        #[inline]
476        fn derive(&self, expr: &MirRelationExpr, index: usize, depends: &Derived) -> A::Value {
477            self.analysis
478                .derive(expr, index, &self.results[..], depends)
479        }
480    }
481}
482
483/// Expression subtree sizes
484///
485/// This analysis counts the number of expressions in each subtree, and is most useful
486/// for navigating the results of other analyses that are offset by subtree sizes.
487pub mod subtree {
488
489    use super::{Analysis, Derived};
490    use mz_expr::MirRelationExpr;
491
492    /// Analysis that determines the size in child expressions of relation expressions.
493    #[derive(Debug)]
494    pub struct SubtreeSize;
495
496    impl Analysis for SubtreeSize {
497        type Value = usize;
498
499        fn derive(
500            &self,
501            expr: &MirRelationExpr,
502            index: usize,
503            results: &[Self::Value],
504            _depends: &Derived,
505        ) -> Self::Value {
506            match expr {
507                MirRelationExpr::Constant { .. } | MirRelationExpr::Get { .. } => 1,
508                _ => {
509                    let mut offset = 1;
510                    for _ in expr.children() {
511                        offset += results[index - offset];
512                    }
513                    offset
514                }
515            }
516        }
517    }
518}
519
520/// Expression arities
521mod arity {
522
523    use super::{Analysis, Derived};
524    use mz_expr::MirRelationExpr;
525
526    /// Analysis that determines the number of columns of relation expressions.
527    #[derive(Debug)]
528    pub struct Arity;
529
530    impl Analysis for Arity {
531        type Value = usize;
532
533        fn derive(
534            &self,
535            expr: &MirRelationExpr,
536            index: usize,
537            results: &[Self::Value],
538            depends: &Derived,
539        ) -> Self::Value {
540            let mut offsets = depends
541                .children_of_rev(index, expr.children().count())
542                .map(|child| results[child])
543                .collect::<Vec<_>>();
544            offsets.reverse();
545            expr.arity_with_input_arities(offsets.into_iter())
546        }
547    }
548}
549
550/// Expression types
551mod types {
552
553    use super::{Analysis, Derived, Lattice};
554    use mz_expr::MirRelationExpr;
555    use mz_repr::SqlColumnType;
556
557    /// Analysis that determines the type of relation expressions.
558    ///
559    /// The value is `Some` when it discovers column types, and `None` in the case that
560    /// it has discovered no constraining information on the column types. The `None`
561    /// variant should only occur in the course of iteration, and should not be revealed
562    /// as an output of the analysis. One can `unwrap()` the result, and if it errors then
563    /// either the expression is malformed or the analysis has a bug.
564    ///
565    /// The analysis will panic if an expression is not well typed (i.e. if `try_col_with_input_cols`
566    /// returns an error).
567    #[derive(Debug)]
568    pub struct SqlRelationType;
569
570    impl Analysis for SqlRelationType {
571        type Value = Option<Vec<SqlColumnType>>;
572
573        fn derive(
574            &self,
575            expr: &MirRelationExpr,
576            index: usize,
577            results: &[Self::Value],
578            depends: &Derived,
579        ) -> Self::Value {
580            let offsets = depends
581                .children_of_rev(index, expr.children().count())
582                .map(|child| &results[child])
583                .collect::<Vec<_>>();
584
585            // For most expressions we'll apply `try_col_with_input_cols`, but for `Get` expressions
586            // we'll want to combine what we know (iteratively) with the stated `Get::typ`.
587            match expr {
588                MirRelationExpr::Get {
589                    id: mz_expr::Id::Local(i),
590                    typ,
591                    ..
592                } => {
593                    let mut result = typ.column_types.clone();
594                    if let Some(o) = depends.bindings().get(i) {
595                        if let Some(t) = results.get(*o) {
596                            if let Some(rec_typ) = t {
597                                // Reconcile nullability statements.
598                                // Unclear if we should trust `typ`.
599                                assert_eq!(result.len(), rec_typ.len());
600                                result.clone_from(rec_typ);
601                                for (res, col) in result.iter_mut().zip(typ.column_types.iter()) {
602                                    if !col.nullable {
603                                        res.nullable = false;
604                                    }
605                                }
606                            } else {
607                                // Our `None` information indicates that we are optimistically
608                                // assuming the best, including that all columns are non-null.
609                                // This should only happen in the first visit to a `Get` expr.
610                                // Use `typ`, but flatten nullability.
611                                for col in result.iter_mut() {
612                                    col.nullable = false;
613                                }
614                            }
615                        }
616                    }
617                    Some(result)
618                }
619                _ => {
620                    // Every expression with inputs should have non-`None` inputs at this point.
621                    let input_cols = offsets.into_iter().rev().map(|o| {
622                        o.as_ref()
623                            .expect("SqlRelationType analysis discovered type-less expression")
624                    });
625                    Some(expr.try_col_with_input_cols(input_cols).unwrap())
626                }
627            }
628        }
629
630        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
631            Some(Box::new(RTLattice))
632        }
633    }
634
635    struct RTLattice;
636
637    impl Lattice<Option<Vec<SqlColumnType>>> for RTLattice {
638        fn top(&self) -> Option<Vec<SqlColumnType>> {
639            None
640        }
641        fn meet_assign(
642            &self,
643            a: &mut Option<Vec<SqlColumnType>>,
644            b: Option<Vec<SqlColumnType>>,
645        ) -> bool {
646            match (a, b) {
647                (_, None) => false,
648                (Some(a), Some(b)) => {
649                    let mut changed = false;
650                    assert_eq!(a.len(), b.len());
651                    for (at, bt) in a.iter_mut().zip(b.iter()) {
652                        assert_eq!(at.scalar_type, bt.scalar_type);
653                        if !at.nullable && bt.nullable {
654                            at.nullable = true;
655                            changed = true;
656                        }
657                    }
658                    changed
659                }
660                (a, b) => {
661                    *a = b;
662                    true
663                }
664            }
665        }
666    }
667}
668
669/// Expression unique keys
670mod unique_keys {
671
672    use super::arity::Arity;
673    use super::{Analysis, Derived, DerivedBuilder, Lattice};
674    use mz_expr::MirRelationExpr;
675
676    /// Analysis that determines the unique keys of relation expressions.
677    ///
678    /// The analysis value is a `Vec<Vec<usize>>`, which should be interpreted as a list
679    /// of sets of column identifiers, each set of which has the property that there is at
680    /// most one instance of each assignment of values to those columns.
681    ///
682    /// The sets are minimal, in that any superset of another set is removed from the list.
683    /// Any superset of unique key columns are also unique key columns.
684    #[derive(Debug)]
685    pub struct UniqueKeys;
686
687    impl Analysis for UniqueKeys {
688        type Value = Vec<Vec<usize>>;
689
690        fn announce_dependencies(builder: &mut DerivedBuilder) {
691            builder.require(Arity);
692        }
693
694        fn derive(
695            &self,
696            expr: &MirRelationExpr,
697            index: usize,
698            results: &[Self::Value],
699            depends: &Derived,
700        ) -> Self::Value {
701            let mut offsets = depends
702                .children_of_rev(index, expr.children().count())
703                .collect::<Vec<_>>();
704            offsets.reverse();
705
706            match expr {
707                MirRelationExpr::Get {
708                    id: mz_expr::Id::Local(i),
709                    typ,
710                    ..
711                } => {
712                    // We have information from `typ` and from the analysis.
713                    // We should "join" them, unioning and reducing the keys.
714                    let mut keys = typ.keys.clone();
715                    if let Some(o) = depends.bindings().get(i) {
716                        if let Some(ks) = results.get(*o) {
717                            for k in ks.iter() {
718                                antichain_insert(&mut keys, k.clone());
719                            }
720                            keys.extend(ks.iter().cloned());
721                            keys.sort();
722                            keys.dedup();
723                        }
724                    }
725                    keys
726                }
727                _ => {
728                    let arity = depends.results::<Arity>();
729                    expr.keys_with_input_keys(
730                        offsets.iter().map(|o| arity[*o]),
731                        offsets.iter().map(|o| &results[*o]),
732                    )
733                }
734            }
735        }
736
737        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
738            Some(Box::new(UKLattice))
739        }
740    }
741
742    fn antichain_insert(into: &mut Vec<Vec<usize>>, item: Vec<usize>) {
743        // Insert only if there is not a dominating element of `into`.
744        if into.iter().all(|key| !key.iter().all(|k| item.contains(k))) {
745            into.retain(|key| !key.iter().all(|k| item.contains(k)));
746            into.push(item);
747        }
748    }
749
750    /// Lattice for sets of columns that define a unique key.
751    ///
752    /// An element `Vec<Vec<usize>>` describes all sets of columns `Vec<usize>` that are a
753    /// superset of some set of columns in the lattice element.
754    struct UKLattice;
755
756    impl Lattice<Vec<Vec<usize>>> for UKLattice {
757        fn top(&self) -> Vec<Vec<usize>> {
758            vec![vec![]]
759        }
760        fn meet_assign(&self, a: &mut Vec<Vec<usize>>, b: Vec<Vec<usize>>) -> bool {
761            a.sort();
762            a.dedup();
763            let mut c = Vec::new();
764            for cols_a in a.iter_mut() {
765                cols_a.sort();
766                cols_a.dedup();
767                for cols_b in b.iter() {
768                    let mut cols_c = cols_a.iter().chain(cols_b).cloned().collect::<Vec<_>>();
769                    cols_c.sort();
770                    cols_c.dedup();
771                    antichain_insert(&mut c, cols_c);
772                }
773            }
774            c.sort();
775            c.dedup();
776            std::mem::swap(a, &mut c);
777            a != &mut c
778        }
779    }
780}
781
782/// Determines if accumulated frequences can be negative.
783///
784/// This analysis assumes that globally identified collection have the property, and it is
785/// incorrect to apply it to expressions that reference external collections that may have
786/// negative accumulations.
787mod non_negative {
788
789    use super::{Analysis, Derived, Lattice};
790    use crate::analysis::common_lattice::BoolLattice;
791    use mz_expr::{Id, MirRelationExpr};
792
793    /// Analysis that determines if all accumulations at all times are non-negative.
794    ///
795    /// The analysis assumes that `Id::Global` references only refer to non-negative collections.
796    #[derive(Debug)]
797    pub struct NonNegative;
798
799    impl Analysis for NonNegative {
800        type Value = bool;
801
802        fn derive(
803            &self,
804            expr: &MirRelationExpr,
805            index: usize,
806            results: &[Self::Value],
807            depends: &Derived,
808        ) -> Self::Value {
809            match expr {
810                MirRelationExpr::Constant { rows, .. } => rows
811                    .as_ref()
812                    .map(|r| r.iter().all(|(_, diff)| *diff >= mz_repr::Diff::ZERO))
813                    .unwrap_or(true),
814                MirRelationExpr::Get { id, .. } => match id {
815                    Id::Local(id) => {
816                        let index = *depends
817                            .bindings()
818                            .get(id)
819                            .expect("Dependency info not found");
820                        *results.get(index).unwrap_or(&false)
821                    }
822                    Id::Global(_) => true,
823                },
824                // Negate must be false unless input is "non-positive".
825                MirRelationExpr::Negate { .. } => false,
826                // Threshold ensures non-negativity.
827                MirRelationExpr::Threshold { .. } => true,
828                // Reduce errors on negative input.
829                MirRelationExpr::Reduce { .. } => true,
830                MirRelationExpr::Join { .. } => {
831                    // If all inputs are non-negative, the join is non-negative.
832                    depends
833                        .children_of_rev(index, expr.children().count())
834                        .all(|off| results[off])
835                }
836                MirRelationExpr::Union { base, inputs } => {
837                    // If all inputs are non-negative, the union is non-negative.
838                    let all_non_negative = depends
839                        .children_of_rev(index, expr.children().count())
840                        .all(|off| results[off]);
841
842                    if all_non_negative {
843                        return true;
844                    }
845
846                    // We look for the pattern `Union { base, Negate(Subset(base)) }`.
847                    // TODO: take some care to ensure that union fusion does not introduce a regression.
848                    if inputs.len() == 1 {
849                        if let MirRelationExpr::Negate { input } = &inputs[0] {
850                            // If `base` is non-negative, and `is_superset_of(base, input)`, return true.
851                            // TODO: this is not correct until we have `is_superset_of` validate non-negativity
852                            // as it goes, but it matches the current implementation.
853                            let mut children = depends.children_of_rev(index, 2);
854                            let _negate = children.next().unwrap();
855                            let base_id = children.next().unwrap();
856                            debug_assert_eq!(children.next(), None);
857                            if results[base_id] && is_superset_of(&*base, &*input) {
858                                return true;
859                            }
860                        }
861                    }
862
863                    false
864                }
865                _ => results[index - 1],
866            }
867        }
868
869        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
870            Some(Box::new(BoolLattice))
871        }
872    }
873
874    /// Returns true only if `rhs.negate().union(lhs)` contains only non-negative multiplicities
875    /// once consolidated.
876    ///
877    /// Informally, this happens when `rhs` is a multiset subset of `lhs`, meaning the multiplicity
878    /// of any record in `rhs` is at most the multiplicity of the same record in `lhs`.
879    ///
880    /// This method recursively descends each of `lhs` and `rhs` and performs a great many equality
881    /// tests, which has the potential to be quadratic. We should consider restricting its attention
882    /// to `Get` identifiers, under the premise that equal AST nodes would necessarily be identified
883    /// by common subexpression elimination. This requires care around recursively bound identifiers.
884    ///
885    /// These rules are .. somewhat arbitrary, and likely reflect observed opportunities. For example,
886    /// while we do relate `distinct(filter(A)) <= distinct(A)`, we do not relate `distinct(A) <= A`.
887    /// Further thoughts about the class of optimizations, and whether there should be more or fewer,
888    /// can be found here: <https://github.com/MaterializeInc/database-issues/issues/4044>.
889    fn is_superset_of(mut lhs: &MirRelationExpr, mut rhs: &MirRelationExpr) -> bool {
890        // This implementation is iterative.
891        // Before converting this implementation to recursive (e.g. to improve its accuracy)
892        // make sure to use the `CheckedRecursion` struct to avoid blowing the stack.
893        while lhs != rhs {
894            match rhs {
895                MirRelationExpr::Filter { input, .. } => rhs = &**input,
896                MirRelationExpr::TopK { input, .. } => rhs = &**input,
897                // Descend in both sides if the current roots are
898                // projections with the same `outputs` vector.
899                MirRelationExpr::Project {
900                    input: rhs_input,
901                    outputs: rhs_outputs,
902                } => match lhs {
903                    MirRelationExpr::Project {
904                        input: lhs_input,
905                        outputs: lhs_outputs,
906                    } if lhs_outputs == rhs_outputs => {
907                        rhs = &**rhs_input;
908                        lhs = &**lhs_input;
909                    }
910                    _ => return false,
911                },
912                // Descend in both sides if the current roots are reduces with empty aggregates
913                // on the same set of keys (that is, a distinct operation on those keys).
914                MirRelationExpr::Reduce {
915                    input: rhs_input,
916                    group_key: rhs_group_key,
917                    aggregates: rhs_aggregates,
918                    monotonic: _,
919                    expected_group_size: _,
920                } if rhs_aggregates.is_empty() => match lhs {
921                    MirRelationExpr::Reduce {
922                        input: lhs_input,
923                        group_key: lhs_group_key,
924                        aggregates: lhs_aggregates,
925                        monotonic: _,
926                        expected_group_size: _,
927                    } if lhs_aggregates.is_empty() && lhs_group_key == rhs_group_key => {
928                        rhs = &**rhs_input;
929                        lhs = &**lhs_input;
930                    }
931                    _ => return false,
932                },
933                _ => {
934                    // TODO: Imagine more complex reasoning here!
935                    return false;
936                }
937            }
938        }
939        true
940    }
941}
942
943mod column_names {
944    use std::ops::Range;
945    use std::sync::Arc;
946
947    use super::Analysis;
948    use mz_expr::{AggregateFunc, Id, MirRelationExpr, MirScalarExpr, TableFunc};
949    use mz_repr::GlobalId;
950    use mz_repr::explain::ExprHumanizer;
951    use mz_sql::ORDINALITY_COL_NAME;
952
953    /// An abstract type denoting an inferred column name.
954    #[derive(Debug, Clone)]
955    pub enum ColumnName {
956        /// A column with name inferred to be equal to a GlobalId schema column.
957        Global(GlobalId, usize),
958        /// An anonymous expression named after the top-level function name.
959        Aggregate(AggregateFunc, Box<ColumnName>),
960        /// A column with a name that has been saved from the original SQL query.
961        Annotated(Arc<str>),
962        /// An column with an unknown name.
963        Unknown,
964    }
965
966    impl ColumnName {
967        /// Return `true` iff the variant has an inferred name.
968        pub fn is_known(&self) -> bool {
969            match self {
970                Self::Global(..) | Self::Aggregate(..) => true,
971                // We treat annotated columns as unknown because we would rather
972                // override them with inferred names, if we can.
973                Self::Annotated(..) | Self::Unknown => false,
974            }
975        }
976
977        /// Humanize the column to a [`String`], returns an empty [`String`] for
978        /// unknown columns (or columns named `"?column?"`).
979        pub fn humanize(&self, humanizer: &dyn ExprHumanizer) -> String {
980            match self {
981                Self::Global(id, c) => humanizer.humanize_column(*id, *c).unwrap_or_default(),
982                Self::Aggregate(func, expr) => {
983                    let func = func.name();
984                    let expr = expr.humanize(humanizer);
985                    if expr.is_empty() {
986                        String::from(func)
987                    } else {
988                        format!("{func}_{expr}")
989                    }
990                }
991                Self::Annotated(name) => name.to_string(),
992                Self::Unknown => String::new(),
993            }
994        }
995
996        /// Clone this column name if it is known, otherwise try to use the provided
997        /// name if it is available.
998        pub fn cloned_or_annotated(&self, name: &Option<Arc<str>>) -> Self {
999            match self {
1000                Self::Global(..) | Self::Aggregate(..) | Self::Annotated(..) => self.clone(),
1001                Self::Unknown => name
1002                    .as_ref()
1003                    .filter(|name| name.as_ref() != "\"?column?\"")
1004                    .map_or_else(|| Self::Unknown, |name| Self::Annotated(Arc::clone(name))),
1005            }
1006        }
1007    }
1008
1009    /// Compute the column types of each subtree of a [MirRelationExpr] from the
1010    /// bottom-up.
1011    #[derive(Debug)]
1012    pub struct ColumnNames;
1013
1014    impl ColumnNames {
1015        /// fallback schema consisting of ordinal column names: #0, #1, ...
1016        fn anonymous(range: Range<usize>) -> impl Iterator<Item = ColumnName> {
1017            range.map(|_| ColumnName::Unknown)
1018        }
1019
1020        /// fallback schema consisting of ordinal column names: #0, #1, ...
1021        fn extend_with_scalars(column_names: &mut Vec<ColumnName>, scalars: &Vec<MirScalarExpr>) {
1022            for scalar in scalars {
1023                column_names.push(match scalar {
1024                    MirScalarExpr::Column(c, name) => column_names[*c].cloned_or_annotated(&name.0),
1025                    _ => ColumnName::Unknown,
1026                });
1027            }
1028        }
1029    }
1030
1031    impl Analysis for ColumnNames {
1032        type Value = Vec<ColumnName>;
1033
1034        fn derive(
1035            &self,
1036            expr: &MirRelationExpr,
1037            index: usize,
1038            results: &[Self::Value],
1039            depends: &crate::analysis::Derived,
1040        ) -> Self::Value {
1041            use MirRelationExpr::*;
1042
1043            match expr {
1044                Constant { rows: _, typ } => {
1045                    // Fallback to an anonymous schema for constants.
1046                    ColumnNames::anonymous(0..typ.arity()).collect()
1047                }
1048                Get {
1049                    id: Id::Global(id),
1050                    typ,
1051                    access_strategy: _,
1052                } => {
1053                    // Emit ColumnName::Global instances for each column in the
1054                    // `Get` type. Those can be resolved to real names later when an
1055                    // ExpressionHumanizer is available.
1056                    (0..typ.columns().len())
1057                        .map(|c| ColumnName::Global(*id, c))
1058                        .collect()
1059                }
1060                Get {
1061                    id: Id::Local(id),
1062                    typ,
1063                    access_strategy: _,
1064                } => {
1065                    let index_child = *depends.bindings().get(id).expect("id in scope");
1066                    if index_child < results.len() {
1067                        results[index_child].clone()
1068                    } else {
1069                        // Possible because we infer LetRec bindings in order. This
1070                        // can be improved by introducing a fixpoint loop in the
1071                        // Env<A>::schedule_tasks LetRec handling block.
1072                        ColumnNames::anonymous(0..typ.arity()).collect()
1073                    }
1074                }
1075                Let {
1076                    id: _,
1077                    value: _,
1078                    body: _,
1079                } => {
1080                    // Return the column names of the `body`.
1081                    results[index - 1].clone()
1082                }
1083                LetRec {
1084                    ids: _,
1085                    values: _,
1086                    limits: _,
1087                    body: _,
1088                } => {
1089                    // Return the column names of the `body`.
1090                    results[index - 1].clone()
1091                }
1092                Project { input: _, outputs } => {
1093                    // Permute the column names of the input.
1094                    let input_column_names = &results[index - 1];
1095                    let mut column_names = vec![];
1096                    for col in outputs {
1097                        column_names.push(input_column_names[*col].clone());
1098                    }
1099                    column_names
1100                }
1101                Map { input: _, scalars } => {
1102                    // Extend the column names of the input with anonymous columns.
1103                    let mut column_names = results[index - 1].clone();
1104                    Self::extend_with_scalars(&mut column_names, scalars);
1105                    column_names
1106                }
1107                FlatMap {
1108                    input: _,
1109                    func,
1110                    exprs: _,
1111                } => {
1112                    // Extend the column names of the input with anonymous columns.
1113                    let mut column_names = results[index - 1].clone();
1114                    let func_output_start = column_names.len();
1115                    let func_output_end = column_names.len() + func.output_arity();
1116                    column_names.extend(Self::anonymous(func_output_start..func_output_end));
1117                    if let TableFunc::WithOrdinality { .. } = func {
1118                        // We know the name of the last column
1119                        // TODO(ggevay): generalize this to meaningful col names for all table functions
1120                        **column_names.last_mut().as_mut().expect(
1121                            "there is at least one output column, from the WITH ORDINALITY",
1122                        ) = ColumnName::Annotated(ORDINALITY_COL_NAME.into());
1123                    }
1124                    column_names
1125                }
1126                Filter {
1127                    input: _,
1128                    predicates: _,
1129                } => {
1130                    // Return the column names of the `input`.
1131                    results[index - 1].clone()
1132                }
1133                Join {
1134                    inputs: _,
1135                    equivalences: _,
1136                    implementation: _,
1137                } => {
1138                    let mut input_results = depends
1139                        .children_of_rev(index, expr.children().count())
1140                        .map(|child| &results[child])
1141                        .collect::<Vec<_>>();
1142                    input_results.reverse();
1143
1144                    let mut column_names = vec![];
1145                    for input_column_names in input_results {
1146                        column_names.extend(input_column_names.iter().cloned());
1147                    }
1148                    column_names
1149                }
1150                Reduce {
1151                    input: _,
1152                    group_key,
1153                    aggregates,
1154                    monotonic: _,
1155                    expected_group_size: _,
1156                } => {
1157                    // We clone and extend the input vector and then remove the part
1158                    // associated with the input at the end.
1159                    let mut column_names = results[index - 1].clone();
1160                    let input_arity = column_names.len();
1161
1162                    // Infer the group key part.
1163                    Self::extend_with_scalars(&mut column_names, group_key);
1164                    // Infer the aggregates part.
1165                    for aggregate in aggregates.iter() {
1166                        // The inferred name will consist of (1) the aggregate
1167                        // function name and (2) the aggregate expression (iff
1168                        // it is a simple column reference).
1169                        let func = aggregate.func.clone();
1170                        let expr = match aggregate.expr.as_column() {
1171                            Some(c) => column_names.get(c).unwrap_or(&ColumnName::Unknown).clone(),
1172                            None => ColumnName::Unknown,
1173                        };
1174                        column_names.push(ColumnName::Aggregate(func, Box::new(expr)));
1175                    }
1176                    // Remove the prefix associated with the input
1177                    column_names.drain(0..input_arity);
1178
1179                    column_names
1180                }
1181                TopK {
1182                    input: _,
1183                    group_key: _,
1184                    order_key: _,
1185                    limit: _,
1186                    offset: _,
1187                    monotonic: _,
1188                    expected_group_size: _,
1189                } => {
1190                    // Return the column names of the `input`.
1191                    results[index - 1].clone()
1192                }
1193                Negate { input: _ } => {
1194                    // Return the column names of the `input`.
1195                    results[index - 1].clone()
1196                }
1197                Threshold { input: _ } => {
1198                    // Return the column names of the `input`.
1199                    results[index - 1].clone()
1200                }
1201                Union { base: _, inputs: _ } => {
1202                    // Use the first non-empty column across all inputs.
1203                    let mut column_names = vec![];
1204
1205                    let mut inputs_results = depends
1206                        .children_of_rev(index, expr.children().count())
1207                        .map(|child| &results[child])
1208                        .collect::<Vec<_>>();
1209
1210                    let base_results = inputs_results.pop().unwrap();
1211                    inputs_results.reverse();
1212
1213                    for (i, mut column_name) in base_results.iter().cloned().enumerate() {
1214                        for input_results in inputs_results.iter() {
1215                            if !column_name.is_known() && input_results[i].is_known() {
1216                                column_name = input_results[i].clone();
1217                                break;
1218                            }
1219                        }
1220                        column_names.push(column_name);
1221                    }
1222
1223                    column_names
1224                }
1225                ArrangeBy { input: _, keys: _ } => {
1226                    // Return the column names of the `input`.
1227                    results[index - 1].clone()
1228                }
1229            }
1230        }
1231    }
1232}
1233
1234mod explain {
1235    //! Derived Analysis framework and definitions.
1236
1237    use std::collections::BTreeMap;
1238
1239    use mz_expr::MirRelationExpr;
1240    use mz_expr::explain::{ExplainContext, HumanizedExplain, HumanizerMode};
1241    use mz_ore::stack::RecursionLimitError;
1242    use mz_repr::explain::{Analyses, AnnotatedPlan};
1243
1244    use crate::analysis::equivalences::{Equivalences, HumanizedEquivalenceClasses};
1245
1246    // Analyses should have shortened paths when exported.
1247    use super::DerivedBuilder;
1248
1249    impl<'c> From<&ExplainContext<'c>> for DerivedBuilder<'c> {
1250        fn from(context: &ExplainContext<'c>) -> DerivedBuilder<'c> {
1251            let mut builder = DerivedBuilder::new(context.features);
1252            if context.config.subtree_size {
1253                builder.require(super::SubtreeSize);
1254            }
1255            if context.config.non_negative {
1256                builder.require(super::NonNegative);
1257            }
1258            if context.config.types {
1259                builder.require(super::SqlRelationType);
1260            }
1261            if context.config.arity {
1262                builder.require(super::Arity);
1263            }
1264            if context.config.keys {
1265                builder.require(super::UniqueKeys);
1266            }
1267            if context.config.cardinality {
1268                builder.require(super::Cardinality::with_stats(
1269                    context.cardinality_stats.clone(),
1270                ));
1271            }
1272            if context.config.column_names || context.config.humanized_exprs {
1273                builder.require(super::ColumnNames);
1274            }
1275            if context.config.equivalences {
1276                builder.require(Equivalences);
1277            }
1278            builder
1279        }
1280    }
1281
1282    /// Produce an [`AnnotatedPlan`] wrapping the given [`MirRelationExpr`] along
1283    /// with [`Analyses`] derived from the given context configuration.
1284    pub fn annotate_plan<'a>(
1285        plan: &'a MirRelationExpr,
1286        context: &'a ExplainContext,
1287    ) -> Result<AnnotatedPlan<'a, MirRelationExpr>, RecursionLimitError> {
1288        let mut annotations = BTreeMap::<&MirRelationExpr, Analyses>::default();
1289        let config = context.config;
1290
1291        // We want to annotate the plan with analyses in the following cases:
1292        // 1. An Analysis was explicitly requested in the ExplainConfig.
1293        // 2. Humanized expressions were requested in the ExplainConfig (in which
1294        //    case we need to derive the ColumnNames Analysis).
1295        if config.requires_analyses() || config.humanized_exprs {
1296            // get the annotation keys
1297            let subtree_refs = plan.post_order_vec();
1298            // get the annotation values
1299            let builder = DerivedBuilder::from(context);
1300            let derived = builder.visit(plan);
1301
1302            if config.subtree_size {
1303                for (expr, subtree_size) in std::iter::zip(
1304                    subtree_refs.iter(),
1305                    derived.results::<super::SubtreeSize>().into_iter(),
1306                ) {
1307                    let analyses = annotations.entry(expr).or_default();
1308                    analyses.subtree_size = Some(*subtree_size);
1309                }
1310            }
1311            if config.non_negative {
1312                for (expr, non_negative) in std::iter::zip(
1313                    subtree_refs.iter(),
1314                    derived.results::<super::NonNegative>().into_iter(),
1315                ) {
1316                    let analyses = annotations.entry(expr).or_default();
1317                    analyses.non_negative = Some(*non_negative);
1318                }
1319            }
1320
1321            if config.arity {
1322                for (expr, arity) in std::iter::zip(
1323                    subtree_refs.iter(),
1324                    derived.results::<super::Arity>().into_iter(),
1325                ) {
1326                    let analyses = annotations.entry(expr).or_default();
1327                    analyses.arity = Some(*arity);
1328                }
1329            }
1330
1331            if config.types {
1332                for (expr, types) in std::iter::zip(
1333                    subtree_refs.iter(),
1334                    derived.results::<super::SqlRelationType>().into_iter(),
1335                ) {
1336                    let analyses = annotations.entry(expr).or_default();
1337                    analyses.types = Some(types.clone());
1338                }
1339            }
1340
1341            if config.keys {
1342                for (expr, keys) in std::iter::zip(
1343                    subtree_refs.iter(),
1344                    derived.results::<super::UniqueKeys>().into_iter(),
1345                ) {
1346                    let analyses = annotations.entry(expr).or_default();
1347                    analyses.keys = Some(keys.clone());
1348                }
1349            }
1350
1351            if config.cardinality {
1352                for (expr, card) in std::iter::zip(
1353                    subtree_refs.iter(),
1354                    derived.results::<super::Cardinality>().into_iter(),
1355                ) {
1356                    let analyses = annotations.entry(expr).or_default();
1357                    analyses.cardinality = Some(card.to_string());
1358                }
1359            }
1360
1361            if config.column_names || config.humanized_exprs {
1362                for (expr, column_names) in std::iter::zip(
1363                    subtree_refs.iter(),
1364                    derived.results::<super::ColumnNames>().into_iter(),
1365                ) {
1366                    let analyses = annotations.entry(expr).or_default();
1367                    let value = column_names
1368                        .iter()
1369                        .map(|column_name| column_name.humanize(context.humanizer))
1370                        .collect();
1371                    analyses.column_names = Some(value);
1372                }
1373            }
1374
1375            if config.equivalences {
1376                for (expr, equivs) in std::iter::zip(
1377                    subtree_refs.iter(),
1378                    derived.results::<Equivalences>().into_iter(),
1379                ) {
1380                    let analyses = annotations.entry(expr).or_default();
1381                    analyses.equivalences = Some(match equivs.as_ref() {
1382                        Some(equivs) => HumanizedEquivalenceClasses {
1383                            equivalence_classes: equivs,
1384                            cols: analyses.column_names.as_ref(),
1385                            mode: HumanizedExplain::new(config.redacted),
1386                        }
1387                        .to_string(),
1388                        None => "<empty collection>".to_string(),
1389                    });
1390                }
1391            }
1392        }
1393
1394        Ok(AnnotatedPlan { plan, annotations })
1395    }
1396}
1397
1398/// Definition and helper structs for the [`Cardinality`] Analysis.
1399mod cardinality {
1400    use std::collections::{BTreeMap, BTreeSet};
1401
1402    use mz_expr::{
1403        BinaryFunc, Id, JoinImplementation, MirRelationExpr, MirScalarExpr, TableFunc, UnaryFunc,
1404        VariadicFunc,
1405    };
1406    use mz_ore::cast::{CastFrom, CastLossy, TryCastFrom};
1407    use mz_repr::GlobalId;
1408
1409    use ordered_float::OrderedFloat;
1410
1411    use super::{Analysis, Arity, SubtreeSize, UniqueKeys};
1412
1413    /// Compute the estimated cardinality of each subtree of a [MirRelationExpr] from the bottom up.
1414    #[allow(missing_debug_implementations)]
1415    pub struct Cardinality {
1416        /// Cardinalities for globally named entities
1417        pub stats: BTreeMap<GlobalId, usize>,
1418    }
1419
1420    impl Cardinality {
1421        /// A cardinality estimator with provided statistics for the given global identifiers
1422        pub fn with_stats(stats: BTreeMap<GlobalId, usize>) -> Self {
1423            Cardinality { stats }
1424        }
1425    }
1426
1427    impl Default for Cardinality {
1428        fn default() -> Self {
1429            Cardinality {
1430                stats: BTreeMap::new(),
1431            }
1432        }
1433    }
1434
1435    /// Cardinality estimates
1436    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
1437    pub enum CardinalityEstimate {
1438        Unknown,
1439        Estimate(OrderedFloat<f64>),
1440    }
1441
1442    impl CardinalityEstimate {
1443        pub fn max(lhs: CardinalityEstimate, rhs: CardinalityEstimate) -> CardinalityEstimate {
1444            use CardinalityEstimate::*;
1445            match (lhs, rhs) {
1446                (Estimate(lhs), Estimate(rhs)) => Estimate(std::cmp::max(lhs, rhs)),
1447                _ => Unknown,
1448            }
1449        }
1450
1451        pub fn rounded(&self) -> Option<usize> {
1452            match self {
1453                CardinalityEstimate::Estimate(OrderedFloat(f)) => {
1454                    let rounded = f.ceil();
1455                    let flattened = usize::cast_from(
1456                        u64::try_cast_from(rounded)
1457                            .expect("positive and representable cardinality estimate"),
1458                    );
1459
1460                    Some(flattened)
1461                }
1462                CardinalityEstimate::Unknown => None,
1463            }
1464        }
1465    }
1466
1467    impl std::ops::Add for CardinalityEstimate {
1468        type Output = CardinalityEstimate;
1469
1470        fn add(self, rhs: Self) -> Self::Output {
1471            use CardinalityEstimate::*;
1472            match (self, rhs) {
1473                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs + rhs),
1474                _ => Unknown,
1475            }
1476        }
1477    }
1478
1479    impl std::ops::Sub for CardinalityEstimate {
1480        type Output = CardinalityEstimate;
1481
1482        fn sub(self, rhs: Self) -> Self::Output {
1483            use CardinalityEstimate::*;
1484            match (self, rhs) {
1485                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs - rhs),
1486                _ => Unknown,
1487            }
1488        }
1489    }
1490
1491    impl std::ops::Sub<CardinalityEstimate> for f64 {
1492        type Output = CardinalityEstimate;
1493
1494        fn sub(self, rhs: CardinalityEstimate) -> Self::Output {
1495            use CardinalityEstimate::*;
1496            if let Estimate(OrderedFloat(rhs)) = rhs {
1497                Estimate(OrderedFloat(self - rhs))
1498            } else {
1499                Unknown
1500            }
1501        }
1502    }
1503
1504    impl std::ops::Mul for CardinalityEstimate {
1505        type Output = CardinalityEstimate;
1506
1507        fn mul(self, rhs: Self) -> Self::Output {
1508            use CardinalityEstimate::*;
1509            match (self, rhs) {
1510                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs * rhs),
1511                _ => Unknown,
1512            }
1513        }
1514    }
1515
1516    impl std::ops::Mul<f64> for CardinalityEstimate {
1517        type Output = CardinalityEstimate;
1518
1519        fn mul(self, rhs: f64) -> Self::Output {
1520            if let CardinalityEstimate::Estimate(OrderedFloat(lhs)) = self {
1521                CardinalityEstimate::Estimate(OrderedFloat(lhs * rhs))
1522            } else {
1523                CardinalityEstimate::Unknown
1524            }
1525        }
1526    }
1527
1528    impl std::ops::Div for CardinalityEstimate {
1529        type Output = CardinalityEstimate;
1530
1531        fn div(self, rhs: Self) -> Self::Output {
1532            use CardinalityEstimate::*;
1533            match (self, rhs) {
1534                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs / rhs),
1535                _ => Unknown,
1536            }
1537        }
1538    }
1539
1540    impl std::ops::Div<f64> for CardinalityEstimate {
1541        type Output = CardinalityEstimate;
1542
1543        fn div(self, rhs: f64) -> Self::Output {
1544            use CardinalityEstimate::*;
1545            if let Estimate(lhs) = self {
1546                Estimate(lhs / OrderedFloat(rhs))
1547            } else {
1548                Unknown
1549            }
1550        }
1551    }
1552
1553    impl std::iter::Sum for CardinalityEstimate {
1554        fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
1555            iter.fold(CardinalityEstimate::from(0.0), |acc, elt| acc + elt)
1556        }
1557    }
1558
1559    impl std::iter::Product for CardinalityEstimate {
1560        fn product<I: Iterator<Item = Self>>(iter: I) -> Self {
1561            iter.fold(CardinalityEstimate::from(1.0), |acc, elt| acc * elt)
1562        }
1563    }
1564
1565    impl From<usize> for CardinalityEstimate {
1566        fn from(value: usize) -> Self {
1567            Self::Estimate(OrderedFloat(f64::cast_lossy(value)))
1568        }
1569    }
1570
1571    impl From<f64> for CardinalityEstimate {
1572        fn from(value: f64) -> Self {
1573            Self::Estimate(OrderedFloat(value))
1574        }
1575    }
1576
1577    /// The default selectivity for predicates we know nothing about.
1578    ///
1579    /// But see also expr/src/scalar.rs for `FilterCharacteristics::worst_case_scaling_factor()` for a more nuanced take.
1580    pub const WORST_CASE_SELECTIVITY: OrderedFloat<f64> = OrderedFloat(0.1);
1581
1582    // This section defines how we estimate cardinality for each syntactic construct.
1583    //
1584    // We split it up into functions to make it all a bit more tractable to work with.
1585    impl Cardinality {
1586        fn flat_map(&self, tf: &TableFunc, input: CardinalityEstimate) -> CardinalityEstimate {
1587            match tf {
1588                TableFunc::Wrap { types, width } => {
1589                    input * (f64::cast_lossy(types.len()) / f64::cast_lossy(*width))
1590                }
1591                _ => {
1592                    // TODO(mgree) what explosion factor should we make up?
1593                    input * CardinalityEstimate::from(4.0)
1594                }
1595            }
1596        }
1597
1598        fn predicate(
1599            &self,
1600            predicate_expr: &MirScalarExpr,
1601            unique_columns: &BTreeSet<usize>,
1602        ) -> OrderedFloat<f64> {
1603            let index_selectivity = |expr: &MirScalarExpr| -> Option<OrderedFloat<f64>> {
1604                match expr {
1605                    MirScalarExpr::Column(col, _) => {
1606                        if unique_columns.contains(col) {
1607                            // TODO(mgree): when we have index cardinality statistics, they should go here when `expr` is a `MirScalarExpr::Column` that's in `unique_columns`
1608                            None
1609                        } else {
1610                            None
1611                        }
1612                    }
1613                    _ => None,
1614                }
1615            };
1616
1617            match predicate_expr {
1618                MirScalarExpr::Column(_, _)
1619                | MirScalarExpr::Literal(_, _)
1620                | MirScalarExpr::CallUnmaterializable(_) => OrderedFloat(1.0),
1621                MirScalarExpr::CallUnary { func, expr } => match func {
1622                    UnaryFunc::Not(_) => OrderedFloat(1.0) - self.predicate(expr, unique_columns),
1623                    UnaryFunc::IsTrue(_) | UnaryFunc::IsFalse(_) => OrderedFloat(0.5),
1624                    UnaryFunc::IsNull(_) => {
1625                        if let Some(icard) = index_selectivity(expr) {
1626                            icard
1627                        } else {
1628                            WORST_CASE_SELECTIVITY
1629                        }
1630                    }
1631                    _ => WORST_CASE_SELECTIVITY,
1632                },
1633                MirScalarExpr::CallBinary { func, expr1, expr2 } => {
1634                    match func {
1635                        BinaryFunc::Eq => {
1636                            match (index_selectivity(expr1), index_selectivity(expr2)) {
1637                                (Some(isel1), Some(isel2)) => std::cmp::max(isel1, isel2),
1638                                (Some(isel), None) | (None, Some(isel)) => isel,
1639                                (None, None) => WORST_CASE_SELECTIVITY,
1640                            }
1641                        }
1642                        // 1.0 - the Eq case
1643                        BinaryFunc::NotEq => {
1644                            match (index_selectivity(expr1), index_selectivity(expr2)) {
1645                                (Some(isel1), Some(isel2)) => {
1646                                    OrderedFloat(1.0) - std::cmp::max(isel1, isel2)
1647                                }
1648                                (Some(isel), None) | (None, Some(isel)) => OrderedFloat(1.0) - isel,
1649                                (None, None) => OrderedFloat(1.0) - WORST_CASE_SELECTIVITY,
1650                            }
1651                        }
1652                        BinaryFunc::Lt | BinaryFunc::Lte | BinaryFunc::Gt | BinaryFunc::Gte => {
1653                            // TODO(mgree) if we have high/low key values and one of the columns is an index, we can do better
1654                            OrderedFloat(0.33)
1655                        }
1656                        _ => OrderedFloat(1.0), // TOOD(mgree): are there other interesting cases?
1657                    }
1658                }
1659                MirScalarExpr::CallVariadic { func, exprs } => match func {
1660                    VariadicFunc::And => exprs
1661                        .iter()
1662                        .map(|expr| self.predicate(expr, unique_columns))
1663                        .product(),
1664                    VariadicFunc::Or => {
1665                        // TODO(mgree): BETWEEN will get compiled down to an AND of appropriate bounds---we could try to detect it and be clever
1666
1667                        // F(expr1 OR expr2) = F(expr1) + F(expr2) - F(expr1) * F(expr2), but generalized
1668                        let mut exprs = exprs.into_iter();
1669
1670                        let mut expr1;
1671
1672                        if let Some(first) = exprs.next() {
1673                            expr1 = self.predicate(first, unique_columns);
1674                        } else {
1675                            return OrderedFloat(1.0);
1676                        }
1677
1678                        for expr2 in exprs {
1679                            let expr2 = self.predicate(expr2, unique_columns);
1680                            expr1 = expr1 + expr2 - expr1 * expr2;
1681                        }
1682                        expr1
1683                    }
1684                    _ => OrderedFloat(1.0),
1685                },
1686                MirScalarExpr::If { cond: _, then, els } => std::cmp::max(
1687                    self.predicate(then, unique_columns),
1688                    self.predicate(els, unique_columns),
1689                ),
1690            }
1691        }
1692
1693        fn filter(
1694            &self,
1695            predicates: &Vec<MirScalarExpr>,
1696            keys: &Vec<Vec<usize>>,
1697            input: CardinalityEstimate,
1698        ) -> CardinalityEstimate {
1699            // TODO(mgree): should we try to do something for indices built on multiple columns?
1700            let mut unique_columns = BTreeSet::new();
1701            for key in keys {
1702                if key.len() == 1 {
1703                    unique_columns.insert(key[0]);
1704                }
1705            }
1706
1707            let mut estimate = input;
1708            for expr in predicates {
1709                let selectivity = self.predicate(expr, &unique_columns);
1710                debug_assert!(
1711                    OrderedFloat(0.0) <= selectivity && selectivity <= OrderedFloat(1.0),
1712                    "predicate selectivity {selectivity} should be in the range [0,1]"
1713                );
1714                estimate = estimate * selectivity.0;
1715            }
1716
1717            estimate
1718        }
1719
1720        fn join(
1721            &self,
1722            equivalences: &Vec<Vec<MirScalarExpr>>,
1723            _implementation: &JoinImplementation,
1724            unique_columns: BTreeMap<usize, usize>,
1725            mut inputs: Vec<CardinalityEstimate>,
1726        ) -> CardinalityEstimate {
1727            if inputs.is_empty() {
1728                return CardinalityEstimate::from(0.0);
1729            }
1730
1731            for equiv in equivalences {
1732                // those sources which have a unique key
1733                let mut unique_sources = BTreeSet::new();
1734                let mut all_unique = true;
1735
1736                for expr in equiv {
1737                    if let MirScalarExpr::Column(col, _) = expr {
1738                        if let Some(idx) = unique_columns.get(col) {
1739                            unique_sources.insert(*idx);
1740                        } else {
1741                            all_unique = false;
1742                        }
1743                    } else {
1744                        all_unique = false;
1745                    }
1746                }
1747
1748                // no unique columns in this equivalence
1749                if unique_sources.is_empty() {
1750                    continue;
1751                }
1752
1753                // ALL unique columns in this equivalence
1754                if all_unique {
1755                    // these inputs have unique keys for _all_ of the equivalence, so they're a bound on how many rows we'll get from those sources
1756                    // we'll find the leftmost such input and use it to hold the minimum; the other sources we set to 1.0 (so they have no effect)
1757                    let mut sources = unique_sources.iter();
1758
1759                    let lhs_idx = *sources.next().unwrap();
1760                    let mut lhs =
1761                        std::mem::replace(&mut inputs[lhs_idx], CardinalityEstimate::from(1.0));
1762                    for &rhs_idx in sources {
1763                        let rhs =
1764                            std::mem::replace(&mut inputs[rhs_idx], CardinalityEstimate::from(1.0));
1765                        lhs = CardinalityEstimate::min(lhs, rhs);
1766                    }
1767
1768                    inputs[lhs_idx] = lhs;
1769
1770                    // best option! go look at the next equivalence
1771                    continue;
1772                }
1773
1774                // some unique columns in this equivalence
1775                for idx in unique_sources {
1776                    // when joining R and S on R.x = S.x, if R.x is unique and S.x is not, we're bounded above by the cardinality of S
1777                    inputs[idx] = CardinalityEstimate::from(1.0);
1778                }
1779            }
1780
1781            let mut product = CardinalityEstimate::from(1.0);
1782            for input in inputs {
1783                product = product * input;
1784            }
1785            product
1786        }
1787
1788        fn reduce(
1789            &self,
1790            group_key: &Vec<MirScalarExpr>,
1791            expected_group_size: &Option<u64>,
1792            input: CardinalityEstimate,
1793        ) -> CardinalityEstimate {
1794            // TODO(mgree): if no `group_key` is present, we can do way better
1795
1796            if let Some(group_size) = expected_group_size {
1797                input / f64::cast_lossy(*group_size)
1798            } else if group_key.is_empty() {
1799                CardinalityEstimate::from(1.0)
1800            } else {
1801                // in the worst case, every row is its own group
1802                input
1803            }
1804        }
1805
1806        fn topk(
1807            &self,
1808            group_key: &Vec<usize>,
1809            limit: &Option<MirScalarExpr>,
1810            expected_group_size: &Option<u64>,
1811            input: CardinalityEstimate,
1812        ) -> CardinalityEstimate {
1813            // TODO: support simple arithmetic expressions
1814            let k = limit
1815                .as_ref()
1816                .and_then(|l| l.as_literal_int64())
1817                .map_or(1, |l| std::cmp::max(0, l));
1818
1819            if let Some(group_size) = expected_group_size {
1820                input * (f64::cast_lossy(k) / f64::cast_lossy(*group_size))
1821            } else if group_key.is_empty() {
1822                CardinalityEstimate::from(f64::cast_lossy(k))
1823            } else {
1824                // in the worst case, every row is its own group
1825                input.clone()
1826            }
1827        }
1828
1829        fn threshold(&self, input: CardinalityEstimate) -> CardinalityEstimate {
1830            // worst case scaling factor is 1
1831            input.clone()
1832        }
1833    }
1834
1835    impl Analysis for Cardinality {
1836        type Value = CardinalityEstimate;
1837
1838        fn announce_dependencies(builder: &mut crate::analysis::DerivedBuilder) {
1839            builder.require(crate::analysis::Arity);
1840            builder.require(crate::analysis::UniqueKeys);
1841        }
1842
1843        fn derive(
1844            &self,
1845            expr: &MirRelationExpr,
1846            index: usize,
1847            results: &[Self::Value],
1848            depends: &crate::analysis::Derived,
1849        ) -> Self::Value {
1850            use MirRelationExpr::*;
1851
1852            let sizes = depends.as_view().results::<SubtreeSize>();
1853            let arity = depends.as_view().results::<Arity>();
1854            let keys = depends.as_view().results::<UniqueKeys>();
1855
1856            match expr {
1857                Constant { rows, .. } => {
1858                    CardinalityEstimate::from(rows.as_ref().map_or_else(|_| 0, |v| v.len()))
1859                }
1860                Get { id, .. } => match id {
1861                    Id::Local(id) => depends
1862                        .bindings()
1863                        .get(id)
1864                        .and_then(|id| results.get(*id))
1865                        .copied()
1866                        .unwrap_or(CardinalityEstimate::Unknown),
1867                    Id::Global(id) => self
1868                        .stats
1869                        .get(id)
1870                        .copied()
1871                        .map(CardinalityEstimate::from)
1872                        .unwrap_or(CardinalityEstimate::Unknown),
1873                },
1874                Let { .. } | Project { .. } | Map { .. } | ArrangeBy { .. } | Negate { .. } => {
1875                    results[index - 1].clone()
1876                }
1877                LetRec { .. } =>
1878                // TODO(mgree): implement a recurrence-based approach (or at least identify common idioms, e.g. transitive closure)
1879                {
1880                    CardinalityEstimate::Unknown
1881                }
1882                Union { base: _, inputs: _ } => depends
1883                    .children_of_rev(index, expr.children().count())
1884                    .map(|off| results[off].clone())
1885                    .sum(),
1886                FlatMap { func, .. } => {
1887                    let input = results[index - 1];
1888                    self.flat_map(func, input)
1889                }
1890                Filter { predicates, .. } => {
1891                    let input = results[index - 1];
1892                    let keys = depends.results::<UniqueKeys>();
1893                    let keys = &keys[index - 1];
1894                    self.filter(predicates, keys, input)
1895                }
1896                Join {
1897                    equivalences,
1898                    implementation,
1899                    inputs,
1900                    ..
1901                } => {
1902                    let mut input_results = Vec::with_capacity(inputs.len());
1903
1904                    // maps a column to the index in `inputs` that it belongs to
1905                    let mut unique_columns = BTreeMap::new();
1906                    let mut key_offset = 0;
1907
1908                    let mut offset = 1;
1909                    for idx in 0..inputs.len() {
1910                        let input = results[index - offset];
1911                        input_results.push(input);
1912
1913                        let arity = arity[index - offset];
1914                        let keys = &keys[index - offset];
1915                        for key in keys {
1916                            if key.len() == 1 {
1917                                unique_columns.insert(key_offset + key[0], idx);
1918                            }
1919                        }
1920                        key_offset += arity;
1921
1922                        offset += &sizes[index - offset];
1923                    }
1924
1925                    self.join(equivalences, implementation, unique_columns, input_results)
1926                }
1927                Reduce {
1928                    group_key,
1929                    expected_group_size,
1930                    ..
1931                } => {
1932                    let input = results[index - 1];
1933                    self.reduce(group_key, expected_group_size, input)
1934                }
1935                TopK {
1936                    group_key,
1937                    limit,
1938                    expected_group_size,
1939                    ..
1940                } => {
1941                    let input = results[index - 1];
1942                    self.topk(group_key, limit, expected_group_size, input)
1943                }
1944                Threshold { .. } => {
1945                    let input = results[index - 1];
1946                    self.threshold(input)
1947                }
1948            }
1949        }
1950    }
1951
1952    impl std::fmt::Display for CardinalityEstimate {
1953        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1954            match self {
1955                CardinalityEstimate::Estimate(OrderedFloat(estimate)) => write!(f, "{estimate}"),
1956                CardinalityEstimate::Unknown => write!(f, "<UNKNOWN>"),
1957            }
1958        }
1959    }
1960}
1961
1962mod common_lattice {
1963    use crate::analysis::Lattice;
1964
1965    pub struct BoolLattice;
1966
1967    impl Lattice<bool> for BoolLattice {
1968        // `true` > `false`.
1969        fn top(&self) -> bool {
1970            true
1971        }
1972        // `false` is the greatest lower bound. `into` changes if it's true and `item` is false.
1973        fn meet_assign(&self, into: &mut bool, item: bool) -> bool {
1974            let changed = *into && !item;
1975            *into = *into && item;
1976            changed
1977        }
1978    }
1979}