Skip to main content

mz_transform/
analysis.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Traits and types for reusable expression analysis
11
12pub mod equivalences;
13pub mod monotonic;
14
15use mz_expr::MirRelationExpr;
16
17pub use arity::Arity;
18pub use cardinality::Cardinality;
19pub use column_names::{ColumnName, ColumnNames};
20pub use common::{Derived, DerivedBuilder, DerivedView};
21pub use explain::annotate_plan;
22pub use non_negative::NonNegative;
23pub use repr_types::ReprRelationType;
24pub use subtree::SubtreeSize;
25pub use unique_keys::UniqueKeys;
26
/// An analysis that can be applied bottom-up to a `MirRelationExpr`.
///
/// An implementation derives one [`Analysis::Value`] per expression. While doing so it
/// may consult the values it has already derived for other expressions, as well as the
/// results of any other analyses it has announced a dependence upon.
pub trait Analysis: 'static {
    /// The type of value this analysis associates with an expression.
    type Value: std::fmt::Debug;
    /// Announce any dependencies this analysis has on other analyses.
    ///
    /// The method should invoke `builder.require(Foo)` for each other
    /// analysis `Foo` this analysis depends upon.
    ///
    /// The default implementation announces no dependencies.
    fn announce_dependencies(_builder: &mut DerivedBuilder) {}
    /// The analysis value derived for an expression, given other analysis results.
    ///
    /// The other analysis results include the results of this analysis for all children,
    /// in `results`, and the results of other analyses this analysis has expressed a
    /// dependence upon, in `depends`, for children and the expression itself.
    /// The analysis results for `Self` can only be found in `results`, and are not
    /// available in `depends`.
    ///
    /// Implementors of this method must defensively check references into `results`, as
    /// it may be invoked on `LetRec` bindings that have not yet been populated. It is up
    /// to the analysis what to do in that case, but conservative behavior is recommended.
    ///
    /// The `index` indicates the post-order index for the expression, for use in finding
    /// the corresponding information in `results` and `depends`.
    ///
    /// The returned result will be associated with this expression for this analysis,
    /// and the analyses will continue.
    fn derive(
        &self,
        expr: &MirRelationExpr,
        index: usize,
        results: &[Self::Value],
        depends: &Derived,
    ) -> Self::Value;

    /// When available, provide a lattice interface to allow optimistic recursion.
    ///
    /// Providing a non-`None` output indicates that the analysis intends re-iteration.
    /// The default implementation returns `None`.
    fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
        None
    }
}
68
/// Lattice methods for repeated analysis
///
/// An analysis returns an implementation of this trait from `Analysis::lattice` to
/// indicate that it intends re-iteration.
pub trait Lattice<T> {
    /// An element greater than all other elements.
    fn top(&self) -> T;
    /// Set `a` to the greatest lower bound of `a` and `b`, and indicate if `a` changed as a result.
    ///
    /// Returns `true` when `a` was changed, and `false` when it was left as it was.
    fn meet_assign(&self, a: &mut T, b: T) -> bool;
}
76
77/// Types common across multiple analyses
78pub mod common {
79
80    use std::any::{Any, TypeId};
81    use std::collections::BTreeMap;
82
83    use itertools::Itertools;
84    use mz_expr::LocalId;
85    use mz_expr::MirRelationExpr;
86    use mz_ore::assert_none;
87    use mz_repr::optimize::OptimizerFeatures;
88
89    use super::Analysis;
90    use super::subtree::SubtreeSize;
91
    /// Container for analysis state and binding context.
    ///
    /// Analyses are looked up by type through `results`, and local identifiers are mapped
    /// to offsets into those results through `bindings`.
    #[derive(Default)]
    #[allow(missing_debug_implementations)]
    pub struct Derived {
        /// A record of active analyses and their results, indexed by their type id.
        analyses: BTreeMap<TypeId, Box<dyn AnalysisBundle>>,
        /// Analyses ordered where each depends only on strictly prior analyses.
        order: Vec<TypeId>,
        /// Map from local identifier to result offset for analysis values.
        bindings: BTreeMap<LocalId, usize>,
    }
103
104    impl Derived {
105        /// Return the analysis results derived so far.
106        ///
107        /// # Panics
108        ///
109        /// This method panics if `A` was not installed as a required analysis.
110        pub fn results<A: Analysis>(&self) -> &[A::Value] {
111            let type_id = TypeId::of::<Bundle<A>>();
112            if let Some(bundle) = self.analyses.get(&type_id) {
113                if let Some(bundle) = bundle.as_any().downcast_ref::<Bundle<A>>() {
114                    return &bundle.results[..];
115                }
116            }
117            panic!("Analysis {:?} missing", std::any::type_name::<A>());
118        }
119        /// Bindings from local identifiers to result offsets for analysis values.
120        pub fn bindings(&self) -> &BTreeMap<LocalId, usize> {
121            &self.bindings
122        }
123        /// Result offsets for the state of a various number of children of the current expression.
124        ///
125        /// The integers are the zero-offset locations in the `SubtreeSize` analysis. The order of
126        /// the children is descending, from last child to first, because of how the information is
127        /// laid out, and the non-reversibility of the look-ups.
128        ///
129        /// It is an error to call this method with more children than expression has.
130        pub fn children_of_rev<'a>(
131            &'a self,
132            start: usize,
133            count: usize,
134        ) -> impl Iterator<Item = usize> + 'a {
135            let sizes = self.results::<SubtreeSize>();
136            let offset = 1;
137            (0..count).scan(offset, move |offset, _| {
138                let result = start - *offset;
139                *offset += sizes[result];
140                Some(result)
141            })
142        }
143
144        /// Recast the derived data as a view that can be subdivided into views over child state.
145        pub fn as_view<'a>(&'a self) -> DerivedView<'a> {
146            DerivedView {
147                derived: self,
148                lower: 0,
149                upper: self.results::<SubtreeSize>().len(),
150            }
151        }
152    }
153
    /// The subset of a `Derived` corresponding to an expression and its children.
    ///
    /// Specifically, bounds an interval `[lower, upper)` that ends with the state
    /// of an expression, at `upper-1`, and is preceded by the state of descendents.
    ///
    /// This is best thought of as a node in a tree rather than as a plain interval.
    // NOTE(review): the sentence above was truncated after "rather" in the original;
    // the completion is a best guess and should be confirmed.
    #[allow(missing_debug_implementations)]
    #[derive(Copy, Clone)]
    pub struct DerivedView<'a> {
        /// The full set of derived results this view looks into.
        derived: &'a Derived,
        /// Inclusive lower bound of the interval covered by this view.
        lower: usize,
        /// Exclusive upper bound of the interval covered by this view.
        upper: usize,
    }
167
168    impl<'a> DerivedView<'a> {
169        /// The value associated with the expression.
170        pub fn value<A: Analysis>(&self) -> Option<&'a A::Value> {
171            self.results::<A>().last()
172        }
173
174        /// The post-order traversal index for the expression.
175        ///
176        /// This can be used to index into the full set of results, as provided by an
177        /// instance of `Derived`.
178        pub fn index(&self) -> usize {
179            self.upper - 1
180        }
181
182        /// The value bound to an identifier, if it has been derived.
183        ///
184        /// There are several reasons the value could not be derived, and this method
185        /// does not distinguish between them.
186        pub fn bound<A: Analysis>(&self, id: LocalId) -> Option<&'a A::Value> {
187            self.derived
188                .bindings
189                .get(&id)
190                .and_then(|index| self.derived.results::<A>().get(*index))
191        }
192
193        /// The results for expression and its children.
194        ///
195        /// The results for the expression itself will be the last element.
196        ///
197        /// # Panics
198        ///
199        /// This method panics if `A` was not installed as a required analysis.
200        pub fn results<A: Analysis>(&self) -> &'a [A::Value] {
201            &self.derived.results::<A>()[self.lower..self.upper]
202        }
203
204        /// Bindings from local identifiers to result offsets for analysis values.
205        ///
206        /// This method returns all bindings, which may include bindings not in scope for
207        /// the expression and its children; they should be ignored.
208        pub fn bindings(&self) -> &'a BTreeMap<LocalId, usize> {
209            self.derived.bindings()
210        }
211
212        /// Subviews over `self` corresponding to the children of the expression, in reverse order.
213        ///
214        /// These views should disjointly cover the same interval as `self`, except for the last element
215        /// which corresponds to the expression itself.
216        ///
217        /// The number of produced items should exactly match the number of children, which need not
218        /// be provided as an argument. This relies on the well-formedness of the view, which should
219        /// exhaust itself just as it enumerates its last (the first) child view.
220        pub fn children_rev(&self) -> impl Iterator<Item = DerivedView<'a>> + 'a {
221            // This logic is copy/paste from `Derived::children_of_rev` but it was annoying to layer
222            // it over the output of that function, and perhaps clearer to rewrite in any case.
223
224            // Discard the last element (the size of the expression's subtree).
225            // Repeatedly read out the last element, then peel off that many elements.
226            // Each extracted slice corresponds to a child of the current expression.
227            // We should end cleanly with an empty slice, otherwise there is an issue.
228            let sizes = self.results::<SubtreeSize>();
229            let sizes = &sizes[..sizes.len() - 1];
230
231            let offset = self.lower;
232            let derived = self.derived;
233            (0..).scan(sizes, move |sizes, _| {
234                if let Some(size) = sizes.last() {
235                    *sizes = &sizes[..sizes.len() - size];
236                    Some(Self {
237                        derived,
238                        lower: offset + sizes.len(),
239                        upper: offset + sizes.len() + size,
240                    })
241                } else {
242                    None
243                }
244            })
245        }
246
247        /// A convenience method for the view over the expressions last child.
248        ///
249        /// This method is appropriate to call on expressions with multiple children,
250        /// and in particular for `Let` and `LetRecv` variants where the body is the
251        /// last child.
252        ///
253        /// It is an error to call this on a view for an expression with no children.
254        pub fn last_child(&self) -> DerivedView<'a> {
255            self.children_rev().next().unwrap()
256        }
257    }
258
    /// A builder wrapper to accumulate announced dependencies and construct default state.
    #[allow(missing_debug_implementations)]
    pub struct DerivedBuilder<'a> {
        /// The state under construction, holding the analyses required so far.
        result: Derived,
        /// Optimizer features consulted when installing analyses.
        features: &'a OptimizerFeatures,
    }
