Skip to main content

mz_transform/
analysis.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Traits and types for reusable expression analysis
11
12pub mod equivalences;
13pub mod monotonic;
14
15use mz_expr::MirRelationExpr;
16
17pub use arity::Arity;
18pub use cardinality::Cardinality;
19pub use column_names::{ColumnName, ColumnNames};
20pub use common::{Derived, DerivedBuilder, DerivedView};
21pub use explain::annotate_plan;
22pub use non_negative::NonNegative;
23pub use repr_types::ReprRelationType;
24pub use subtree::SubtreeSize;
25pub use types::SqlRelationType;
26pub use unique_keys::UniqueKeys;
27
28/// An analysis that can be applied bottom-up to a `MirRelationExpr`.
29pub trait Analysis: 'static {
30    /// The type of value this analysis associates with an expression.
31    type Value: std::fmt::Debug;
32    /// Announce any dependencies this analysis has on other analyses.
33    ///
34    /// The method should invoke `builder.require::<Foo>()` for each other
35    /// analysis `Foo` this analysis depends upon.
36    fn announce_dependencies(_builder: &mut DerivedBuilder) {}
37    /// The analysis value derived for an expression, given other analysis results.
38    ///
39    /// The other analysis results include the results of this analysis for all children,
40    /// in `results`, and the results of other analyses this analysis has expressed a
41    /// dependence upon, in `depends`, for children and the expression itself.
42    /// The analysis results for `Self` can only be found in `results`, and are not
43    /// available in `depends`.
44    ///
45    /// Implementors of this method must defensively check references into `results`, as
46    /// it may be invoked on `LetRec` bindings that have not yet been populated. It is up
47    /// to the analysis what to do in that case, but conservative behavior is recommended.
48    ///
49    /// The `index` indicates the post-order index for the expression, for use in finding
50    /// the corresponding information in `results` and `depends`.
51    ///
52    /// The returned result will be associated with this expression for this analysis,
53    /// and the analyses will continue.
54    fn derive(
55        &self,
56        expr: &MirRelationExpr,
57        index: usize,
58        results: &[Self::Value],
59        depends: &Derived,
60    ) -> Self::Value;
61
62    /// When available, provide a lattice interface to allow optimistic recursion.
63    ///
64    /// Providing a non-`None` output indicates that the analysis intends re-iteration.
65    fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
66        None
67    }
68}
69
70/// Lattice methods for repeated analysis
71pub trait Lattice<T> {
72    /// An element greater than all other elements.
73    fn top(&self) -> T;
74    /// Set `a` to the greatest lower bound of `a` and `b`, and indicate if `a` changed as a result.
75    fn meet_assign(&self, a: &mut T, b: T) -> bool;
76}
77
78/// Types common across multiple analyses
79pub mod common {
80
81    use std::any::{Any, TypeId};
82    use std::collections::BTreeMap;
83
84    use itertools::Itertools;
85    use mz_expr::LocalId;
86    use mz_expr::MirRelationExpr;
87    use mz_ore::assert_none;
88    use mz_repr::optimize::OptimizerFeatures;
89
90    use super::Analysis;
91    use super::subtree::SubtreeSize;
92
93    /// Container for analysis state and binding context.
94    #[derive(Default)]
95    #[allow(missing_debug_implementations)]
96    pub struct Derived {
97        /// A record of active analyses and their results, indexed by their type id.
98        analyses: BTreeMap<TypeId, Box<dyn AnalysisBundle>>,
99        /// Analyses ordered where each depends only on strictly prior analyses.
100        order: Vec<TypeId>,
101        /// Map from local identifier to result offset for analysis values.
102        bindings: BTreeMap<LocalId, usize>,
103    }
104
105    impl Derived {
106        /// Return the analysis results derived so far.
107        ///
108        /// # Panics
109        ///
110        /// This method panics if `A` was not installed as a required analysis.
111        pub fn results<A: Analysis>(&self) -> &[A::Value] {
112            let type_id = TypeId::of::<Bundle<A>>();
113            if let Some(bundle) = self.analyses.get(&type_id) {
114                if let Some(bundle) = bundle.as_any().downcast_ref::<Bundle<A>>() {
115                    return &bundle.results[..];
116                }
117            }
118            panic!("Analysis {:?} missing", std::any::type_name::<A>());
119        }
120        /// Bindings from local identifiers to result offsets for analysis values.
121        pub fn bindings(&self) -> &BTreeMap<LocalId, usize> {
122            &self.bindings
123        }
124        /// Result offsets for the state of a various number of children of the current expression.
125        ///
126        /// The integers are the zero-offset locations in the `SubtreeSize` analysis. The order of
127        /// the children is descending, from last child to first, because of how the information is
128        /// laid out, and the non-reversibility of the look-ups.
129        ///
130        /// It is an error to call this method with more children than expression has.
131        pub fn children_of_rev<'a>(
132            &'a self,
133            start: usize,
134            count: usize,
135        ) -> impl Iterator<Item = usize> + 'a {
136            let sizes = self.results::<SubtreeSize>();
137            let offset = 1;
138            (0..count).scan(offset, move |offset, _| {
139                let result = start - *offset;
140                *offset += sizes[result];
141                Some(result)
142            })
143        }
144
145        /// Recast the derived data as a view that can be subdivided into views over child state.
146        pub fn as_view<'a>(&'a self) -> DerivedView<'a> {
147            DerivedView {
148                derived: self,
149                lower: 0,
150                upper: self.results::<SubtreeSize>().len(),
151            }
152        }
153    }
154
155    /// The subset of a `Derived` corresponding to an expression and its children.
156    ///
157    /// Specifically, bounds an interval `[lower, upper)` that ends with the state
158    /// of an expression, at `upper-1`, and is preceded by the state of descendents.
159    ///
160    /// This is best thought of as a node in a tree rather
161    #[allow(missing_debug_implementations)]
162    #[derive(Copy, Clone)]
163    pub struct DerivedView<'a> {
164        derived: &'a Derived,
165        lower: usize,
166        upper: usize,
167    }
168
169    impl<'a> DerivedView<'a> {
170        /// The value associated with the expression.
171        pub fn value<A: Analysis>(&self) -> Option<&'a A::Value> {
172            self.results::<A>().last()
173        }
174
175        /// The post-order traversal index for the expression.
176        ///
177        /// This can be used to index into the full set of results, as provided by an
178        /// instance of `Derived`.
179        pub fn index(&self) -> usize {
180            self.upper - 1
181        }
182
183        /// The value bound to an identifier, if it has been derived.
184        ///
185        /// There are several reasons the value could not be derived, and this method
186        /// does not distinguish between them.
187        pub fn bound<A: Analysis>(&self, id: LocalId) -> Option<&'a A::Value> {
188            self.derived
189                .bindings
190                .get(&id)
191                .and_then(|index| self.derived.results::<A>().get(*index))
192        }
193
194        /// The results for expression and its children.
195        ///
196        /// The results for the expression itself will be the last element.
197        ///
198        /// # Panics
199        ///
200        /// This method panics if `A` was not installed as a required analysis.
201        pub fn results<A: Analysis>(&self) -> &'a [A::Value] {
202            &self.derived.results::<A>()[self.lower..self.upper]
203        }
204
205        /// Bindings from local identifiers to result offsets for analysis values.
206        ///
207        /// This method returns all bindings, which may include bindings not in scope for
208        /// the expression and its children; they should be ignored.
209        pub fn bindings(&self) -> &'a BTreeMap<LocalId, usize> {
210            self.derived.bindings()
211        }
212
213        /// Subviews over `self` corresponding to the children of the expression, in reverse order.
214        ///
215        /// These views should disjointly cover the same interval as `self`, except for the last element
216        /// which corresponds to the expression itself.
217        ///
218        /// The number of produced items should exactly match the number of children, which need not
219        /// be provided as an argument. This relies on the well-formedness of the view, which should
220        /// exhaust itself just as it enumerates its last (the first) child view.
221        pub fn children_rev(&self) -> impl Iterator<Item = DerivedView<'a>> + 'a {
222            // This logic is copy/paste from `Derived::children_of_rev` but it was annoying to layer
223            // it over the output of that function, and perhaps clearer to rewrite in any case.
224
225            // Discard the last element (the size of the expression's subtree).
226            // Repeatedly read out the last element, then peel off that many elements.
227            // Each extracted slice corresponds to a child of the current expression.
228            // We should end cleanly with an empty slice, otherwise there is an issue.
229            let sizes = self.results::<SubtreeSize>();
230            let sizes = &sizes[..sizes.len() - 1];
231
232            let offset = self.lower;
233            let derived = self.derived;
234            (0..).scan(sizes, move |sizes, _| {
235                if let Some(size) = sizes.last() {
236                    *sizes = &sizes[..sizes.len() - size];
237                    Some(Self {
238                        derived,
239                        lower: offset + sizes.len(),
240                        upper: offset + sizes.len() + size,
241                    })
242                } else {
243                    None
244                }
245            })
246        }
247
248        /// A convenience method for the view over the expressions last child.
249        ///
250        /// This method is appropriate to call on expressions with multiple children,
251        /// and in particular for `Let` and `LetRecv` variants where the body is the
252        /// last child.
253        ///
254        /// It is an error to call this on a view for an expression with no children.
255        pub fn last_child(&self) -> DerivedView<'a> {
256            self.children_rev().next().unwrap()
257        }
258    }
259
260    /// A builder wrapper to accumulate announced dependencies and construct default state.
261    #[allow(missing_debug_implementations)]
262    pub struct DerivedBuilder<'a> {
263        result: Derived,
264        features: &'a OptimizerFeatures,
265    }
266
267    impl<'a> DerivedBuilder<'a> {
268        /// Create a new [`DerivedBuilder`] parameterized by [`OptimizerFeatures`].
269        pub fn new(features: &'a OptimizerFeatures) -> Self {
270            // The default builder should include `SubtreeSize` to facilitate navigation.
271            let mut builder = DerivedBuilder {
272                result: Derived::default(),
273                features,
274            };
275            builder.require(SubtreeSize);
276            builder
277        }
278    }
279
280    impl<'a> DerivedBuilder<'a> {
281        /// Announces a dependence on an analysis `A`.
282        ///
283        /// This ensures that `A` will be performed, and before any analysis that
284        /// invokes this method.
285        pub fn require<A: Analysis>(&mut self, analysis: A) {
286            // The method recursively descends through required analyses, first
287            // installing each in `result.analyses` and second in `result.order`.
288            // The first is an obligation, and serves as an indication that we have
289            // found a cycle in dependencies.
290            let type_id = TypeId::of::<Bundle<A>>();
291            if !self.result.order.contains(&type_id) {
292                // If we have not sequenced `type_id` but have a bundle, it means
293                // we are in the process of fulfilling its requirements: a cycle.
294                if self.result.analyses.contains_key(&type_id) {
295                    panic!("Cyclic dependency detected: {}", std::any::type_name::<A>());
296                }
297                // Insert the analysis bundle first, so that we can detect cycles.
298                self.result.analyses.insert(
299                    type_id,
300                    Box::new(Bundle::<A> {
301                        analysis,
302                        results: Vec::new(),
303                        fuel: 100,
304                        allow_optimistic: self.features.enable_letrec_fixpoint_analysis,
305                    }),
306                );
307                A::announce_dependencies(self);
308                // All dependencies are successfully sequenced; sequence `type_id`.
309                self.result.order.push(type_id);
310            }
311        }
312        /// Complete the building: perform analyses and return the resulting `Derivation`.
313        pub fn visit(mut self, expr: &MirRelationExpr) -> Derived {
314            // A stack of expressions to process (`Ok`) and let bindings to fill (`Err`).
315            let mut todo = vec![Ok(expr)];
316            // Expressions in reverse post-order: each expression, followed by its children in reverse order.
317            // We will reverse this to get the post order, but must form it in reverse.
318            let mut rev_post_order = Vec::new();
319            while let Some(command) = todo.pop() {
320                match command {
321                    // An expression to visit.
322                    Ok(expr) => {
323                        match expr {
324                            MirRelationExpr::Let { id, value, body } => {
325                                todo.push(Ok(value));
326                                todo.push(Err(*id));
327                                todo.push(Ok(body));
328                            }
329                            MirRelationExpr::LetRec {
330                                ids, values, body, ..
331                            } => {
332                                for (id, value) in ids.iter().zip_eq(values) {
333                                    todo.push(Ok(value));
334                                    todo.push(Err(*id));
335                                }
336                                todo.push(Ok(body));
337                            }
338                            _ => {
339                                todo.extend(expr.children().map(Ok));
340                            }
341                        }
342                        rev_post_order.push(expr);
343                    }
344                    // A local id to install
345                    Err(local_id) => {
346                        // Capture the *remaining* work, which we'll need to flip around.
347                        let prior = self.result.bindings.insert(local_id, rev_post_order.len());
348                        assert_none!(prior, "Shadowing not allowed");
349                    }
350                }
351            }
352            // Flip the offsets now that we know a length.
353            for value in self.result.bindings.values_mut() {
354                *value = rev_post_order.len() - *value - 1;
355            }
356            // Visit the pre-order in reverse order: post-order.
357            rev_post_order.reverse();
358
359            // Apply each analysis to `expr` in order.
360            for id in self.result.order.iter() {
361                if let Some(mut bundle) = self.result.analyses.remove(id) {
362                    bundle.analyse(&rev_post_order[..], &self.result);
363                    self.result.analyses.insert(*id, bundle);
364                }
365            }
366
367            self.result
368        }
369    }
370
371    /// An abstraction for an analysis and associated state.
372    trait AnalysisBundle: Any {
373        /// Populates internal state for all of `exprs`.
374        ///
375        /// Result indicates whether new information was produced for `exprs.last()`.
376        fn analyse(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool;
377        /// Upcasts `self` to a `&dyn Any`.
378        ///
379        /// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed
380        fn as_any(&self) -> &dyn std::any::Any;
381    }
382
383    /// A wrapper for analysis state.
384    struct Bundle<A: Analysis> {
385        /// The algorithm instance used to derive the results.
386        analysis: A,
387        /// A vector of results.
388        results: Vec<A::Value>,
389        /// Counts down with each `LetRec` re-iteration, to avoid unbounded effort.
390        /// Should it reach zero, the analysis should discard its results and restart as if pessimistic.
391        fuel: usize,
392        /// Allow optimistic analysis for `A` (otherwise we always do pesimistic
393        /// analysis, even if a [`crate::analysis::Lattice`] is available for `A`).
394        allow_optimistic: bool,
395    }
396
397    impl<A: Analysis> AnalysisBundle for Bundle<A> {
398        fn analyse(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool {
399            self.results.clear();
400            // Attempt optimistic analysis, and if that fails go pessimistic.
401            let update = A::lattice()
402                .filter(|_| self.allow_optimistic)
403                .and_then(|lattice| {
404                    for _ in exprs.iter() {
405                        self.results.push(lattice.top());
406                    }
407                    self.analyse_optimistic(exprs, 0, exprs.len(), depends, &*lattice)
408                        .ok()
409                })
410                .unwrap_or_else(|| {
411                    self.results.clear();
412                    self.analyse_pessimistic(exprs, depends)
413                });
414            assert_eq!(self.results.len(), exprs.len());
415            update
416        }
417        fn as_any(&self) -> &dyn std::any::Any {
418            self
419        }
420    }
421
422    impl<A: Analysis> Bundle<A> {
423        /// Analysis that starts optimistically but is only correct at a fixed point.
424        ///
425        /// Will fail out to `analyse_pessimistic` if the lattice is missing, or `self.fuel` is exhausted.
426        /// When successful, the result indicates whether new information was produced for `exprs.last()`.
427        fn analyse_optimistic(
428            &mut self,
429            exprs: &[&MirRelationExpr],
430            lower: usize,
431            upper: usize,
432            depends: &Derived,
433            lattice: &dyn crate::analysis::Lattice<A::Value>,
434        ) -> Result<bool, ()> {
435            // Repeatedly re-evaluate the whole tree bottom up, until no changes of fuel spent.
436            let mut changed = true;
437            while changed {
438                changed = false;
439
440                // Bail out if we have done too many `LetRec` passes in this analysis.
441                self.fuel -= 1;
442                if self.fuel == 0 {
443                    return Err(());
444                }
445
446                // Track if repetitions may be required, to avoid iteration if they are not.
447                let mut is_recursive = false;
448                // Update each derived value and track if any have changed.
449                for index in lower..upper {
450                    let value = self.derive(exprs[index], index, depends);
451                    changed = lattice.meet_assign(&mut self.results[index], value) || changed;
452                    if let MirRelationExpr::LetRec { .. } = &exprs[index] {
453                        is_recursive = true;
454                    }
455                }
456
457                // Un-set the potential loop if there was no recursion.
458                if !is_recursive {
459                    changed = false;
460                }
461            }
462            Ok(true)
463        }
464
465        /// Analysis that starts conservatively and can be stopped at any point.
466        ///
467        /// Result indicates whether new information was produced for `exprs.last()`.
468        fn analyse_pessimistic(&mut self, exprs: &[&MirRelationExpr], depends: &Derived) -> bool {
469            // TODO: consider making iterative, from some `bottom()` up using `join_assign()`.
470            self.results.clear();
471            for (index, expr) in exprs.iter().enumerate() {
472                self.results.push(self.derive(expr, index, depends));
473            }
474            true
475        }
476
477        #[inline]
478        fn derive(&self, expr: &MirRelationExpr, index: usize, depends: &Derived) -> A::Value {
479            self.analysis
480                .derive(expr, index, &self.results[..], depends)
481        }
482    }
483}
484
485/// Expression subtree sizes
486///
487/// This analysis counts the number of expressions in each subtree, and is most useful
488/// for navigating the results of other analyses that are offset by subtree sizes.
489pub mod subtree {
490
491    use super::{Analysis, Derived};
492    use mz_expr::MirRelationExpr;
493
494    /// Analysis that determines the size in child expressions of relation expressions.
495    #[derive(Debug)]
496    pub struct SubtreeSize;
497
498    impl Analysis for SubtreeSize {
499        type Value = usize;
500
501        fn derive(
502            &self,
503            expr: &MirRelationExpr,
504            index: usize,
505            results: &[Self::Value],
506            _depends: &Derived,
507        ) -> Self::Value {
508            match expr {
509                MirRelationExpr::Constant { .. } | MirRelationExpr::Get { .. } => 1,
510                _ => {
511                    let mut offset = 1;
512                    for _ in expr.children() {
513                        offset += results[index - offset];
514                    }
515                    offset
516                }
517            }
518        }
519    }
520}
521
522/// Expression arities
523mod arity {
524
525    use super::{Analysis, Derived};
526    use mz_expr::MirRelationExpr;
527
528    /// Analysis that determines the number of columns of relation expressions.
529    #[derive(Debug)]
530    pub struct Arity;
531
532    impl Analysis for Arity {
533        type Value = usize;
534
535        fn derive(
536            &self,
537            expr: &MirRelationExpr,
538            index: usize,
539            results: &[Self::Value],
540            depends: &Derived,
541        ) -> Self::Value {
542            let mut offsets = depends
543                .children_of_rev(index, expr.children().count())
544                .map(|child| results[child])
545                .collect::<Vec<_>>();
546            offsets.reverse();
547            expr.arity_with_input_arities(offsets.into_iter())
548        }
549    }
550}
551
552/// Expression types
553mod types {
554
555    use super::{Analysis, Derived, Lattice};
556    use itertools::Itertools;
557    use mz_expr::MirRelationExpr;
558    use mz_repr::SqlColumnType;
559
560    /// Analysis that determines the type of relation expressions.
561    ///
562    /// The value is `Some` when it discovers column types, and `None` in the case that
563    /// it has discovered no constraining information on the column types. The `None`
564    /// variant should only occur in the course of iteration, and should not be revealed
565    /// as an output of the analysis. One can `unwrap()` the result, and if it errors then
566    /// either the expression is malformed or the analysis has a bug.
567    ///
568    /// The analysis will panic if an expression is not well typed (i.e. if `try_col_with_input_cols`
569    /// returns an error).
570    #[derive(Debug)]
571    pub struct SqlRelationType;
572
573    impl Analysis for SqlRelationType {
574        type Value = Option<Vec<SqlColumnType>>;
575
576        fn derive(
577            &self,
578            expr: &MirRelationExpr,
579            index: usize,
580            results: &[Self::Value],
581            depends: &Derived,
582        ) -> Self::Value {
583            let offsets = depends
584                .children_of_rev(index, expr.children().count())
585                .map(|child| &results[child])
586                .collect::<Vec<_>>();
587
588            // For most expressions we'll apply `try_col_with_input_cols`, but for `Get` expressions
589            // we'll want to combine what we know (iteratively) with the stated `Get::typ`.
590            match expr {
591                MirRelationExpr::Get {
592                    id: mz_expr::Id::Local(i),
593                    typ,
594                    ..
595                } => {
596                    let mut result = typ.column_types.clone();
597                    if let Some(o) = depends.bindings().get(i) {
598                        if let Some(t) = results.get(*o) {
599                            if let Some(rec_typ) = t {
600                                // Reconcile nullability statements.
601                                // Unclear if we should trust `typ`.
602                                assert_eq!(result.len(), rec_typ.len());
603                                result.clone_from(rec_typ);
604                                for (res, col) in result.iter_mut().zip_eq(typ.column_types.iter())
605                                {
606                                    if !col.nullable {
607                                        res.nullable = false;
608                                    }
609                                }
610                            } else {
611                                // Our `None` information indicates that we are optimistically
612                                // assuming the best, including that all columns are non-null.
613                                // This should only happen in the first visit to a `Get` expr.
614                                // Use `typ`, but flatten nullability.
615                                for col in result.iter_mut() {
616                                    col.nullable = false;
617                                }
618                            }
619                        }
620                    }
621                    Some(result)
622                }
623                _ => {
624                    // Every expression with inputs should have non-`None` inputs at this point.
625                    let input_cols = offsets.into_iter().rev().map(|o| {
626                        o.as_ref()
627                            .expect("SqlRelationType analysis discovered type-less expression")
628                    });
629                    Some(expr.try_col_with_input_cols(input_cols).unwrap())
630                }
631            }
632        }
633
634        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
635            Some(Box::new(RTLattice))
636        }
637    }
638
639    struct RTLattice;
640
641    impl Lattice<Option<Vec<SqlColumnType>>> for RTLattice {
642        fn top(&self) -> Option<Vec<SqlColumnType>> {
643            None
644        }
645        fn meet_assign(
646            &self,
647            a: &mut Option<Vec<SqlColumnType>>,
648            b: Option<Vec<SqlColumnType>>,
649        ) -> bool {
650            match (a, b) {
651                (_, None) => false,
652                (Some(a), Some(b)) => {
653                    let mut changed = false;
654                    assert_eq!(a.len(), b.len());
655                    for (at, bt) in a.iter_mut().zip_eq(b.iter()) {
656                        assert_eq!(at.scalar_type, bt.scalar_type);
657                        if !at.nullable && bt.nullable {
658                            at.nullable = true;
659                            changed = true;
660                        }
661                    }
662                    changed
663                }
664                (a, b) => {
665                    *a = b;
666                    true
667                }
668            }
669        }
670    }
671}
672
673/// Expression types
674mod repr_types {
675
676    use super::{Analysis, Derived, Lattice};
677    use itertools::Itertools;
678    use mz_expr::MirRelationExpr;
679    use mz_repr::ReprColumnType;
680
681    /// Analysis that determines the type of relation expressions.
682    ///
683    /// The value is `Some` when it discovers column types, and `None` in the case that
684    /// it has discovered no constraining information on the column types. The `None`
685    /// variant should only occur in the course of iteration, and should not be revealed
686    /// as an output of the analysis. One can `unwrap()` the result, and if it errors then
687    /// either the expression is malformed or the analysis has a bug.
688    ///
689    /// The analysis will panic if an expression is not well typed (i.e. if `try_col_with_input_cols`
690    /// returns an error).
691    #[derive(Debug)]
692    pub struct ReprRelationType;
693
694    impl Analysis for ReprRelationType {
695        type Value = Option<Vec<ReprColumnType>>;
696
697        fn derive(
698            &self,
699            expr: &MirRelationExpr,
700            index: usize,
701            results: &[Self::Value],
702            depends: &Derived,
703        ) -> Self::Value {
704            let offsets = depends
705                .children_of_rev(index, expr.children().count())
706                .map(|child| &results[child])
707                .collect::<Vec<_>>();
708
709            // For most expressions we'll apply `try_col_with_input_cols`, but for `Get` expressions
710            // we'll want to combine what we know (iteratively) with the stated `Get::typ`.
711            match expr {
712                MirRelationExpr::Get {
713                    id: mz_expr::Id::Local(i),
714                    typ,
715                    ..
716                } => {
717                    let mut result = typ
718                        .column_types
719                        .iter()
720                        .map(ReprColumnType::from)
721                        .collect_vec();
722                    if let Some(o) = depends.bindings().get(i) {
723                        if let Some(t) = results.get(*o) {
724                            if let Some(rec_typ) = t {
725                                // Reconcile nullability statements.
726                                // Unclear if we should trust `typ`.
727                                assert_eq!(result.len(), rec_typ.len());
728                                result.clone_from(rec_typ);
729                                for (res, col) in result.iter_mut().zip_eq(typ.column_types.iter())
730                                {
731                                    if !col.nullable {
732                                        res.nullable = false;
733                                    }
734                                }
735                            } else {
736                                // Our `None` information indicates that we are optimistically
737                                // assuming the best, including that all columns are non-null.
738                                // This should only happen in the first visit to a `Get` expr.
739                                // Use `typ`, but flatten nullability.
740                                for col in result.iter_mut() {
741                                    col.nullable = false;
742                                }
743                            }
744                        }
745                    }
746                    Some(result)
747                }
748                _ => {
749                    // Every expression with inputs should have non-`None` inputs at this point.
750                    let input_cols = offsets.into_iter().rev().map(|o| {
751                        o.as_ref()
752                            .expect("ReprRelationType analysis discovered type-less expression")
753                    });
754
755                    let repr_typ = expr.try_repr_col_with_input_repr_cols(input_cols).unwrap();
756                    Some(repr_typ)
757                }
758            }
759        }
760
761        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
762            Some(Box::new(RTLattice))
763        }
764    }
765
766    struct RTLattice;
767
768    impl Lattice<Option<Vec<ReprColumnType>>> for RTLattice {
769        fn top(&self) -> Option<Vec<ReprColumnType>> {
770            None
771        }
772        fn meet_assign(
773            &self,
774            a: &mut Option<Vec<ReprColumnType>>,
775            b: Option<Vec<ReprColumnType>>,
776        ) -> bool {
777            match (a, b) {
778                (_, None) => false,
779                (Some(a), Some(b)) => {
780                    let mut changed = false;
781                    assert_eq!(a.len(), b.len());
782                    for (at, bt) in a.iter_mut().zip_eq(b.iter()) {
783                        assert_eq!(at.scalar_type, bt.scalar_type);
784                        if !at.nullable && bt.nullable {
785                            at.nullable = true;
786                            changed = true;
787                        }
788                    }
789                    changed
790                }
791                (a, b) => {
792                    *a = b;
793                    true
794                }
795            }
796        }
797    }
798}
799
800/// Expression unique keys
801mod unique_keys {
802
803    use super::arity::Arity;
804    use super::{Analysis, Derived, DerivedBuilder, Lattice};
805    use mz_expr::MirRelationExpr;
806
807    /// Analysis that determines the unique keys of relation expressions.
808    ///
809    /// The analysis value is a `Vec<Vec<usize>>`, which should be interpreted as a list
810    /// of sets of column identifiers, each set of which has the property that there is at
811    /// most one instance of each assignment of values to those columns.
812    ///
813    /// The sets are minimal, in that any superset of another set is removed from the list.
814    /// Any superset of unique key columns are also unique key columns.
815    #[derive(Debug)]
816    pub struct UniqueKeys;
817
818    impl Analysis for UniqueKeys {
819        type Value = Vec<Vec<usize>>;
820
821        fn announce_dependencies(builder: &mut DerivedBuilder) {
822            builder.require(Arity);
823        }
824
825        fn derive(
826            &self,
827            expr: &MirRelationExpr,
828            index: usize,
829            results: &[Self::Value],
830            depends: &Derived,
831        ) -> Self::Value {
832            let mut offsets = depends
833                .children_of_rev(index, expr.children().count())
834                .collect::<Vec<_>>();
835            offsets.reverse();
836
837            match expr {
838                MirRelationExpr::Get {
839                    id: mz_expr::Id::Local(i),
840                    typ,
841                    ..
842                } => {
843                    // We have information from `typ` and from the analysis.
844                    // We should "join" them, unioning and reducing the keys.
845                    let mut keys = typ.keys.clone();
846                    if let Some(o) = depends.bindings().get(i) {
847                        if let Some(ks) = results.get(*o) {
848                            for k in ks.iter() {
849                                antichain_insert(&mut keys, k.clone());
850                            }
851                            keys.extend(ks.iter().cloned());
852                            keys.sort();
853                            keys.dedup();
854                        }
855                    }
856                    keys
857                }
858                _ => {
859                    let arity = depends.results::<Arity>();
860                    expr.keys_with_input_keys(
861                        offsets.iter().map(|o| arity[*o]),
862                        offsets.iter().map(|o| &results[*o]),
863                    )
864                }
865            }
866        }
867
868        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
869            Some(Box::new(UKLattice))
870        }
871    }
872
873    fn antichain_insert(into: &mut Vec<Vec<usize>>, item: Vec<usize>) {
874        // Insert only if there is not a dominating element of `into`.
875        if into.iter().all(|key| !key.iter().all(|k| item.contains(k))) {
876            into.retain(|key| !key.iter().all(|k| item.contains(k)));
877            into.push(item);
878        }
879    }
880
881    /// Lattice for sets of columns that define a unique key.
882    ///
883    /// An element `Vec<Vec<usize>>` describes all sets of columns `Vec<usize>` that are a
884    /// superset of some set of columns in the lattice element.
885    struct UKLattice;
886
887    impl Lattice<Vec<Vec<usize>>> for UKLattice {
888        fn top(&self) -> Vec<Vec<usize>> {
889            vec![vec![]]
890        }
891        fn meet_assign(&self, a: &mut Vec<Vec<usize>>, b: Vec<Vec<usize>>) -> bool {
892            a.sort();
893            a.dedup();
894            let mut c = Vec::new();
895            for cols_a in a.iter_mut() {
896                cols_a.sort();
897                cols_a.dedup();
898                for cols_b in b.iter() {
899                    let mut cols_c = cols_a.iter().chain(cols_b).cloned().collect::<Vec<_>>();
900                    cols_c.sort();
901                    cols_c.dedup();
902                    antichain_insert(&mut c, cols_c);
903                }
904            }
905            c.sort();
906            c.dedup();
907            std::mem::swap(a, &mut c);
908            a != &mut c
909        }
910    }
911}
912
913/// Determines if accumulated frequences can be negative.
914///
915/// This analysis assumes that globally identified collection have the property, and it is
916/// incorrect to apply it to expressions that reference external collections that may have
917/// negative accumulations.
918mod non_negative {
919
920    use super::{Analysis, Derived, Lattice};
921    use crate::analysis::common_lattice::BoolLattice;
922    use mz_expr::{Id, MirRelationExpr};
923
924    /// Analysis that determines if all accumulations at all times are non-negative.
925    ///
926    /// The analysis assumes that `Id::Global` references only refer to non-negative collections.
927    #[derive(Debug)]
928    pub struct NonNegative;
929
930    impl Analysis for NonNegative {
931        type Value = bool;
932
933        fn derive(
934            &self,
935            expr: &MirRelationExpr,
936            index: usize,
937            results: &[Self::Value],
938            depends: &Derived,
939        ) -> Self::Value {
940            match expr {
941                MirRelationExpr::Constant { rows, .. } => rows
942                    .as_ref()
943                    .map(|r| r.iter().all(|(_, diff)| *diff >= mz_repr::Diff::ZERO))
944                    .unwrap_or(true),
945                MirRelationExpr::Get { id, .. } => match id {
946                    Id::Local(id) => {
947                        let index = *depends
948                            .bindings()
949                            .get(id)
950                            .expect("Dependency info not found");
951                        *results.get(index).unwrap_or(&false)
952                    }
953                    Id::Global(_) => true,
954                },
955                // Negate must be false unless input is "non-positive".
956                MirRelationExpr::Negate { .. } => false,
957                // Threshold ensures non-negativity.
958                MirRelationExpr::Threshold { .. } => true,
959                // Reduce errors on negative input.
960                MirRelationExpr::Reduce { .. } => true,
961                MirRelationExpr::Join { .. } => {
962                    // If all inputs are non-negative, the join is non-negative.
963                    depends
964                        .children_of_rev(index, expr.children().count())
965                        .all(|off| results[off])
966                }
967                MirRelationExpr::Union { base, inputs } => {
968                    // If all inputs are non-negative, the union is non-negative.
969                    let all_non_negative = depends
970                        .children_of_rev(index, expr.children().count())
971                        .all(|off| results[off]);
972
973                    if all_non_negative {
974                        return true;
975                    }
976
977                    // We look for the pattern `Union { base, Negate(Subset(base)) }`.
978                    // TODO: take some care to ensure that union fusion does not introduce a regression.
979                    if inputs.len() == 1 {
980                        if let MirRelationExpr::Negate { input } = &inputs[0] {
981                            // If `base` is non-negative, and `is_superset_of(base, input)`, return true.
982                            // TODO: this is not correct until we have `is_superset_of` validate non-negativity
983                            // as it goes, but it matches the current implementation.
984                            let mut children = depends.children_of_rev(index, 2);
985                            let _negate = children.next().unwrap();
986                            let base_id = children.next().unwrap();
987                            debug_assert_eq!(children.next(), None);
988                            if results[base_id] && is_superset_of(&*base, &*input) {
989                                return true;
990                            }
991                        }
992                    }
993
994                    false
995                }
996                _ => results[index - 1],
997            }
998        }
999
1000        fn lattice() -> Option<Box<dyn Lattice<Self::Value>>> {
1001            Some(Box::new(BoolLattice))
1002        }
1003    }
1004
1005    /// Returns true only if `rhs.negate().union(lhs)` contains only non-negative multiplicities
1006    /// once consolidated.
1007    ///
1008    /// Informally, this happens when `rhs` is a multiset subset of `lhs`, meaning the multiplicity
1009    /// of any record in `rhs` is at most the multiplicity of the same record in `lhs`.
1010    ///
1011    /// This method recursively descends each of `lhs` and `rhs` and performs a great many equality
1012    /// tests, which has the potential to be quadratic. We should consider restricting its attention
1013    /// to `Get` identifiers, under the premise that equal AST nodes would necessarily be identified
1014    /// by common subexpression elimination. This requires care around recursively bound identifiers.
1015    ///
1016    /// These rules are .. somewhat arbitrary, and likely reflect observed opportunities. For example,
1017    /// while we do relate `distinct(filter(A)) <= distinct(A)`, we do not relate `distinct(A) <= A`.
1018    /// Further thoughts about the class of optimizations, and whether there should be more or fewer,
1019    /// can be found here: <https://github.com/MaterializeInc/database-issues/issues/4044>.
1020    fn is_superset_of(mut lhs: &MirRelationExpr, mut rhs: &MirRelationExpr) -> bool {
1021        // This implementation is iterative.
1022        // Before converting this implementation to recursive (e.g. to improve its accuracy)
1023        // make sure to use the `CheckedRecursion` struct to avoid blowing the stack.
1024        while lhs != rhs {
1025            match rhs {
1026                MirRelationExpr::Filter { input, .. } => rhs = &**input,
1027                MirRelationExpr::TopK { input, .. } => rhs = &**input,
1028                // Descend in both sides if the current roots are
1029                // projections with the same `outputs` vector.
1030                MirRelationExpr::Project {
1031                    input: rhs_input,
1032                    outputs: rhs_outputs,
1033                } => match lhs {
1034                    MirRelationExpr::Project {
1035                        input: lhs_input,
1036                        outputs: lhs_outputs,
1037                    } if lhs_outputs == rhs_outputs => {
1038                        rhs = &**rhs_input;
1039                        lhs = &**lhs_input;
1040                    }
1041                    _ => return false,
1042                },
1043                // Descend in both sides if the current roots are reduces with empty aggregates
1044                // on the same set of keys (that is, a distinct operation on those keys).
1045                MirRelationExpr::Reduce {
1046                    input: rhs_input,
1047                    group_key: rhs_group_key,
1048                    aggregates: rhs_aggregates,
1049                    monotonic: _,
1050                    expected_group_size: _,
1051                } if rhs_aggregates.is_empty() => match lhs {
1052                    MirRelationExpr::Reduce {
1053                        input: lhs_input,
1054                        group_key: lhs_group_key,
1055                        aggregates: lhs_aggregates,
1056                        monotonic: _,
1057                        expected_group_size: _,
1058                    } if lhs_aggregates.is_empty() && lhs_group_key == rhs_group_key => {
1059                        rhs = &**rhs_input;
1060                        lhs = &**lhs_input;
1061                    }
1062                    _ => return false,
1063                },
1064                _ => {
1065                    // TODO: Imagine more complex reasoning here!
1066                    return false;
1067                }
1068            }
1069        }
1070        true
1071    }
1072}
1073
1074mod column_names {
1075    use std::ops::Range;
1076    use std::sync::Arc;
1077
1078    use super::Analysis;
1079    use mz_expr::{AggregateFunc, Id, MirRelationExpr, MirScalarExpr, TableFunc};
1080    use mz_repr::explain::ExprHumanizer;
1081    use mz_repr::{GlobalId, UNKNOWN_COLUMN_NAME};
1082    use mz_sql::ORDINALITY_COL_NAME;
1083
1084    /// An abstract type denoting an inferred column name.
1085    #[derive(Debug, Clone)]
1086    pub enum ColumnName {
1087        /// A column with name inferred to be equal to a GlobalId schema column.
1088        Global(GlobalId, usize),
1089        /// An anonymous expression named after the top-level function name.
1090        Aggregate(AggregateFunc, Box<ColumnName>),
1091        /// A column with a name that has been saved from the original SQL query.
1092        Annotated(Arc<str>),
1093        /// An column with an unknown name.
1094        Unknown,
1095    }
1096
1097    impl ColumnName {
1098        /// Return `true` iff the variant has an inferred name.
1099        pub fn is_known(&self) -> bool {
1100            match self {
1101                Self::Global(..) | Self::Aggregate(..) => true,
1102                // We treat annotated columns as unknown because we would rather
1103                // override them with inferred names, if we can.
1104                Self::Annotated(..) | Self::Unknown => false,
1105            }
1106        }
1107
1108        /// Humanize the column to a [`String`], returns an empty [`String`] for
1109        /// unknown columns (or columns named `UNKNOWN_COLUMN_NAME`).
1110        pub fn humanize(&self, humanizer: &dyn ExprHumanizer) -> String {
1111            match self {
1112                Self::Global(id, c) => humanizer.humanize_column(*id, *c).unwrap_or_default(),
1113                Self::Aggregate(func, expr) => {
1114                    let func = func.name();
1115                    let expr = expr.humanize(humanizer);
1116                    if expr.is_empty() {
1117                        String::from(func)
1118                    } else {
1119                        format!("{func}_{expr}")
1120                    }
1121                }
1122                Self::Annotated(name) => name.to_string(),
1123                Self::Unknown => String::new(),
1124            }
1125        }
1126
1127        /// Clone this column name if it is known, otherwise try to use the provided
1128        /// name if it is available.
1129        pub fn cloned_or_annotated(&self, name: &Option<Arc<str>>) -> Self {
1130            match self {
1131                Self::Global(..) | Self::Aggregate(..) | Self::Annotated(..) => self.clone(),
1132                Self::Unknown => name
1133                    .as_ref()
1134                    .filter(|name| name.as_ref() != UNKNOWN_COLUMN_NAME)
1135                    .map_or_else(|| Self::Unknown, |name| Self::Annotated(Arc::clone(name))),
1136            }
1137        }
1138    }
1139
1140    /// Compute the column types of each subtree of a [MirRelationExpr] from the
1141    /// bottom-up.
1142    #[derive(Debug)]
1143    pub struct ColumnNames;
1144
1145    impl ColumnNames {
1146        /// fallback schema consisting of ordinal column names: #0, #1, ...
1147        fn anonymous(range: Range<usize>) -> impl Iterator<Item = ColumnName> {
1148            range.map(|_| ColumnName::Unknown)
1149        }
1150
1151        /// fallback schema consisting of ordinal column names: #0, #1, ...
1152        fn extend_with_scalars(column_names: &mut Vec<ColumnName>, scalars: &Vec<MirScalarExpr>) {
1153            for scalar in scalars {
1154                column_names.push(match scalar {
1155                    MirScalarExpr::Column(c, name) => column_names[*c].cloned_or_annotated(&name.0),
1156                    _ => ColumnName::Unknown,
1157                });
1158            }
1159        }
1160    }
1161
1162    impl Analysis for ColumnNames {
1163        type Value = Vec<ColumnName>;
1164
1165        fn derive(
1166            &self,
1167            expr: &MirRelationExpr,
1168            index: usize,
1169            results: &[Self::Value],
1170            depends: &crate::analysis::Derived,
1171        ) -> Self::Value {
1172            use MirRelationExpr::*;
1173
1174            match expr {
1175                Constant { rows: _, typ } => {
1176                    // Fallback to an anonymous schema for constants.
1177                    ColumnNames::anonymous(0..typ.arity()).collect()
1178                }
1179                Get {
1180                    id: Id::Global(id),
1181                    typ,
1182                    access_strategy: _,
1183                } => {
1184                    // Emit ColumnName::Global instances for each column in the
1185                    // `Get` type. Those can be resolved to real names later when an
1186                    // ExpressionHumanizer is available.
1187                    (0..typ.columns().len())
1188                        .map(|c| ColumnName::Global(*id, c))
1189                        .collect()
1190                }
1191                Get {
1192                    id: Id::Local(id),
1193                    typ,
1194                    access_strategy: _,
1195                } => {
1196                    let index_child = *depends.bindings().get(id).expect("id in scope");
1197                    if index_child < results.len() {
1198                        results[index_child].clone()
1199                    } else {
1200                        // Possible because we infer LetRec bindings in order. This
1201                        // can be improved by introducing a fixpoint loop in the
1202                        // Env<A>::schedule_tasks LetRec handling block.
1203                        ColumnNames::anonymous(0..typ.arity()).collect()
1204                    }
1205                }
1206                Let {
1207                    id: _,
1208                    value: _,
1209                    body: _,
1210                } => {
1211                    // Return the column names of the `body`.
1212                    results[index - 1].clone()
1213                }
1214                LetRec {
1215                    ids: _,
1216                    values: _,
1217                    limits: _,
1218                    body: _,
1219                } => {
1220                    // Return the column names of the `body`.
1221                    results[index - 1].clone()
1222                }
1223                Project { input: _, outputs } => {
1224                    // Permute the column names of the input.
1225                    let input_column_names = &results[index - 1];
1226                    let mut column_names = vec![];
1227                    for col in outputs {
1228                        column_names.push(input_column_names[*col].clone());
1229                    }
1230                    column_names
1231                }
1232                Map { input: _, scalars } => {
1233                    // Extend the column names of the input with anonymous columns.
1234                    let mut column_names = results[index - 1].clone();
1235                    Self::extend_with_scalars(&mut column_names, scalars);
1236                    column_names
1237                }
1238                FlatMap {
1239                    input: _,
1240                    func,
1241                    exprs: _,
1242                } => {
1243                    // Extend the column names of the input with anonymous columns.
1244                    let mut column_names = results[index - 1].clone();
1245                    let func_output_start = column_names.len();
1246                    let func_output_end = column_names.len() + func.output_arity();
1247                    column_names.extend(Self::anonymous(func_output_start..func_output_end));
1248                    if let TableFunc::WithOrdinality { .. } = func {
1249                        // We know the name of the last column
1250                        // TODO(ggevay): generalize this to meaningful col names for all table functions
1251                        **column_names.last_mut().as_mut().expect(
1252                            "there is at least one output column, from the WITH ORDINALITY",
1253                        ) = ColumnName::Annotated(ORDINALITY_COL_NAME.into());
1254                    }
1255                    column_names
1256                }
1257                Filter {
1258                    input: _,
1259                    predicates: _,
1260                } => {
1261                    // Return the column names of the `input`.
1262                    results[index - 1].clone()
1263                }
1264                Join {
1265                    inputs: _,
1266                    equivalences: _,
1267                    implementation: _,
1268                } => {
1269                    let mut input_results = depends
1270                        .children_of_rev(index, expr.children().count())
1271                        .map(|child| &results[child])
1272                        .collect::<Vec<_>>();
1273                    input_results.reverse();
1274
1275                    let mut column_names = vec![];
1276                    for input_column_names in input_results {
1277                        column_names.extend(input_column_names.iter().cloned());
1278                    }
1279                    column_names
1280                }
1281                Reduce {
1282                    input: _,
1283                    group_key,
1284                    aggregates,
1285                    monotonic: _,
1286                    expected_group_size: _,
1287                } => {
1288                    // We clone and extend the input vector and then remove the part
1289                    // associated with the input at the end.
1290                    let mut column_names = results[index - 1].clone();
1291                    let input_arity = column_names.len();
1292
1293                    // Infer the group key part.
1294                    Self::extend_with_scalars(&mut column_names, group_key);
1295                    // Infer the aggregates part.
1296                    for aggregate in aggregates.iter() {
1297                        // The inferred name will consist of (1) the aggregate
1298                        // function name and (2) the aggregate expression (iff
1299                        // it is a simple column reference).
1300                        let func = aggregate.func.clone();
1301                        let expr = match aggregate.expr.as_column() {
1302                            Some(c) => column_names.get(c).unwrap_or(&ColumnName::Unknown).clone(),
1303                            None => ColumnName::Unknown,
1304                        };
1305                        column_names.push(ColumnName::Aggregate(func, Box::new(expr)));
1306                    }
1307                    // Remove the prefix associated with the input
1308                    column_names.drain(0..input_arity);
1309
1310                    column_names
1311                }
1312                TopK {
1313                    input: _,
1314                    group_key: _,
1315                    order_key: _,
1316                    limit: _,
1317                    offset: _,
1318                    monotonic: _,
1319                    expected_group_size: _,
1320                } => {
1321                    // Return the column names of the `input`.
1322                    results[index - 1].clone()
1323                }
1324                Negate { input: _ } => {
1325                    // Return the column names of the `input`.
1326                    results[index - 1].clone()
1327                }
1328                Threshold { input: _ } => {
1329                    // Return the column names of the `input`.
1330                    results[index - 1].clone()
1331                }
1332                Union { base: _, inputs: _ } => {
1333                    // Use the first non-empty column across all inputs.
1334                    let mut column_names = vec![];
1335
1336                    let mut inputs_results = depends
1337                        .children_of_rev(index, expr.children().count())
1338                        .map(|child| &results[child])
1339                        .collect::<Vec<_>>();
1340
1341                    let base_results = inputs_results.pop().unwrap();
1342                    inputs_results.reverse();
1343
1344                    for (i, mut column_name) in base_results.iter().cloned().enumerate() {
1345                        for input_results in inputs_results.iter() {
1346                            if !column_name.is_known() && input_results[i].is_known() {
1347                                column_name = input_results[i].clone();
1348                                break;
1349                            }
1350                        }
1351                        column_names.push(column_name);
1352                    }
1353
1354                    column_names
1355                }
1356                ArrangeBy { input: _, keys: _ } => {
1357                    // Return the column names of the `input`.
1358                    results[index - 1].clone()
1359                }
1360            }
1361        }
1362    }
1363}
1364
1365mod explain {
1366    //! Derived Analysis framework and definitions.
1367
1368    use std::collections::BTreeMap;
1369
1370    use mz_expr::MirRelationExpr;
1371    use mz_expr::explain::{ExplainContext, HumanizedExplain, HumanizerMode};
1372    use mz_ore::stack::RecursionLimitError;
1373    use mz_repr::explain::{Analyses, AnnotatedPlan};
1374
1375    use crate::analysis::equivalences::{Equivalences, HumanizedEquivalenceClasses};
1376
1377    // Analyses should have shortened paths when exported.
1378    use super::DerivedBuilder;
1379
1380    impl<'c> From<&ExplainContext<'c>> for DerivedBuilder<'c> {
1381        fn from(context: &ExplainContext<'c>) -> DerivedBuilder<'c> {
1382            let mut builder = DerivedBuilder::new(context.features);
1383            if context.config.subtree_size {
1384                builder.require(super::SubtreeSize);
1385            }
1386            if context.config.non_negative {
1387                builder.require(super::NonNegative);
1388            }
1389            if context.config.types {
1390                builder.require(super::SqlRelationType);
1391            }
1392            if context.config.arity {
1393                builder.require(super::Arity);
1394            }
1395            if context.config.keys {
1396                builder.require(super::UniqueKeys);
1397            }
1398            if context.config.cardinality {
1399                builder.require(super::Cardinality::with_stats(
1400                    context.cardinality_stats.clone(),
1401                ));
1402            }
1403            if context.config.column_names || context.config.humanized_exprs {
1404                builder.require(super::ColumnNames);
1405            }
1406            if context.config.equivalences {
1407                builder.require(Equivalences);
1408            }
1409            builder
1410        }
1411    }
1412
1413    /// Produce an [`AnnotatedPlan`] wrapping the given [`MirRelationExpr`] along
1414    /// with [`Analyses`] derived from the given context configuration.
1415    pub fn annotate_plan<'a>(
1416        plan: &'a MirRelationExpr,
1417        context: &'a ExplainContext,
1418    ) -> Result<AnnotatedPlan<'a, MirRelationExpr>, RecursionLimitError> {
1419        let mut annotations = BTreeMap::<&MirRelationExpr, Analyses>::default();
1420        let config = context.config;
1421
1422        // We want to annotate the plan with analyses in the following cases:
1423        // 1. An Analysis was explicitly requested in the ExplainConfig.
1424        // 2. Humanized expressions were requested in the ExplainConfig (in which
1425        //    case we need to derive the ColumnNames Analysis).
1426        if config.requires_analyses() || config.humanized_exprs {
1427            // get the annotation keys
1428            let subtree_refs = plan.post_order_vec();
1429            // get the annotation values
1430            let builder = DerivedBuilder::from(context);
1431            let derived = builder.visit(plan);
1432
1433            if config.subtree_size {
1434                for (expr, subtree_size) in std::iter::zip(
1435                    subtree_refs.iter(),
1436                    derived.results::<super::SubtreeSize>().into_iter(),
1437                ) {
1438                    let analyses = annotations.entry(expr).or_default();
1439                    analyses.subtree_size = Some(*subtree_size);
1440                }
1441            }
1442            if config.non_negative {
1443                for (expr, non_negative) in std::iter::zip(
1444                    subtree_refs.iter(),
1445                    derived.results::<super::NonNegative>().into_iter(),
1446                ) {
1447                    let analyses = annotations.entry(expr).or_default();
1448                    analyses.non_negative = Some(*non_negative);
1449                }
1450            }
1451
1452            if config.arity {
1453                for (expr, arity) in std::iter::zip(
1454                    subtree_refs.iter(),
1455                    derived.results::<super::Arity>().into_iter(),
1456                ) {
1457                    let analyses = annotations.entry(expr).or_default();
1458                    analyses.arity = Some(*arity);
1459                }
1460            }
1461
1462            if config.types {
1463                for (expr, types) in std::iter::zip(
1464                    subtree_refs.iter(),
1465                    derived.results::<super::SqlRelationType>().into_iter(),
1466                ) {
1467                    let analyses = annotations.entry(expr).or_default();
1468                    analyses.types = Some(types.clone());
1469                }
1470            }
1471
1472            if config.keys {
1473                for (expr, keys) in std::iter::zip(
1474                    subtree_refs.iter(),
1475                    derived.results::<super::UniqueKeys>().into_iter(),
1476                ) {
1477                    let analyses = annotations.entry(expr).or_default();
1478                    analyses.keys = Some(keys.clone());
1479                }
1480            }
1481
1482            if config.cardinality {
1483                for (expr, card) in std::iter::zip(
1484                    subtree_refs.iter(),
1485                    derived.results::<super::Cardinality>().into_iter(),
1486                ) {
1487                    let analyses = annotations.entry(expr).or_default();
1488                    analyses.cardinality = Some(card.to_string());
1489                }
1490            }
1491
1492            if config.column_names || config.humanized_exprs {
1493                for (expr, column_names) in std::iter::zip(
1494                    subtree_refs.iter(),
1495                    derived.results::<super::ColumnNames>().into_iter(),
1496                ) {
1497                    let analyses = annotations.entry(expr).or_default();
1498                    let value = column_names
1499                        .iter()
1500                        .map(|column_name| column_name.humanize(context.humanizer))
1501                        .collect();
1502                    analyses.column_names = Some(value);
1503                }
1504            }
1505
1506            if config.equivalences {
1507                for (expr, equivs) in std::iter::zip(
1508                    subtree_refs.iter(),
1509                    derived.results::<Equivalences>().into_iter(),
1510                ) {
1511                    let analyses = annotations.entry(expr).or_default();
1512                    analyses.equivalences = Some(match equivs.as_ref() {
1513                        Some(equivs) => HumanizedEquivalenceClasses {
1514                            equivalence_classes: equivs,
1515                            cols: analyses.column_names.as_ref(),
1516                            mode: HumanizedExplain::new(config.redacted),
1517                        }
1518                        .to_string(),
1519                        None => "<empty collection>".to_string(),
1520                    });
1521                }
1522            }
1523        }
1524
1525        Ok(AnnotatedPlan { plan, annotations })
1526    }
1527}
1528
1529/// Definition and helper structs for the [`Cardinality`] Analysis.
1530mod cardinality {
1531    use std::collections::{BTreeMap, BTreeSet};
1532
1533    use mz_expr::{
1534        BinaryFunc, Id, JoinImplementation, MirRelationExpr, MirScalarExpr, TableFunc, UnaryFunc,
1535        VariadicFunc,
1536    };
1537    use mz_ore::cast::{CastFrom, CastLossy, TryCastFrom};
1538    use mz_repr::GlobalId;
1539
1540    use ordered_float::OrderedFloat;
1541
1542    use super::{Analysis, Arity, SubtreeSize, UniqueKeys};
1543
1544    /// Compute the estimated cardinality of each subtree of a [MirRelationExpr] from the bottom up.
1545    #[allow(missing_debug_implementations)]
1546    pub struct Cardinality {
1547        /// Cardinalities for globally named entities
1548        pub stats: BTreeMap<GlobalId, usize>,
1549    }
1550
1551    impl Cardinality {
1552        /// A cardinality estimator with provided statistics for the given global identifiers
1553        pub fn with_stats(stats: BTreeMap<GlobalId, usize>) -> Self {
1554            Cardinality { stats }
1555        }
1556    }
1557
1558    impl Default for Cardinality {
1559        fn default() -> Self {
1560            Cardinality {
1561                stats: BTreeMap::new(),
1562            }
1563        }
1564    }
1565
1566    /// Cardinality estimates
1567    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
1568    pub enum CardinalityEstimate {
1569        Unknown,
1570        Estimate(OrderedFloat<f64>),
1571    }
1572
1573    impl CardinalityEstimate {
1574        pub fn max(lhs: CardinalityEstimate, rhs: CardinalityEstimate) -> CardinalityEstimate {
1575            use CardinalityEstimate::*;
1576            match (lhs, rhs) {
1577                (Estimate(lhs), Estimate(rhs)) => Estimate(std::cmp::max(lhs, rhs)),
1578                _ => Unknown,
1579            }
1580        }
1581
1582        pub fn rounded(&self) -> Option<usize> {
1583            match self {
1584                CardinalityEstimate::Estimate(OrderedFloat(f)) => {
1585                    let rounded = f.ceil();
1586                    let flattened = usize::cast_from(
1587                        u64::try_cast_from(rounded)
1588                            .expect("positive and representable cardinality estimate"),
1589                    );
1590
1591                    Some(flattened)
1592                }
1593                CardinalityEstimate::Unknown => None,
1594            }
1595        }
1596    }
1597
1598    impl std::ops::Add for CardinalityEstimate {
1599        type Output = CardinalityEstimate;
1600
1601        fn add(self, rhs: Self) -> Self::Output {
1602            use CardinalityEstimate::*;
1603            match (self, rhs) {
1604                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs + rhs),
1605                _ => Unknown,
1606            }
1607        }
1608    }
1609
1610    impl std::ops::Sub for CardinalityEstimate {
1611        type Output = CardinalityEstimate;
1612
1613        fn sub(self, rhs: Self) -> Self::Output {
1614            use CardinalityEstimate::*;
1615            match (self, rhs) {
1616                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs - rhs),
1617                _ => Unknown,
1618            }
1619        }
1620    }
1621
1622    impl std::ops::Sub<CardinalityEstimate> for f64 {
1623        type Output = CardinalityEstimate;
1624
1625        fn sub(self, rhs: CardinalityEstimate) -> Self::Output {
1626            use CardinalityEstimate::*;
1627            if let Estimate(OrderedFloat(rhs)) = rhs {
1628                Estimate(OrderedFloat(self - rhs))
1629            } else {
1630                Unknown
1631            }
1632        }
1633    }
1634
1635    impl std::ops::Mul for CardinalityEstimate {
1636        type Output = CardinalityEstimate;
1637
1638        fn mul(self, rhs: Self) -> Self::Output {
1639            use CardinalityEstimate::*;
1640            match (self, rhs) {
1641                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs * rhs),
1642                _ => Unknown,
1643            }
1644        }
1645    }
1646
1647    impl std::ops::Mul<f64> for CardinalityEstimate {
1648        type Output = CardinalityEstimate;
1649
1650        fn mul(self, rhs: f64) -> Self::Output {
1651            if let CardinalityEstimate::Estimate(OrderedFloat(lhs)) = self {
1652                CardinalityEstimate::Estimate(OrderedFloat(lhs * rhs))
1653            } else {
1654                CardinalityEstimate::Unknown
1655            }
1656        }
1657    }
1658
1659    impl std::ops::Div for CardinalityEstimate {
1660        type Output = CardinalityEstimate;
1661
1662        fn div(self, rhs: Self) -> Self::Output {
1663            use CardinalityEstimate::*;
1664            match (self, rhs) {
1665                (Estimate(lhs), Estimate(rhs)) => Estimate(lhs / rhs),
1666                _ => Unknown,
1667            }
1668        }
1669    }
1670
1671    impl std::ops::Div<f64> for CardinalityEstimate {
1672        type Output = CardinalityEstimate;
1673
1674        fn div(self, rhs: f64) -> Self::Output {
1675            use CardinalityEstimate::*;
1676            if let Estimate(lhs) = self {
1677                Estimate(lhs / OrderedFloat(rhs))
1678            } else {
1679                Unknown
1680            }
1681        }
1682    }
1683
1684    impl std::iter::Sum for CardinalityEstimate {
1685        fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
1686            iter.fold(CardinalityEstimate::from(0.0), |acc, elt| acc + elt)
1687        }
1688    }
1689
1690    impl std::iter::Product for CardinalityEstimate {
1691        fn product<I: Iterator<Item = Self>>(iter: I) -> Self {
1692            iter.fold(CardinalityEstimate::from(1.0), |acc, elt| acc * elt)
1693        }
1694    }
1695
1696    impl From<usize> for CardinalityEstimate {
1697        fn from(value: usize) -> Self {
1698            Self::Estimate(OrderedFloat(f64::cast_lossy(value)))
1699        }
1700    }
1701
1702    impl From<f64> for CardinalityEstimate {
1703        fn from(value: f64) -> Self {
1704            Self::Estimate(OrderedFloat(value))
1705        }
1706    }
1707
1708    /// The default selectivity for predicates we know nothing about.
1709    ///
1710    /// But see also expr/src/scalar.rs for `FilterCharacteristics::worst_case_scaling_factor()` for a more nuanced take.
1711    pub const WORST_CASE_SELECTIVITY: OrderedFloat<f64> = OrderedFloat(0.1);
1712
1713    // This section defines how we estimate cardinality for each syntactic construct.
1714    //
1715    // We split it up into functions to make it all a bit more tractable to work with.
1716    impl Cardinality {
1717        fn flat_map(&self, tf: &TableFunc, input: CardinalityEstimate) -> CardinalityEstimate {
1718            match tf {
1719                TableFunc::Wrap { types, width } => {
1720                    input * (f64::cast_lossy(types.len()) / f64::cast_lossy(*width))
1721                }
1722                _ => {
1723                    // TODO(mgree) what explosion factor should we make up?
1724                    input * CardinalityEstimate::from(4.0)
1725                }
1726            }
1727        }
1728
1729        fn predicate(
1730            &self,
1731            predicate_expr: &MirScalarExpr,
1732            unique_columns: &BTreeSet<usize>,
1733        ) -> OrderedFloat<f64> {
1734            let index_selectivity = |expr: &MirScalarExpr| -> Option<OrderedFloat<f64>> {
1735                match expr {
1736                    MirScalarExpr::Column(col, _) => {
1737                        if unique_columns.contains(col) {
1738                            // TODO(mgree): when we have index cardinality statistics, they should go here when `expr` is a `MirScalarExpr::Column` that's in `unique_columns`
1739                            None
1740                        } else {
1741                            None
1742                        }
1743                    }
1744                    _ => None,
1745                }
1746            };
1747
1748            match predicate_expr {
1749                MirScalarExpr::Column(_, _)
1750                | MirScalarExpr::Literal(_, _)
1751                | MirScalarExpr::CallUnmaterializable(_) => OrderedFloat(1.0),
1752                MirScalarExpr::CallUnary { func, expr } => match func {
1753                    UnaryFunc::Not(_) => OrderedFloat(1.0) - self.predicate(expr, unique_columns),
1754                    UnaryFunc::IsTrue(_) | UnaryFunc::IsFalse(_) => OrderedFloat(0.5),
1755                    UnaryFunc::IsNull(_) => {
1756                        if let Some(icard) = index_selectivity(expr) {
1757                            icard
1758                        } else {
1759                            WORST_CASE_SELECTIVITY
1760                        }
1761                    }
1762                    _ => WORST_CASE_SELECTIVITY,
1763                },
1764                MirScalarExpr::CallBinary { func, expr1, expr2 } => {
1765                    match func {
1766                        BinaryFunc::Eq(_) => {
1767                            match (index_selectivity(expr1), index_selectivity(expr2)) {
1768                                (Some(isel1), Some(isel2)) => std::cmp::max(isel1, isel2),
1769                                (Some(isel), None) | (None, Some(isel)) => isel,
1770                                (None, None) => WORST_CASE_SELECTIVITY,
1771                            }
1772                        }
1773                        // 1.0 - the Eq case
1774                        BinaryFunc::NotEq(_) => {
1775                            match (index_selectivity(expr1), index_selectivity(expr2)) {
1776                                (Some(isel1), Some(isel2)) => {
1777                                    OrderedFloat(1.0) - std::cmp::max(isel1, isel2)
1778                                }
1779                                (Some(isel), None) | (None, Some(isel)) => OrderedFloat(1.0) - isel,
1780                                (None, None) => OrderedFloat(1.0) - WORST_CASE_SELECTIVITY,
1781                            }
1782                        }
1783                        BinaryFunc::Lt(_)
1784                        | BinaryFunc::Lte(_)
1785                        | BinaryFunc::Gt(_)
1786                        | BinaryFunc::Gte(_) => {
1787                            // TODO(mgree) if we have high/low key values and one of the columns is an index, we can do better
1788                            OrderedFloat(0.33)
1789                        }
1790                        _ => OrderedFloat(1.0), // TOOD(mgree): are there other interesting cases?
1791                    }
1792                }
1793                MirScalarExpr::CallVariadic { func, exprs } => match func {
1794                    VariadicFunc::And => exprs
1795                        .iter()
1796                        .map(|expr| self.predicate(expr, unique_columns))
1797                        .product(),
1798                    VariadicFunc::Or => {
1799                        // TODO(mgree): BETWEEN will get compiled down to an AND of appropriate bounds---we could try to detect it and be clever
1800
1801                        // F(expr1 OR expr2) = F(expr1) + F(expr2) - F(expr1) * F(expr2), but generalized
1802                        let mut exprs = exprs.into_iter();
1803
1804                        let mut expr1;
1805
1806                        if let Some(first) = exprs.next() {
1807                            expr1 = self.predicate(first, unique_columns);
1808                        } else {
1809                            return OrderedFloat(1.0);
1810                        }
1811
1812                        for expr2 in exprs {
1813                            let expr2 = self.predicate(expr2, unique_columns);
1814                            expr1 = expr1 + expr2 - expr1 * expr2;
1815                        }
1816                        expr1
1817                    }
1818                    _ => OrderedFloat(1.0),
1819                },
1820                MirScalarExpr::If { cond: _, then, els } => std::cmp::max(
1821                    self.predicate(then, unique_columns),
1822                    self.predicate(els, unique_columns),
1823                ),
1824            }
1825        }
1826
1827        fn filter(
1828            &self,
1829            predicates: &Vec<MirScalarExpr>,
1830            keys: &Vec<Vec<usize>>,
1831            input: CardinalityEstimate,
1832        ) -> CardinalityEstimate {
1833            // TODO(mgree): should we try to do something for indices built on multiple columns?
1834            let mut unique_columns = BTreeSet::new();
1835            for key in keys {
1836                if key.len() == 1 {
1837                    unique_columns.insert(key[0]);
1838                }
1839            }
1840
1841            let mut estimate = input;
1842            for expr in predicates {
1843                let selectivity = self.predicate(expr, &unique_columns);
1844                debug_assert!(
1845                    OrderedFloat(0.0) <= selectivity && selectivity <= OrderedFloat(1.0),
1846                    "predicate selectivity {selectivity} should be in the range [0,1]"
1847                );
1848                estimate = estimate * selectivity.0;
1849            }
1850
1851            estimate
1852        }
1853
1854        fn join(
1855            &self,
1856            equivalences: &Vec<Vec<MirScalarExpr>>,
1857            _implementation: &JoinImplementation,
1858            unique_columns: BTreeMap<usize, usize>,
1859            mut inputs: Vec<CardinalityEstimate>,
1860        ) -> CardinalityEstimate {
1861            if inputs.is_empty() {
1862                return CardinalityEstimate::from(0.0);
1863            }
1864
1865            for equiv in equivalences {
1866                // those sources which have a unique key
1867                let mut unique_sources = BTreeSet::new();
1868                let mut all_unique = true;
1869
1870                for expr in equiv {
1871                    if let MirScalarExpr::Column(col, _) = expr {
1872                        if let Some(idx) = unique_columns.get(col) {
1873                            unique_sources.insert(*idx);
1874                        } else {
1875                            all_unique = false;
1876                        }
1877                    } else {
1878                        all_unique = false;
1879                    }
1880                }
1881
1882                // no unique columns in this equivalence
1883                if unique_sources.is_empty() {
1884                    continue;
1885                }
1886
1887                // ALL unique columns in this equivalence
1888                if all_unique {
1889                    // these inputs have unique keys for _all_ of the equivalence, so they're a bound on how many rows we'll get from those sources
1890                    // we'll find the leftmost such input and use it to hold the minimum; the other sources we set to 1.0 (so they have no effect)
1891                    let mut sources = unique_sources.iter();
1892
1893                    let lhs_idx = *sources.next().unwrap();
1894                    let mut lhs =
1895                        std::mem::replace(&mut inputs[lhs_idx], CardinalityEstimate::from(1.0));
1896                    for &rhs_idx in sources {
1897                        let rhs =
1898                            std::mem::replace(&mut inputs[rhs_idx], CardinalityEstimate::from(1.0));
1899                        lhs = CardinalityEstimate::min(lhs, rhs);
1900                    }
1901
1902                    inputs[lhs_idx] = lhs;
1903
1904                    // best option! go look at the next equivalence
1905                    continue;
1906                }
1907
1908                // some unique columns in this equivalence
1909                for idx in unique_sources {
1910                    // when joining R and S on R.x = S.x, if R.x is unique and S.x is not, we're bounded above by the cardinality of S
1911                    inputs[idx] = CardinalityEstimate::from(1.0);
1912                }
1913            }
1914
1915            let mut product = CardinalityEstimate::from(1.0);
1916            for input in inputs {
1917                product = product * input;
1918            }
1919            product
1920        }
1921
1922        fn reduce(
1923            &self,
1924            group_key: &Vec<MirScalarExpr>,
1925            expected_group_size: &Option<u64>,
1926            input: CardinalityEstimate,
1927        ) -> CardinalityEstimate {
1928            // TODO(mgree): if no `group_key` is present, we can do way better
1929
1930            if let Some(group_size) = expected_group_size {
1931                input / f64::cast_lossy(*group_size)
1932            } else if group_key.is_empty() {
1933                CardinalityEstimate::from(1.0)
1934            } else {
1935                // in the worst case, every row is its own group
1936                input
1937            }
1938        }
1939
1940        fn topk(
1941            &self,
1942            group_key: &Vec<usize>,
1943            limit: &Option<MirScalarExpr>,
1944            expected_group_size: &Option<u64>,
1945            input: CardinalityEstimate,
1946        ) -> CardinalityEstimate {
1947            // TODO: support simple arithmetic expressions
1948            let k = limit
1949                .as_ref()
1950                .and_then(|l| l.as_literal_int64())
1951                .map_or(1, |l| std::cmp::max(0, l));
1952
1953            if let Some(group_size) = expected_group_size {
1954                input * (f64::cast_lossy(k) / f64::cast_lossy(*group_size))
1955            } else if group_key.is_empty() {
1956                CardinalityEstimate::from(f64::cast_lossy(k))
1957            } else {
1958                // in the worst case, every row is its own group
1959                input.clone()
1960            }
1961        }
1962
1963        fn threshold(&self, input: CardinalityEstimate) -> CardinalityEstimate {
1964            // worst case scaling factor is 1
1965            input.clone()
1966        }
1967    }
1968
1969    impl Analysis for Cardinality {
1970        type Value = CardinalityEstimate;
1971
1972        fn announce_dependencies(builder: &mut crate::analysis::DerivedBuilder) {
1973            builder.require(crate::analysis::Arity);
1974            builder.require(crate::analysis::UniqueKeys);
1975        }
1976
1977        fn derive(
1978            &self,
1979            expr: &MirRelationExpr,
1980            index: usize,
1981            results: &[Self::Value],
1982            depends: &crate::analysis::Derived,
1983        ) -> Self::Value {
1984            use MirRelationExpr::*;
1985
1986            let sizes = depends.as_view().results::<SubtreeSize>();
1987            let arity = depends.as_view().results::<Arity>();
1988            let keys = depends.as_view().results::<UniqueKeys>();
1989
1990            match expr {
1991                Constant { rows, .. } => {
1992                    CardinalityEstimate::from(rows.as_ref().map_or_else(|_| 0, |v| v.len()))
1993                }
1994                Get { id, .. } => match id {
1995                    Id::Local(id) => depends
1996                        .bindings()
1997                        .get(id)
1998                        .and_then(|id| results.get(*id))
1999                        .copied()
2000                        .unwrap_or(CardinalityEstimate::Unknown),
2001                    Id::Global(id) => self
2002                        .stats
2003                        .get(id)
2004                        .copied()
2005                        .map(CardinalityEstimate::from)
2006                        .unwrap_or(CardinalityEstimate::Unknown),
2007                },
2008                Let { .. } | Project { .. } | Map { .. } | ArrangeBy { .. } | Negate { .. } => {
2009                    results[index - 1].clone()
2010                }
2011                LetRec { .. } =>
2012                // TODO(mgree): implement a recurrence-based approach (or at least identify common idioms, e.g. transitive closure)
2013                {
2014                    CardinalityEstimate::Unknown
2015                }
2016                Union { base: _, inputs: _ } => depends
2017                    .children_of_rev(index, expr.children().count())
2018                    .map(|off| results[off].clone())
2019                    .sum(),
2020                FlatMap { func, .. } => {
2021                    let input = results[index - 1];
2022                    self.flat_map(func, input)
2023                }
2024                Filter { predicates, .. } => {
2025                    let input = results[index - 1];
2026                    let keys = depends.results::<UniqueKeys>();
2027                    let keys = &keys[index - 1];
2028                    self.filter(predicates, keys, input)
2029                }
2030                Join {
2031                    equivalences,
2032                    implementation,
2033                    inputs,
2034                    ..
2035                } => {
2036                    let mut input_results = Vec::with_capacity(inputs.len());
2037
2038                    // maps a column to the index in `inputs` that it belongs to
2039                    let mut unique_columns = BTreeMap::new();
2040                    let mut key_offset = 0;
2041
2042                    let mut offset = 1;
2043                    for idx in 0..inputs.len() {
2044                        let input = results[index - offset];
2045                        input_results.push(input);
2046
2047                        let arity = arity[index - offset];
2048                        let keys = &keys[index - offset];
2049                        for key in keys {
2050                            if key.len() == 1 {
2051                                unique_columns.insert(key_offset + key[0], idx);
2052                            }
2053                        }
2054                        key_offset += arity;
2055
2056                        offset += &sizes[index - offset];
2057                    }
2058
2059                    self.join(equivalences, implementation, unique_columns, input_results)
2060                }
2061                Reduce {
2062                    group_key,
2063                    expected_group_size,
2064                    ..
2065                } => {
2066                    let input = results[index - 1];
2067                    self.reduce(group_key, expected_group_size, input)
2068                }
2069                TopK {
2070                    group_key,
2071                    limit,
2072                    expected_group_size,
2073                    ..
2074                } => {
2075                    let input = results[index - 1];
2076                    self.topk(group_key, limit, expected_group_size, input)
2077                }
2078                Threshold { .. } => {
2079                    let input = results[index - 1];
2080                    self.threshold(input)
2081                }
2082            }
2083        }
2084    }
2085
2086    impl std::fmt::Display for CardinalityEstimate {
2087        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2088            match self {
2089                CardinalityEstimate::Estimate(OrderedFloat(estimate)) => write!(f, "{estimate}"),
2090                CardinalityEstimate::Unknown => write!(f, "<UNKNOWN>"),
2091            }
2092        }
2093    }
2094}
2095
2096mod common_lattice {
2097    use crate::analysis::Lattice;
2098
2099    pub struct BoolLattice;
2100
2101    impl Lattice<bool> for BoolLattice {
2102        // `true` > `false`.
2103        fn top(&self) -> bool {
2104            true
2105        }
2106        // `false` is the greatest lower bound. `into` changes if it's true and `item` is false.
2107        fn meet_assign(&self, into: &mut bool, item: bool) -> bool {
2108            let changed = *into && !item;
2109            *into = *into && item;
2110            changed
2111        }
2112    }
2113}