265
266    impl<'a> DerivedBuilder<'a> {
267        /// Create a new [`DerivedBuilder`] parameterized by [`OptimizerFeatures`].
268        pub fn new(features: &'a OptimizerFeatures) -> Self {
269            // The default builder should include `SubtreeSize` to facilitate navigation.
270            let mut builder = DerivedBuilder {
271                result: Derived::default(),
272                features,
273            };
274            builder.require(SubtreeSize);
275            builder
276        }
277    }
278
279    impl<'a> DerivedBuilder<'a> {
280        /// Announces a dependence on an analysis `A`.
281        ///
282        /// This ensures that `A` will be performed, and before any analysis that
283        /// invokes this method.
284        pub fn require<A: Analysis>(&mut self, analysis: A) {
285            // The method recursively descends through required analyses, first
286            // installing each in `result.analyses` and second in `result.order`.
287            // The first is an obligation, and serves as an indication that we have
288            // found a cycle in dependencies.
289            let type_id = TypeId::of::<Bundle<A>>();
290            if !self.result.order.contains(&type_id) {
291                // If we have not sequenced `type_id` but have a bundle, it means
292                // we are in the process of fulfilling its requirements: a cycle.
293                if self.result.analyses.contains_key(&type_id) {
294                    panic!("Cyclic dependency detected: {}", std::any::type_name::<A>());
295                }
296                // Insert the analysis bundle first, so that we can detect cycles.
297                self.result.analyses.insert(
298                    type_id,
299                    Box::new(Bundle::<A> {
300                        analysis,
301                        results: Vec::new(),
302                        fuel: 100,
303                        allow_optimistic: self.features.enable_letrec_fixpoint_analysis,
304                    }),
305                );
306                A::announce_dependencies(self);
307                // All dependencies are successfully sequenced; sequence `type_id`.
308                self.result.order.push(type_id);
309            }
310        }
        /// Complete the building: perform analyses and return the resulting `Derived`.
        ///
        /// The expression is first flattened into post-order, recording for each `Let` and
        /// `LetRec` binding the offset of its value. Each required analysis is then run
        /// over that post-order, in dependency order.
        ///
        /// # Panics
        ///
        /// Panics if the same local identifier is bound more than once in `expr`.
        pub fn visit(mut self, expr: &MirRelationExpr) -> Derived {
            // A stack of expressions to process (`Ok`) and let bindings to fill (`Err`).
            let mut todo = vec![Ok(expr)];
            // Expressions in reverse post-order: each expression, followed by its children in reverse order.
            // We will reverse this to get the post order, but must form it in reverse.
            let mut rev_post_order = Vec::new();
            while let Some(command) = todo.pop() {
                match command {
                    // An expression to visit.
                    Ok(expr) => {
                        match expr {
                            MirRelationExpr::Let { id, value, body } => {
                                // Pushed so that the body pops first, then the binding marker,
                                // and the value immediately after its marker.
                                todo.push(Ok(value));
                                todo.push(Err(*id));
                                todo.push(Ok(body));
                            }
                            MirRelationExpr::LetRec {
                                ids, values, body, ..
                            } => {
                                // As for `Let`: each marker pops just before the value it names.
                                for (id, value) in ids.iter().zip_eq(values) {
                                    todo.push(Ok(value));
                                    todo.push(Err(*id));
                                }
                                todo.push(Ok(body));
                            }
                            _ => {
                                todo.extend(expr.children().map(Ok));
                            }
                        }
                        rev_post_order.push(expr);
                    }
                    // A local id to install
                    Err(local_id) => {
                        // Capture the *remaining* work, which we'll need to flip around.
                        // The bound value is the next expression to be appended, so the current
                        // length is its position in the reversed order.
                        let prior = self.result.bindings.insert(local_id, rev_post_order.len());
                        assert_none!(prior, "Shadowing not allowed");
                    }
                }
            }
            // Flip the offsets now that we know a length.
            for value in self.result.bindings.values_mut() {
                *value = rev_post_order.len() - *value - 1;
            }
            // Visit the pre-order in reverse order: post-order.
            rev_post_order.reverse();

            // Apply each analysis to `expr` in order.
            for id in self.result.order.iter() {
                // The bundle is taken out of the map while it runs, so that it can be mutated
                // while the remaining analyses are shared with it as `depends`.
                if let Some(mut bundle) = self.result.analyses.remove(id) {
                    bundle.analyse(&rev_post_order[..], &self.result);
                    self.result.analyses.insert(*id, bundle);
                }
            }

            self.result
        }
368    }
369
    /// An abstraction for an analysis and associated state.
    ///
    /// This trait erases the analysis type, so that bundles for different analyses can be
    /// stored side by side as `Box<dyn AnalysisBundle>`.
    trait AnalysisBundle: Any {
        /// Populates internal state for all of `exprs`.
        ///
        /// Result indicates whether new information was produced for `exprs.last()`.
        fn analyse(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool;
        /// Upcasts `self` to a `&dyn Any`.
        ///
        /// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed
        fn as_any(&self) -> &dyn std::any::Any;
    }
381
    /// A wrapper for analysis state.
    struct Bundle<A: Analysis> {
        /// The algorithm instance used to derive the results.
        analysis: A,
        /// A vector of results.
        results: Vec<A::Value>,
        /// Counts down with each `LetRec` re-iteration, to avoid unbounded effort.
        /// Should it reach zero, the analysis should discard its results and restart as if pessimistic.
        fuel: usize,
        /// Allow optimistic analysis for `A` (otherwise we always do pessimistic
        /// analysis, even if a [`crate::analysis::Lattice`] is available for `A`).
        allow_optimistic: bool,
    }
395
396    impl<A: Analysis> AnalysisBundle for Bundle<A> {
397        fn analyse(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool {
398            self.results.clear();
399            // Attempt optimistic analysis, and if that fails go pessimistic.
400            let update = A::lattice()
401                .filter(|_| self.allow_optimistic)
402                .and_then(|lattice| {
403                    for _ in exprs.iter() {
404                        self.results.push(lattice.top());
405                    }
406                    self.analyse_optimistic(exprs, 0, exprs.len(), depends, &*lattice)
407                        .ok()
408                })
409                .unwrap_or_else(|| {
410                    self.results.clear();
411                    self.analyse_pessimistic(exprs, depends)
412                });
413            assert_eq!(self.results.len(), exprs.len());
414            update
415        }
416        fn as_any(&self) -> &dyn std::any::Any {
417            self
418        }
419    }
420
421    impl<A: Analysis> Bundle<A> {
422        /// Analysis that starts optimistically but is only correct at a fixed point.
423        ///
424        /// Will fail out to `analyse_pessimistic` if the lattice is missing, or `self.fuel` is exhausted.
425        /// When successful, the result indicates whether new information was produced for `exprs.last()`.
426        fn analyse_optimistic(
427            &mut self,
428            exprs: &[&MirRelationExpr],
429            lower: usize,
430            upper: usize,
431            depends: &Derived,
432            lattice: &dyn crate::analysis::Lattice<A::Value>,
433        ) -> Result<bool, ()> {
434            // Repeatedly re-evaluate the whole tree bottom up, until no changes of fuel spent.
435            let mut changed = true;
436            while changed {
437                changed = false;
438
439                // Bail out if we have done too many `LetRec` passes in this analysis.
440                self.fuel -= 1;
441                if self.fuel == 0 {
442                    return Err(());
443                }
444
445                // Track if repetitions may be required, to avoid iteration if they are not.
446                let mut is_recursive = false;
447                // Update each derived value and track if any have changed.
448                for index in lower..upper {
449                    let value = self.derive(exprs[index], index, depends);
450                    changed = lattice.meet_assign(&mut self.results[index], value) || changed;
451                    if let MirRelationExpr::LetRec { .. } = &exprs[index] {
452                        is_recursive = true;
453                    }
454                }
455
456                // Un-set the potential loop if there was no recursion.
457                if !is_recursive {
458                    changed = false;
459                }
460            }
461            Ok(true)
462        }
463
464        /// Analysis that starts conservatively and can be stopped at any point.
465        ///
466        /// Result indicates whether new information was produced for `exprs.last()`.
467        fn analyse_pessimistic(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool {
468            // TODO: consider making iterative, from some `bottom()` up using `join_assign()`.
469            self.results.clear();
470            for (index, expr) in exprs.iter().enumerate() {
471                self.results.push(self.derive(expr, index, depends));
472            }
473            true
474        }
475
476        #[inline]
477        fn derive(&self, expr: &MirRelationExpr, index: usize, depends: &Derived) -> A::Value {
478            self.analysis
479                .derive(expr, index, &self.results[..], depends)
480        }
481    }
482}
483
484/// Expression subtree sizes
485///
486/// This analysis counts the number of expressions in each subtree, and is most useful
487/// for navigating the results of other analyses that are offset by subtree sizes.
488pub mod subtree {
489
490    use super::{Analysis, Derived};
491    use mz_expr::MirRelationExpr;
492
493    /// Analysis that determines the size in child expressions of relation expressions.
494    #[derive(Debug)]
495    pub struct SubtreeSize;
496
497    impl Analysis for SubtreeSize {
498        type Value = usize;
499
500        fn derive(
501            &self,
502            expr: &MirRelationExpr,
503            index: usize,
504            results: &[Self::Value],
505            _depends: &Derived,
506        ) -> Self::Value {
507            match expr {
508                MirRelationExpr::Constant { .. } | MirRelationExpr::Get { .. } => 1,
509                _ => {
510                    let mut offset = 1;
511                    for _ in expr.children() {
512                        offset += results[index - offset];
513                    }
514                    offset
515                }
516            }
517        }
518    }
519}
520
521/// Expression arities
522mod arity {
523
524    use super::{Analysis, Derived};
525    use mz_expr::MirRelationExpr;
526
527    /// Analysis that determines the number of columns of relation expressions.
528    #[derive(Debug)]
529    pub struct Arity;
530
531    impl Analysis for Arity {
532        type Value = usize;
533
534        fn derive(
535            &self,
536            expr: &MirRelationExpr,
537            index: usize,
538            results: &[Self::Value],
539            depends: &Derived,
540        ) -> Self::Value {
541            let mut offsets = depends
542                .children_of_rev(index, expr.children().count())
543                .map(|child| results[child])
544                .collect::<Vec<_>>();
545            offsets.reverse();
546            expr.arity_with_input_arities(offsets.into_iter())
547        }
548    }
549}
550
551/// Expression types
552mod repr_types {
553
554    use super::{Analysis, Derived, Lattice};
555    use itertools::Itertools;
556    use mz_expr::MirRelationExpr;
557    use mz_repr::ReprColumnType;
558
559    /// Analysis that determines the type of relation expressions.
560    ///
561    /// The value is `Some` when it discovers column types, and `None` in the case that
562    /// it has discovered no constraining information on the column types. The `None`
563    /// variant should only occur in the course of iteration, and should not be revealed
564    /// as an output of the analysis. One can `unwrap()` the result, and if it errors then
565    /// either the expression is malformed or the analysis has a bug.
566    ///
567    /// The analysis will panic if an expression is not well typed (i.e. if `try_col_with_input_cols`
568    /// returns an error).
569    #[derive(Debug)]
570    pub struct ReprRelationType;
571
572    impl Analysis for ReprRelationType {
573        type Value = Option<Vec<ReprColumnType>>;
574
575        fn derive(
576            &self,
577            expr: &MirRelationExpr,
578            index: usize,
579            results: &[Self::Value],
580            depends: &Derived,
581        ) -> Self::Value {
582            let offsets = depends
583                .children_of_rev(index, expr.children().count())
584                .map(|child| &results[child])
585                .collect::<Vec<_>>();
586
587            // For most expressions we'll apply `try_col_with_input_cols`, but for `Get` expressions
588            // we'll want to combine what we know (iteratively) with the stated `Get::typ`.
589            match expr {
590                MirRelationExpr::Get {
591                    id: mz_expr::Id::Local(i),
592                    typ,
593                    ..
594                } => {
595                    let mut result = typ.column_types.iter().cloned().collect_vec();
596                    if let Some(o) = depends.bindings().get(i) {
597                        if let Some(t) = results.get(*o) {
598                            if let Some(rec_typ) = t {
599                                // Reconcile nullability statements.
600                                // Unclear if we should trust `typ`.
601                                assert_eq!(result.len(), rec_typ.len());
602                                result.clone_from(rec_typ);
603                                for (res, col) in result.iter_mut().zip_eq(typ.column_types.iter())
604                                {
605                                    if !col.nullable {
606                                        res.nullable = false;
607                                    }
608                                }
609                            } else {
610                                // Our `None` information indicates that we are optimistically
611                                // assuming the best, including that all columns are non-null.
612                                // This should only happen in the first visit to a `Get` expr.
613                                // Use `typ`, but flatten nullability.
614                                for col in result.iter_mut() {
615                                    col.nullable = false;
616                                }
617                            }
618                        }
619                    }
620                    Some(result)
621                }
622                _ => {
623                    // Every expression with inputs should have non-`None` inputs at this point.
624                    let input_cols = offsets.into_iter().rev().map(|o| {
625                        o.as_ref()
626                            .expect("ReprRelationType analysis discovered type-less expression")
627                    });
628
629                    let repr_typ = expr.try_col_with_input_cols(input_cols).unwrap();
630                    Some(repr_typ)
631                }
632            }
633        }
634
635        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
636            Some(Box::new(RTLattice))
637        }
638    }
639
640    struct RTLattice;
641
642    impl Lattice<Option<Vec<ReprColumnType>>> for RTLattice {
643        fn top(&self) -> Option<Vec<ReprColumnType>> {
644            None
645        }
646        fn meet_assign(
647            &self,
648            a: &mut Option<Vec<ReprColumnType>>,
649            b: Option<Vec<ReprColumnType>>,
650        ) -> bool {
651            match (a, b) {
652                (_, None) => false,
653                (Some(a), Some(b)) => {
654                    let mut changed = false;
655                    assert_eq!(a.len(), b.len());
656                    for (at, bt) in a.iter_mut().zip_eq(b.iter()) {
657                        assert_eq!(at.scalar_type, bt.scalar_type);
658                        if !at.nullable && bt.nullable {
659                            at.nullable = true;
660                            changed = true;
661                        }
662                    }
663                    changed
664                }
665                (a, b) => {
666                    *a = b;
667                    true
668                }
669            }
670        }
671    }
672}
673
674/// Expression unique keys
675mod unique_keys {
676
677    use super::arity::Arity;
678    use super::{Analysis, Derived, DerivedBuilder, Lattice};
679    use mz_expr::MirRelationExpr;
680
681    /// Analysis that determines the unique keys of relation expressions.
682    ///
683    /// The analysis value is a `Vec<Vec<usize>>`, which should be interpreted as a list
684    /// of sets of column identifiers, each set of which has the property that there is at
685    /// most one instance of each assignment of values to those columns.
686    ///
687    /// The sets are minimal, in that any superset of another set is removed from the list.
688    /// Any superset of unique key columns are also unique key columns.
689    #[derive(Debug)]
690    pub struct UniqueKeys;
691
692    impl Analysis for UniqueKeys {
693        type Value = Vec<Vec<usize>>;
694
695        fn announce_dependencies(builder: &mut DerivedBuilder) {
696            builder.require(Arity);
697        }
698
699        fn derive(
700            &self,
701            expr: &MirRelationExpr,
702            index: usize,
703            results: &[Self::Value],
704            depends: &Derived,
705        ) -> Self::Value {
706            let mut offsets = depends
707                .children_of_rev(index, expr.children().count())
708                .collect::<Vec<_>>();
709            offsets.reverse();
710
711            match expr {
712                MirRelationExpr::Get {
713                    id: mz_expr::Id::Local(i),
714                    typ,
715                    ..
716                } => {
717                    // We have information from `typ` and from the analysis.
718                    // We should "join" them, unioning and reducing the keys.
719                    let mut keys = typ.keys.clone();
720                    if let Some(o) = depends.bindings().get(i) {
721                        if let Some(ks) = results.get(*o) {
722                            for k in ks.iter() {
723                                antichain_insert(&mut keys, k.clone());
724                            }
725                            keys.extend(ks.iter().cloned());
726                            keys.sort();
727                            keys.dedup();
728                        }
729                    }
730                    keys
731                }
732                _ => {
733                    let arity = depends.results::<Arity>();
734                    expr.keys_with_input_keys(
735                        offsets.iter().map(|o| arity[*o]),
736                        offsets.iter().map(|o| &results[*o]),
737                    )
738                }
739            }
740        }
741
742        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
743            Some(Box::new(UKLattice))
744        }
745    }
746
747    fn antichain_insert(into: &mut Vec<Vec<usize>>, item: Vec<usize>) {
748        // Insert only if there is not a dominating element of `into`.
749        if into.iter().all(|key| !key.iter().all(|k| item.contains(k))) {
750            into.retain(|key| !key.iter().all(|k| item.contains(k)));
751            into.push(item);
752        }
753    }
754
755    /// Lattice for sets of columns that define a unique key.
756    ///
757    /// An element `Vec<Vec<usize>>` describes all sets of columns `Vec<usize>` that are a
758    /// superset of some set of columns in the lattice element.
759    struct UKLattice;
760
761    impl Lattice<Vec<Vec<usize>>> for UKLattice {
762        fn top(&self) -> Vec<Vec<usize>> {
763            vec![vec![]]
764        }
765        fn meet_assign(&self, a: &mut Vec<Vec<usize>>, b: Vec<Vec<usize>>) -> bool {
766            a.sort();
767            a.dedup();
768            let mut c = Vec::new();
769            for cols_a in a.iter_mut() {
770                cols_a.sort();
771                cols_a.dedup();
772                for cols_b in b.iter() {
773                    let mut cols_c = cols_a.iter().chain(cols_b).cloned().collect::<Vec<_>>();
774                    cols_c.sort();
775                    cols_c.dedup();
776                    antichain_insert(&mut c, cols_c);
777                }
778            }
779            c.sort();
780            c.dedup();
781            std::mem::swap(a, &mut c);
782            a != &mut c
783        }
784    }
785}
786
/// Determines if accumulated frequencies can be negative.
///
/// This analysis assumes that globally identified collections are non-negative, and it is
/// incorrect to apply it to expressions that reference external collections that may have
/// negative accumulations.
792mod non_negative {
793
794    use super::{Analysis, Derived, Lattice};
795    use crate::analysis::common_lattice::BoolLattice;
796    use mz_expr::{Id, MirRelationExpr};
797
798    /// Analysis that determines if all accumulations at all times are non-negative.
799    ///
800    /// The analysis assumes that `Id::Global` references only refer to non-negative collections.
801    #[derive(Debug)]
802    pub struct NonNegative;
803
804    impl Analysis for NonNegative {
805        type Value = bool;
806
807        fn derive(
808            &self,
809            expr: &MirRelationExpr,
810            index: usize,
811            results: &[Self::Value],
812            depends: &Derived,
813        ) -> Self::Value {
814            match expr {
815                MirRelationExpr::Constant { rows, .. } => rows
816                    .as_ref()
817                    .map(|r| r.iter().all(|(_, diff)| *diff >= mz_repr::Diff::ZERO))
818                    .unwrap_or(true),
819                MirRelationExpr::Get { id, .. } => match id {
820                    Id::Local(id) => {
821                        let index = *depends
822                            .bindings()
823                            .get(id)
824                            .expect("Dependency info not found");
825                        *results.get(index).unwrap_or(&false)
826                    }
827                    Id::Global(_) => true,
828                },
829                // Negate must be false unless input is "non-positive".
830                MirRelationExpr::Negate { .. } => false,
831                // Threshold ensures non-negativity.
832                MirRelationExpr::Threshold { .. } => true,
833                // Reduce errors on negative input.
834                MirRelationExpr::Reduce { .. } => true,
835                MirRelationExpr::Join { .. } => {
836                    // If all inputs are non-negative, the join is non-negative.
837                    depends
838                        .children_of_rev(index, expr.children().count())
839                        .all(|off| results[off])
840                }
841                MirRelationExpr::Union { base, inputs } => {
842                    // If all inputs are non-negative, the union is non-negative.
843                    let all_non_negative = depends
844                        .children_of_rev(index, expr.children().count())
845                        .all(|off| results[off]);
846
847                    if all_non_negative {
848                        return true;
849                    }
850
851                    // We look for the pattern `Union { base, Negate(Subset(base)) }`.
852                    // TODO: take some care to ensure that union fusion does not introduce a regression.
853                    if inputs.len() == 1 {
854                        if let MirRelationExpr::Negate { input } = &inputs[0] {
855                            // If `base` is non-negative, and `is_superset_of(base, input)`, return true.
856                            // TODO: this is not correct until we have `is_superset_of` validate non-negativity
857                            // as it goes, but it matches the current implementation.
858                            let mut children = depends.children_of_rev(index, 2);
859                            let _negate = children.next().unwrap();
860                            let base_id = children.next().unwrap();
861                            let extra = children.next();
862                            debug_assert_eq!(extra, None);
863                            if results[base_id] && is_superset_of(&*base, &*input) {
864                                return true;
865                            }
866                        }
867                    }
868
869                    false
870                }
871                _ => results[index - 1],
872            }
873        }
874
875        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
876            Some(Box::new(BoolLattice))
877        }
878    }
879
880    /// Returns true only if `rhs.negate().union(lhs)` contains only non-negative multiplicities
881    /// once consolidated.
882    ///
883    /// Informally, this happens when `rhs` is a multiset subset of `lhs`, meaning the multiplicity
884    /// of any record in `rhs` is at most the multiplicity of the same record in `lhs`.
885    ///
886    /// This method recursively descends each of `lhs` and `rhs` and performs a great many equality
887    /// tests, which has the potential to be quadratic. We should consider restricting its attention
888    /// to `Get` identifiers, under the premise that equal AST nodes would necessarily be identified
889    /// by common subexpression elimination. This requires care around recursively bound identifiers.
890    ///
891    /// These rules are .. somewhat arbitrary, and likely reflect observed opportunities. For example,
892    /// while we do relate `distinct(filter(A)) <= distinct(A)`, we do not relate `distinct(A) <= A`.
893    /// Further thoughts about the class of optimizations, and whether there should be more or fewer,
894    /// can be found here: <https://github.com/MaterializeInc/database-issues/issues/4044>.
895    fn is_superset_of(mut lhs: &MirRelationExpr, mut rhs: &MirRelationExpr) -> bool {
896        // This implementation is iterative.
897        // Before converting this implementation to recursive (e.g. to improve its accuracy)
898        // make sure to use the `CheckedRecursion` struct to avoid blowing the stack.
899        while lhs != rhs {
900            match rhs {
901                MirRelationExpr::Filter { input, .. } => rhs = &**input,
902                MirRelationExpr::TopK { input, .. } => rhs = &**input,
903                // Descend in both sides if the current roots are
904                // projections with the same `outputs` vector.
905                MirRelationExpr::Project {
906                    input: rhs_input,
907                    outputs: rhs_outputs,
908                } => match lhs {
909                    MirRelationExpr::Project {
910                        input: lhs_input,
911                        outputs: lhs_outputs,
912                    } if lhs_outputs == rhs_outputs => {
913                        rhs = &**rhs_input;
914                        lhs = &**lhs_input;
915                    }
916                    _ => return false,
917                },
918                // Descend in both sides if the current roots are reduces with empty aggregates
919                // on the same set of keys (that is, a distinct operation on those keys).
920                MirRelationExpr::Reduce {
921                    input: rhs_input,
922                    group_key: rhs_group_key,
923                    aggregates: rhs_aggregates,
924                    monotonic: _,
925                    expected_group_size: _,
926                } if rhs_aggregates.is_empty() => match lhs {
927                    MirRelationExpr::Reduce {
928                        input: lhs_input,
929                        group_key: lhs_group_key,
930                        aggregates: lhs_aggregates,
931                        monotonic: _,
932                        expected_group_size: _,
933                    } if lhs_aggregates.is_empty() && lhs_group_key == rhs_group_key => {
934                        rhs = &**rhs_input;
935                        lhs = &**lhs_input;
936                    }
937                    _ => return false,
938                },
939                _ => {
940                    // TODO: Imagine more complex reasoning here!
941                    return false;
942                }
943            }
944        }
945        true
946    }
947}
948
949mod column_names {
950    use std::ops::Range;
951    use std::sync::Arc;
952
953    use super::Analysis;
954    use mz_expr::{AggregateFunc, Id, MirRelationExpr, MirScalarExpr, TableFunc};
955    use mz_repr::explain::ExprHumanizer;
956    use mz_repr::{GlobalId, UNKNOWN_COLUMN_NAME};
957    use mz_sql::ORDINALITY_COL_NAME;
958
959    /// An abstract type denoting an inferred column name.
960    #[derive(Debug, Clone)]
961    pub enum ColumnName {
962        /// A column with name inferred to be equal to a GlobalId schema column.
963        Global(GlobalId, usize),
964        /// An anonymous expression named after the top-level function name.
965        Aggregate(AggregateFunc, Box<ColumnName>),
966        /// A column with a name that has been saved from the original SQL query.
967        Annotated(Arc<str>),
968        /// An column with an unknown name.
969        Unknown,
970    }
971
972    impl ColumnName {
973        /// Return `true` iff the variant has an inferred name.
974        pub fn is_known(&self) -> bool {
975            match self {
976                Self::Global(..) | Self::Aggregate(..) => true,
977                // We treat annotated columns as unknown because we would rather
978                // override them with inferred names, if we can.
979                Self::Annotated(..) | Self::Unknown => false,
980            }
981        }
982
983        /// Humanize the column to a [`String`], returns an empty [`String`] for
984        /// unknown columns (or columns named `UNKNOWN_COLUMN_NAME`).
985        pub fn humanize(&self, humanizer: &dyn ExprHumanizer) -> String {
986            match self {
987                Self::Global(id, c) => humanizer.humanize_column(*id, *c).unwrap_or_default(),
988                Self::Aggregate(func, expr) => {
989                    let func = func.name();
990                    let expr = expr.humanize(humanizer);
991                    if expr.is_empty() {
992                        String::from(func)
993                    } else {
994                        format!("{func}_{expr}")
995                    }
996                }
997                Self::Annotated(name) => name.to_string(),
998                Self::Unknown => String::new(),
999            }
1000        }
1001
1002        /// Clone this column name if it is known, otherwise try to use the provided
1003        /// name if it is available.
1004        pub fn cloned_or_annotated(&self, name: &Option<Arc<str>>) -> Self {
1005            match self {
1006                Self::Global(..) | Self::Aggregate(..) | Self::Annotated(..) => self.clone(),
1007                Self::Unknown => name
1008                    .as_ref()
1009                    .filter(|name| name.as_ref() != UNKNOWN_COLUMN_NAME)
1010                    .map_or_else(|| Self::Unknown, |name| Self::Annotated(Arc::clone(name))),
1011            }
1012        }
1013    }
1014
1015    /// Compute the column types of each subtree of a [MirRelationExpr] from the
1016    /// bottom-up.
1017    #[derive(Debug)]
1018    pub struct ColumnNames;
1019
1020    impl ColumnNames {
1021        /// fallback schema consisting of ordinal column names: #0, #1, ...
1022        fn anonymous(range: Range<usize>) -> impl Iterator<Item = ColumnName> {
1023            range.map(|_| ColumnName::Unknown)
1024        }
1025
1026        /// fallback schema consisting of ordinal column names: #0, #1, ...
1027        fn extend_with_scalars(column_names: &mut Vec<ColumnName>, scalars: &Vec<MirScalarExpr>) {
1028            for scalar in scalars {
1029                column_names.push(match scalar {
1030                    MirScalarExpr::Column(c, name) => column_names[*c].cloned_or_annotated(&name.0),
1031                    _ => ColumnName::Unknown,
1032                });
1033            }
1034        }
1035    }
1036
1037    impl Analysis for ColumnNames {
1038        type Value = Vec<ColumnName>;
1039
1040        fn derive(
1041            &self,
1042            expr: &MirRelationExpr,
1043            index: usize,
1044            results: &[Self::Value],
1045            depends: &crate::analysis::Derived,
1046        ) -> Self::Value {
1047            use MirRelationExpr::*;
1048
1049            match expr {
1050                Constant { rows: _, typ } => {
1051                    // Fallback to an anonymous schema for constants.
1052                    ColumnNames::anonymous(0..typ.arity()).collect()
1053                }
1054                Get {
1055                    id: Id::Global(id),
1056                    typ,
1057                    access_strategy: _,
1058                } => {
1059                    // Emit ColumnName::Global instances for each column in the
1060                    // `Get` type. Those can be resolved to real names later when an
1061                    // ExpressionHumanizer is available.
1062                    (0..typ.columns().len())
1063                        .map(|c| ColumnName::Global(*id, c))
1064                        .collect()
1065                }
1066                Get {
1067                    id: Id::Local(id),
1068                    typ,
1069                    access_strategy: _,
1070                } => {
1071                    let index_child = *depends.bindings().get(id).expect("id in scope");
1072                    if index_child < results.len() {
1073                        results[index_child].clone()
1074                    } else {
1075                        // Possible because we infer LetRec bindings in order. This
1076                        // can be improved by introducing a fixpoint loop in the
1077                        // Env<A>::schedule_tasks LetRec handling block.
1078                        ColumnNames::anonymous(0..typ.arity()).collect()
1079                    }
1080                }
1081                Let {
1082                    id: _,
1083                    value: _,
1084                    body: _,
1085                } => {
1086                    // Return the column names of the `body`.
1087                    results[index - 1].clone()
1088                }
1089                LetRec {
1090                    ids: _,
1091                    values: _,
1092                    limits: _,
1093                    body: _,
1094                } => {
1095                    // Return the column names of the `body`.
1096                    results[index - 1].clone()
1097                }
1098                Project { input: _, outputs } => {
1099                    // Permute the column names of the input.
1100                    let input_column_names = &results[index - 1];
1101                    let mut column_names = vec![];
1102                    for col in outputs {
1103                        column_names.push(input_column_names[*col].clone());
1104                    }
1105                    column_names
1106                }
1107                Map { input: _, scalars } => {
1108                    // Extend the column names of the input with anonymous columns.
1109                    let mut column_names = results[index - 1].clone();
1110                    Self::extend_with_scalars(&mut column_names, scalars);
1111                    column_names
1112                }
1113                FlatMap {
1114                    input: _,
1115                    func,
1116                    exprs: _,
1117                } => {
1118                    // Extend the column names of the input with anonymous columns.
1119                    let mut column_names = results[index - 1].clone();
1120                    let func_output_start = column_names.len();
1121                    let func_output_end = column_names.len() + func.output_arity();
1122                    column_names.extend(Self::anonymous(func_output_start..func_output_end));
1123                    if let TableFunc::WithOrdinality { .. } = func {
1124                        // We know the name of the last column
1125                        // TODO(ggevay): generalize this to meaningful col names for all table functions
1126                        **column_names.last_mut().as_mut().expect(
1127                            "there is at least one output column, from the WITH ORDINALITY",
1128                        ) = ColumnName::Annotated(ORDINALITY_COL_NAME.into());
1129                    }
1130                    column_names
1131                }
1132                Filter {
1133                    input: _,
1134                    predicates: _,
1135                } => {
1136                    // Return the column names of the `input`.
1137                    results[index - 1].clone()
1138                }
1139                Join {
1140                    inputs: _,
1141                    equivalences: _,
1142                    implementation: _,
1143                } => {
1144                    let mut input_results = depends
1145                        .children_of_rev(index, expr.children().count())
1146                        .map(|child| &results[child])
1147                        .collect::<Vec<_>>();
1148                    input_results.reverse();
1149
1150                    let mut column_names = vec![];
1151                    for input_column_names in input_results {
1152                        column_names.extend(input_column_names.iter().cloned());
1153                    }
1154                    column_names
1155                }
1156                Reduce {
1157                    input: _,
1158                    group_key,
1159                    aggregates,
1160                    monotonic: _,
1161                    expected_group_size: _,
1162                } => {
1163                    // We clone and extend the input vector and then remove the part
1164                    // associated with the input at the end.
1165                    let mut column_names = results[index - 1].clone();
1166                    let input_arity = column_names.len();
1167
1168                    // Infer the group key part.
1169                    Self::extend_with_scalars(&mut column_names, group_key);
1170                    // Infer the aggregates part.
1171                    for aggregate in aggregates.iter() {
1172                        // The inferred name will consist of (1) the aggregate
1173                        // function name and (2) the aggregate expression (iff
1174                        // it is a simple column reference).
1175                        let func = aggregate.func.clone();
1176                        let expr = match aggregate.expr.as_column() {
1177                            Some(c) => column_names.get(c).unwrap_or(&ColumnName::Unknown).clone(),
1178                            None => ColumnName::Unknown,
1179                        };
1180                        column_names.push(ColumnName::Aggregate(func, Box::new(expr)));
1181                    }
1182                    // Remove the prefix associated with the input
1183                    column_names.drain(0..input_arity);
1184
1185                    column_names
1186                }
1187                TopK {
1188                    input: _,
1189                    group_key: _,
1190                    order_key: _,
1191                    limit: _,
1192                    offset: _,
1193                    monotonic: _,
1194                    expected_group_size: _,
1195                } => {
1196                    // Return the column names of the `input`.
1197                    results[index - 1].clone()
1198                }
1199                Negate { input: _ } => {
1200                    // Return the column names of the `input`.
1201                    results[index - 1].clone()
1202                }
1203                Threshold { input: _ } => {
1204                    // Return the column names of the `input`.
1205                    results[index - 1].clone()
1206                }
1207                Union { base: _, inputs: _ } => {
1208                    // Use the first non-empty column across all inputs.
1209                    let mut column_names = vec![];
1210
1211                    let mut inputs_results = depends
1212                        .children_of_rev(index, expr.children().count())
1213                        .map(|child| &results[child])
1214                        .collect::<Vec<_>>();
1215
1216                    let base_results = inputs_results.pop().unwrap();
1217                    inputs_results.reverse();
1218
1219                    for (i, mut column_name) in base_results.iter().cloned().enumerate() {
1220                        for input_results in inputs_results.iter() {
1221                            if !column_name.is_known() && input_results[i].is_known() {
1222                                column_name = input_results[i].clone();
1223                                break;
1224                            }
1225                        }
1226                        column_names.push(column_name);
1227                    }
1228
1229                    column_names
1230                }
1231                ArrangeBy { input: _, keys: _ } => {
1232                    // Return the column names of the `input`.
1233                    results[index - 1].clone()
1234                }
1235            }
1236        }
1237    }
1238}
1239
1240mod explain {
1241    //! Derived Analysis framework and definitions.
1242
1243    use std::collections::BTreeMap;
1244
1245    use mz_expr::MirRelationExpr;
1246    use mz_expr::explain::{ExplainContext, HumanizedExplain, HumanizerMode};
1247    use mz_ore::stack::RecursionLimitError;
1248    use mz_repr::explain::{Analyses, AnnotatedPlan};
1249
1250    use crate::analysis::equivalences::{Equivalences, HumanizedEquivalenceClasses};
1251
1252    // Analyses should have shortened paths when exported.
1253    use super::DerivedBuilder;
1254
1255    impl<'c> From<&ExplainContext<'c>> for DerivedBuilder<'c> {
1256        fn from(context: &ExplainContext<'c>) -> DerivedBuilder<'c> {
1257            let mut builder = DerivedBuilder::new(context.features);
1258            if context.config.subtree_size {
1259                builder.require(super::SubtreeSize);
1260            }
1261            if context.config.non_negative {
1262                builder.require(super::NonNegative);
1263            }
1264            if context.config.types {
1265                builder.require(super::ReprRelationType);
1266            }
1267            if context.config.arity {
1268                builder.require(super::Arity);
1269            }
1270            if context.config.keys {
1271                builder.require(super::UniqueKeys);
1272            }
1273            if context.config.cardinality {
1274                builder.require(super::Cardinality::with_stats(
1275                    context.cardinality_stats.clone(),
1276                ));
1277            }
1278            if context.config.column_names || context.config.humanized_exprs {
1279                builder.require(super::ColumnNames);
1280            }
1281            if context.config.equivalences {
1282                builder.require(Equivalences);
1283            }
1284            builder
1285        }
1286    }
1287
1288    /// Produce an [`AnnotatedPlan`] wrapping the given [`MirRelationExpr`] along
1289    /// with [`Analyses`] derived from the given context configuration.
1290    pub fn annotate_plan<'a>(
1291        plan: &'a MirRelationExpr,
1292        context: &'a ExplainContext,
1293    ) -> Result<AnnotatedPlan<'a, MirRelationExpr>, RecursionLimitError> {
1294        let mut annotations = BTreeMap::<&MirRelationExpr, Analyses>::default();
1295        let config = context.config;
1296
1297        // We want to annotate the plan with analyses in the following cases:
1298        // 1. An Analysis was explicitly requested in the ExplainConfig.
1299        // 2. Humanized expressions were requested in the ExplainConfig (in which
1300        //    case we need to derive the ColumnNames Analysis).
1301        if config.requires_analyses() || config.humanized_exprs {
1302            // get the annotation keys
1303            let subtree_refs = plan.post_order_vec();
1304            // get the annotation values
1305            let builder = DerivedBuilder::from(context);
1306            let derived = builder.visit(plan);
1307
1308            if config.subtree_size {
1309                for (expr, subtree_size) in std::iter::zip(
1310                    subtree_refs.iter(),
1311                    derived.results::<super::SubtreeSize>().into_iter(),
1312                ) {
1313                    let analyses = annotations.entry(expr).or_default();
1314                    analyses.subtree_size = Some(*subtree_size);
1315                }
1316            }
1317            if config.non_negative {
1318                for (expr, non_negative) in std::iter::zip(
1319                    subtree_refs.iter(),
1320                    derived.results::<super::NonNegative>().into_iter(),
1321                ) {
1322                    let analyses = annotations.entry(expr).or_default();
1323                    analyses.non_negative = Some(*non_negative);
1324                }
1325            }
1326
1327            if config.arity {
1328                for (expr, arity) in std::iter::zip(
1329                    subtree_refs.iter(),
1330                    derived.results::<super::Arity>().into_iter(),
1331                ) {
1332                    let analyses = annotations.entry(expr).or_default();
1333                    analyses.arity = Some(*arity);
1334                }
1335            }
1336
1337            if config.types {
1338                for (expr, types) in std::iter::zip(
1339                    subtree_refs.iter(),
1340                    derived.results::<super::ReprRelationType>().into_iter(),
1341                ) {
1342                    let analyses = annotations.entry(expr).or_default();
1343                    analyses.types = Some(types.clone());
1344                }
1345            }
1346
1347            if config.keys {
1348                for (expr, keys) in std::iter::zip(
1349                    subtree_refs.iter(),
1350                    derived.results::<super::UniqueKeys>().into_iter(),
1351                ) {
1352                    let analyses = annotations.entry(expr).or_default();
1353                    analyses.keys = Some(keys.clone());
1354                }
1355            }
1356
1357            if config.cardinality {
1358                for (expr, card) in std::iter::zip(
1359                    subtree_refs.iter(),
1360                    derived.results::<super::Cardinality>().into_iter(),
1361                ) {
1362                    let analyses = annotations.entry(expr).or_default();
1363                    analyses.cardinality = Some(card.to_string());
1364                }
1365            }
1366
1367            if config.column_names || config.humanized_exprs {
1368                for (expr, column_names) in std::iter::zip(
1369                    subtree_refs.iter(),
1370                    derived.results::<super::ColumnNames>().into_iter(),
1371                ) {
1372                    let analyses = annotations.entry(expr).or_default();
1373                    let value = column_names
1374                        .iter()
1375                        .map(|column_name| column_name.humanize(context.humanizer))
1376                        .collect();
1377                    analyses.column_names = Some(value);
1378                }
1379            }
1380
1381            if config.equivalences {
1382                for (expr, equivs) in std::iter::zip(
1383                    subtree_refs.iter(),
1384                    derived.results::<Equivalences>().into_iter(),
1385                ) {
1386                    let analyses = annotations.entry(expr).or_default();
1387                    analyses.equivalences = Some(match equivs.as_ref() {
1388                        Some(equivs) => HumanizedEquivalenceClasses {
1389                            equivalence_classes: equivs,
1390                            cols: analyses.column_names.as_ref(),
1391                            mode: HumanizedExplain::new(config.redacted),
1392                        }
1393                        .to_string(),
1394                        None => "<empty collection>".to_string(),
1395                    });
1396                }
1397            }
1398        }
1399
1400        Ok(AnnotatedPlan { plan, annotations })
1401    }
1402}
1403
1404/// Definition and helper structs for the [`Cardinality`] Analysis.
1405mod cardinality {
1406    use std::collections::{BTreeMap, BTreeSet};
1407
1408    use mz_expr::{
1409        BinaryFunc, Id, JoinImplementation, MirRelationExpr, MirScalarExpr, TableFunc, UnaryFunc,
1410        VariadicFunc,
1411    };
1412    use mz_ore::cast::{CastFrom, CastLossy, TryCastFrom};
1413    use mz_repr::GlobalId;
1414
1415    use ordered_float::OrderedFloat;
1416
1417    use super::{Analysis, Arity, SubtreeSize, UniqueKeys};
1418
1419    /// Compute the estimated cardinality of each subtree of a [MirRelationExpr] from the bottom up.
1420    #[allow(missing_debug_implementations)]
1421    pub struct Cardinality {
1422        /// Cardinalities for globally named entities
1423        pub stats: BTreeMap<GlobalId, usize>,
1424    }
1425
1426    impl Cardinality {
1427        /// A cardinality estimator with provided statistics for the given global identifiers
1428        pub fn with_stats(stats: BTreeMap<GlobalId, usize>) -> Self {
1429            Cardinality { stats }
1430        }
1431    }
1432
1433    impl Default for Cardinality {
1434        fn default() -> Self {
1435            Cardinality {
1436                stats: BTreeMap::new(),
1437            }
1438        }
1439    }
1440
1441    /// Cardinality estimates
1442    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
1443    pub enum CardinalityEstimate {
1444        Unknown,
1445        Estimate(OrderedFloat<f64>),
1446    }
1447
1448    impl CardinalityEstimate {
1449        pub fn max(lhs: CardinalityEstimate, rhs: CardinalityEstimate) -> CardinalityEstimate {
1450            use CardinalityEstimate::*;
1451            match (lhs, rhs) {
1452                (Estimate(lhs), Estimate(rhs)) => Estimate(std::cmp::max(lhs, rhs)),
1453                _ => Unknown,
1454            }
1455        }
1456
1457        pub fn rounded(&self) -> Option<usize> {
1458            match self {
1459                CardinalityEstimate::Estimate(OrderedFloat(f)) => {
1460                    let rounded = f.ceil();
1461                    let flattened = usize::cast_from(
1462                        u64::try_cast_from(rounded)
1463                            .expect("positive and representable cardinality estimate"),
1464                    );
1465
1466                    Some(flattened)
1467                }
1468                CardinalityEstimate::Unknown => None,
1469            }
1470        }
1471    }
1472
1473    impl std::ops::Add for CardinalityEstimate {
1474        type Output = CardinalityEstimate;
1475
1476        fn add(self, rhs: Self) -> Self::Output {
1477            use CardinalityEstimate::*;
1478            match (self, rhs) {
1479                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs + rhs),
1480                _ => Unknown,
1481            }
1482        }
1483    }
1484
1485    impl std::ops::Sub for CardinalityEstimate {
1486        type Output = CardinalityEstimate;
1487
1488        fn sub(self, rhs: Self) -> Self::Output {
1489            use CardinalityEstimate::*;
1490            match (self, rhs) {
1491                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs - rhs),
1492                _ => Unknown,
1493            }
1494        }
1495    }
1496
1497    impl std::ops::Sub<CardinalityEstimate> for f64 {
1498        type Output = CardinalityEstimate;
1499
1500        fn sub(self, rhs: CardinalityEstimate) -> Self::Output {
1501            use CardinalityEstimate::*;
1502            if let Estimate(OrderedFloat(rhs)) = rhs {
1503                Estimate(OrderedFloat(self - rhs))
1504            } else {
1505                Unknown
1506            }
1507        }
1508    }
1509
1510    impl std::ops::Mul for CardinalityEstimate {
1511        type Output = CardinalityEstimate;
1512
1513        fn mul(self, rhs: Self) -> Self::Output {
1514            use CardinalityEstimate::*;
1515            match (self, rhs) {
1516                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs * rhs),
1517                _ => Unknown,
1518            }
1519        }
1520    }
1521
1522    impl std::ops::Mul<f64> for CardinalityEstimate {
1523        type Output = CardinalityEstimate;
1524
1525        fn mul(self, rhs: f64) -> Self::Output {
1526            if let CardinalityEstimate::Estimate(OrderedFloat(lhs)) = self {
1527                CardinalityEstimate::Estimate(OrderedFloat(lhs * rhs))
1528            } else {
1529                CardinalityEstimate::Unknown
1530            }
1531        }
1532    }
1533
1534    impl std::ops::Div for CardinalityEstimate {
1535        type Output = CardinalityEstimate;
1536
1537        fn div(self, rhs: Self) -> Self::Output {
1538            use CardinalityEstimate::*;
1539            match (self, rhs) {
1540                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs / rhs),
1541                _ => Unknown,
1542            }
1543        }
1544    }
1545
1546    impl std::ops::Div<f64> for CardinalityEstimate {
1547        type Output = CardinalityEstimate;
1548
1549        fn div(self, rhs: f64) -> Self::Output {
1550            use CardinalityEstimate::*;
1551            if let Estimate(lhs) = self {
1552                Estimate(lhs / OrderedFloat(rhs))
1553            } else {
1554                Unknown
1555            }
1556        }
1557    }
1558
1559    impl std::iter::Sum for CardinalityEstimate {
1560        fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
1561            iter.fold(CardinalityEstimate::from(0.0), |acc, elt| acc + elt)
1562        }
1563    }
1564
1565    impl std::iter::Product for CardinalityEstimate {
1566        fn product<I: Iterator<Item = Self>>(iter: I) -> Self {
1567            iter.fold(CardinalityEstimate::from(1.0), |acc, elt| acc * elt)
1568        }
1569    }
1570
1571    impl From<usize> for CardinalityEstimate {
1572        fn from(value: usize) -> Self {
1573            Self::Estimate(OrderedFloat(f64::cast_lossy(value)))
1574        }
1575    }
1576
1577    impl From<f64> for CardinalityEstimate {
1578        fn from(value: f64) -> Self {
1579            Self::Estimate(OrderedFloat(value))
1580        }
1581    }
1582
    /// The default selectivity for predicates we know nothing about.
    ///
    /// See also `FilterCharacteristics::worst_case_scaling_factor()` in
    /// expr/src/scalar.rs for a more nuanced take.
    pub const WORST_CASE_SELECTIVITY: OrderedFloat<f64> = OrderedFloat(0.1);
1587
1588    // This section defines how we estimate cardinality for each syntactic construct.
1589    //
1590    // We split it up into functions to make it all a bit more tractable to work with.
1591    impl Cardinality {
1592        fn flat_map(&self, tf: &TableFunc, input: CardinalityEstimate) -> CardinalityEstimate {
1593            match tf {
1594                TableFunc::Wrap { types, width } => {
1595                    input * (f64::cast_lossy(types.len()) / f64::cast_lossy(*width))
1596                }
1597                _ => {
1598                    // TODO(mgree) what explosion factor should we make up?
1599                    input * CardinalityEstimate::from(4.0)
1600                }
1601            }
1602        }
1603
1604        fn predicate(
1605            &self,
1606            predicate_expr: &MirScalarExpr,
1607            unique_columns: &BTreeSet<usize>,
1608        ) -> OrderedFloat<f64> {
1609            let index_selectivity = |expr: &MirScalarExpr| -> Option<OrderedFloat<f64>> {
1610                match expr {
1611                    MirScalarExpr::Column(col, _) => {
1612                        if unique_columns.contains(col) {
1613                            // TODO(mgree): when we have index cardinality statistics, they should go here when `expr` is a `MirScalarExpr::Column` that's in `unique_columns`
1614                            None
1615                        } else {
1616                            None
1617                        }
1618                    }
1619                    _ => None,
1620                }
1621            };
1622
1623            match predicate_expr {
1624                MirScalarExpr::Column(_, _)
1625                | MirScalarExpr::Literal(_, _)
1626                | MirScalarExpr::CallUnmaterializable(_) => OrderedFloat(1.0),
1627                MirScalarExpr::CallUnary { func, expr } => match func {
1628                    UnaryFunc::Not(_) => OrderedFloat(1.0) - self.predicate(expr, unique_columns),
1629                    UnaryFunc::IsTrue(_) | UnaryFunc::IsFalse(_) => OrderedFloat(0.5),
1630                    UnaryFunc::IsNull(_) => {
1631                        if let Some(icard) = index_selectivity(expr) {
1632                            icard
1633                        } else {
1634                            WORST_CASE_SELECTIVITY
1635                        }
1636                    }
1637                    _ => WORST_CASE_SELECTIVITY,
1638                },
1639                MirScalarExpr::CallBinary { func, expr1, expr2 } => {
1640                    match func {
1641                        BinaryFunc::Eq(_) => {
1642                            match (index_selectivity(expr1), index_selectivity(expr2)) {
1643                                (Some(isel1), Some(isel2)) => std::cmp::max(isel1, isel2),
1644                                (Some(isel), None) | (None, Some(isel)) => isel,
1645                                (None, None) => WORST_CASE_SELECTIVITY,
1646                            }
1647                        }
1648                        // 1.0 - the Eq case
1649                        BinaryFunc::NotEq(_) => {
1650                            match (index_selectivity(expr1), index_selectivity(expr2)) {
1651                                (Some(isel1), Some(isel2)) => {
1652                                    OrderedFloat(1.0) - std::cmp::max(isel1, isel2)
1653                                }
1654                                (Some(isel), None) | (None, Some(isel)) => OrderedFloat(1.0) - isel,
1655                                (None, None) => OrderedFloat(1.0) - WORST_CASE_SELECTIVITY,
1656                            }
1657                        }
1658                        BinaryFunc::Lt(_)
1659                        | BinaryFunc::Lte(_)
1660                        | BinaryFunc::Gt(_)
1661                        | BinaryFunc::Gte(_) => {
1662                            // TODO(mgree) if we have high/low key values and one of the columns is an index, we can do better
1663                            OrderedFloat(0.33)
1664                        }
1665                        _ => OrderedFloat(1.0), // TOOD(mgree): are there other interesting cases?
1666                    }
1667                }
1668                MirScalarExpr::CallVariadic { func, exprs } => match func {
1669                    VariadicFunc::And(_) => exprs
1670                        .iter()
1671                        .map(|expr| self.predicate(expr, unique_columns))
1672                        .product(),
1673                    VariadicFunc::Or(_) => {
1674                        // TODO(mgree): BETWEEN will get compiled down to an AND of appropriate bounds---we could try to detect it and be clever
1675
1676                        // F(expr1 OR expr2) = F(expr1) + F(expr2) - F(expr1) * F(expr2), but generalized
1677                        let mut exprs = exprs.into_iter();
1678
1679                        let mut expr1;
1680
1681                        if let Some(first) = exprs.next() {
1682                            expr1 = self.predicate(first, unique_columns);
1683                        } else {
1684                            return OrderedFloat(1.0);
1685                        }
1686
1687                        for expr2 in exprs {
1688                            let expr2 = self.predicate(expr2, unique_columns);
1689                            expr1 = expr1 + expr2 - expr1 * expr2;
1690                        }
1691                        expr1
1692                    }
1693                    _ => OrderedFloat(1.0),
1694                },
1695                MirScalarExpr::If { cond: _, then, els } => std::cmp::max(
1696                    self.predicate(then, unique_columns),
1697                    self.predicate(els, unique_columns),
1698                ),
1699            }
1700        }
1701
1702        fn filter(
1703            &self,
1704            predicates: &Vec<MirScalarExpr>,
1705            keys: &Vec<Vec<usize>>,
1706            input: CardinalityEstimate,
1707        ) -> CardinalityEstimate {
1708            // TODO(mgree): should we try to do something for indices built on multiple columns?
1709            let mut unique_columns = BTreeSet::new();
1710            for key in keys {
1711                if key.len() == 1 {
1712                    unique_columns.insert(key[0]);
1713                }
1714            }
1715
1716            let mut estimate = input;
1717            for expr in predicates {
1718                let selectivity = self.predicate(expr, &unique_columns);
1719                debug_assert!(
1720                    OrderedFloat(0.0) <= selectivity && selectivity <= OrderedFloat(1.0),
1721                    "predicate selectivity {selectivity} should be in the range [0,1]"
1722                );
1723                estimate = estimate * selectivity.0;
1724            }
1725
1726            estimate
1727        }
1728
1729        fn join(
1730            &self,
1731            equivalences: &Vec<Vec<MirScalarExpr>>,
1732            _implementation: &JoinImplementation,
1733            unique_columns: BTreeMap<usize, usize>,
1734            mut inputs: Vec<CardinalityEstimate>,
1735        ) -> CardinalityEstimate {
1736            if inputs.is_empty() {
1737                return CardinalityEstimate::from(0.0);
1738            }
1739
1740            for equiv in equivalences {
1741                // those sources which have a unique key
1742                let mut unique_sources = BTreeSet::new();
1743                let mut all_unique = true;
1744
1745                for expr in equiv {
1746                    if let MirScalarExpr::Column(col, _) = expr {
1747                        if let Some(idx) = unique_columns.get(col) {
1748                            unique_sources.insert(*idx);
1749                        } else {
1750                            all_unique = false;
1751                        }
1752                    } else {
1753                        all_unique = false;
1754                    }
1755                }
1756
1757                // no unique columns in this equivalence
1758                if unique_sources.is_empty() {
1759                    continue;
1760                }
1761
1762                // ALL unique columns in this equivalence
1763                if all_unique {
1764                    // these inputs have unique keys for _all_ of the equivalence, so they're a bound on how many rows we'll get from those sources
1765                    // we'll find the leftmost such input and use it to hold the minimum; the other sources we set to 1.0 (so they have no effect)
1766                    let mut sources = unique_sources.iter();
1767
1768                    let lhs_idx = *sources.next().unwrap();
1769                    let mut lhs =
1770                        std::mem::replace(&mut inputs[lhs_idx], CardinalityEstimate::from(1.0));
1771                    for &rhs_idx in sources {
1772                        let rhs =
1773                            std::mem::replace(&mut inputs[rhs_idx], CardinalityEstimate::from(1.0));
1774                        lhs = CardinalityEstimate::min(lhs, rhs);
1775                    }
1776
1777                    inputs[lhs_idx] = lhs;
1778
1779                    // best option! go look at the next equivalence
1780                    continue;
1781                }
1782
1783                // some unique columns in this equivalence
1784                for idx in unique_sources {
1785                    // when joining R and S on R.x = S.x, if R.x is unique and S.x is not, we're bounded above by the cardinality of S
1786                    inputs[idx] = CardinalityEstimate::from(1.0);
1787                }
1788            }
1789
1790            let mut product = CardinalityEstimate::from(1.0);
1791            for input in inputs {
1792                product = product * input;
1793            }
1794            product
1795        }
1796
1797        fn reduce(
1798            &self,
1799            group_key: &Vec<MirScalarExpr>,
1800            expected_group_size: &Option<u64>,
1801            input: CardinalityEstimate,
1802        ) -> CardinalityEstimate {
1803            // TODO(mgree): if no `group_key` is present, we can do way better
1804
1805            if let Some(group_size) = expected_group_size {
1806                input / f64::cast_lossy(*group_size)
1807            } else if group_key.is_empty() {
1808                CardinalityEstimate::from(1.0)
1809            } else {
1810                // in the worst case, every row is its own group
1811                input
1812            }
1813        }
1814
1815        fn topk(
1816            &self,
1817            group_key: &Vec<usize>,
1818            limit: &Option<MirScalarExpr>,
1819            expected_group_size: &Option<u64>,
1820            input: CardinalityEstimate,
1821        ) -> CardinalityEstimate {
1822            // TODO: support simple arithmetic expressions
1823            let k = limit
1824                .as_ref()
1825                .and_then(|l| l.as_literal_int64())
1826                .map_or(1, |l| std::cmp::max(0, l));
1827
1828            if let Some(group_size) = expected_group_size {
1829                input * (f64::cast_lossy(k) / f64::cast_lossy(*group_size))
1830            } else if group_key.is_empty() {
1831                CardinalityEstimate::from(f64::cast_lossy(k))
1832            } else {
1833                // in the worst case, every row is its own group
1834                input.clone()
1835            }
1836        }
1837
1838        fn threshold(&self, input: CardinalityEstimate) -> CardinalityEstimate {
1839            // worst case scaling factor is 1
1840            input.clone()
1841        }
1842    }
1843
1844    impl Analysis for Cardinality {
1845        type Value = CardinalityEstimate;
1846
1847        fn announce_dependencies(builder: &mut crate::analysis::DerivedBuilder) {
1848            builder.require(crate::analysis::Arity);
1849            builder.require(crate::analysis::UniqueKeys);
1850        }
1851
1852        fn derive(
1853            &self,
1854            expr: &MirRelationExpr,
1855            index: usize,
1856            results: &[Self::Value],
1857            depends: &crate::analysis::Derived,
1858        ) -> Self::Value {
1859            use MirRelationExpr::*;
1860
1861            let sizes = depends.as_view().results::<SubtreeSize>();
1862            let arity = depends.as_view().results::<Arity>();
1863            let keys = depends.as_view().results::<UniqueKeys>();
1864
1865            match expr {
1866                Constant { rows, .. } => {
1867                    CardinalityEstimate::from(rows.as_ref().map_or_else(|_| 0, |v| v.len()))
1868                }
1869                Get { id, .. } => match id {
1870                    Id::Local(id) => depends
1871                        .bindings()
1872                        .get(id)
1873                        .and_then(|id| results.get(*id))
1874                        .copied()
1875                        .unwrap_or(CardinalityEstimate::Unknown),
1876                    Id::Global(id) => self
1877                        .stats
1878                        .get(id)
1879                        .copied()
1880                        .map(CardinalityEstimate::from)
1881                        .unwrap_or(CardinalityEstimate::Unknown),
1882                },
1883                Let { .. } | Project { .. } | Map { .. } | ArrangeBy { .. } | Negate { .. } => {
1884                    results[index - 1].clone()
1885                }
1886                LetRec { .. } =>
1887                // TODO(mgree): implement a recurrence-based approach (or at least identify common idioms, e.g. transitive closure)
1888                {
1889                    CardinalityEstimate::Unknown
1890                }
1891                Union { base: _, inputs: _ } => depends
1892                    .children_of_rev(index, expr.children().count())
1893                    .map(|off| results[off].clone())
1894                    .sum(),
1895                FlatMap { func, .. } => {
1896                    let input = results[index - 1];
1897                    self.flat_map(func, input)
1898                }
1899                Filter { predicates, .. } => {
1900                    let input = results[index - 1];
1901                    let keys = depends.results::<UniqueKeys>();
1902                    let keys = &keys[index - 1];
1903                    self.filter(predicates, keys, input)
1904                }
1905                Join {
1906                    equivalences,
1907                    implementation,
1908                    inputs,
1909                    ..
1910                } => {
1911                    let mut input_results = Vec::with_capacity(inputs.len());
1912
1913                    // maps a column to the index in `inputs` that it belongs to
1914                    let mut unique_columns = BTreeMap::new();
1915                    let mut key_offset = 0;
1916
1917                    let mut offset = 1;
1918                    for idx in 0..inputs.len() {
1919                        let input = results[index - offset];
1920                        input_results.push(input);
1921
1922                        let arity = arity[index - offset];
1923                        let keys = &keys[index - offset];
1924                        for key in keys {
1925                            if key.len() == 1 {
1926                                unique_columns.insert(key_offset + key[0], idx);
1927                            }
1928                        }
1929                        key_offset += arity;
1930
1931                        offset += &sizes[index - offset];
1932                    }
1933
1934                    self.join(equivalences, implementation, unique_columns, input_results)
1935                }
1936                Reduce {
1937                    group_key,
1938                    expected_group_size,
1939                    ..
1940                } => {
1941                    let input = results[index - 1];
1942                    self.reduce(group_key, expected_group_size, input)
1943                }
1944                TopK {
1945                    group_key,
1946                    limit,
1947                    expected_group_size,
1948                    ..
1949                } => {
1950                    let input = results[index - 1];
1951                    self.topk(group_key, limit, expected_group_size, input)
1952                }
1953                Threshold { .. } => {
1954                    let input = results[index - 1];
1955                    self.threshold(input)
1956                }
1957            }
1958        }
1959    }
1960
1961    impl std::fmt::Display for CardinalityEstimate {
1962        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1963            match self {
1964                CardinalityEstimate::Estimate(OrderedFloat(estimate)) => write!(f, "{estimate}"),
1965                CardinalityEstimate::Unknown => write!(f, "<UNKNOWN>"),
1966            }
1967        }
1968    }
1969}
1970
1971mod common_lattice {
1972    use crate::analysis::Lattice;
1973
1974    pub struct BoolLattice;
1975
1976    impl Lattice<bool> for BoolLattice {
1977        // `true` > `false`.
1978        fn top(&self) -> bool {
1979            true
1980        }
1981        // `false` is the greatest lower bound. `into` changes if it's true and `item` is false.
1982        fn meet_assign(&self, into: &mut bool, item: bool) -> bool {
1983            let changed = *into && !item;
1984            *into = *into && item;
1985            changed
1986        }
1987    }
1988}