Skip to main content

mz_transform/
will_distinct.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Transform that allows expressions to arbitrarily vary the magnitude of the multiplicity of each row.
11//!
//! This permission most commonly comes from a `Distinct` operation, which flattens all positive multiplicities to one.
13//! When a `Distinct` will certainly be applied, its input expressions are allowed to change the magnitudes
14//! of their record multiplicities, for example removing other `Distinct` operations that are redundant.
15
16use itertools::Itertools;
17use mz_expr::MirRelationExpr;
18
19use crate::analysis::{DerivedView, NonNegative};
20
21use crate::{TransformCtx, TransformError};
22
/// Pushes down the information that a collection will be subjected to a `Distinct`.
///
/// This intends to recognize idiomatic stacks of `UNION` from SQL, which look like `Distinct` over `Union`, potentially
/// over other `Distinct` expressions. By default it only sees patterns of `DISTINCT UNION DISTINCT, ..` (through unions
/// whose inputs are all non-negative), and any other operator in between prevents the insight needed to remove the
/// inner `DISTINCT`.
///
/// When the `enable_will_distinct_propagation` feature is enabled, the information additionally flows through `Map`,
/// `Filter`, `FlatMap`, `Threshold`, and `Negate`, and through `Project` when its input is non-negative. A `TopK` with
/// limit 1 and offset 0 then also acts as a source of the information, just like a `Distinct`. `Join` never propagates
/// it, so that semijoin elision can still see distinct-ed inputs.
///
/// There are other potential optimizations, for example fusing multiple reads of the same source, where distinctness may
/// mean they could be a single read (with additional requirements).
#[derive(Debug, Default)]
pub struct WillDistinct;
33
34impl crate::Transform for WillDistinct {
35    fn name(&self) -> &'static str {
36        "WillDistinct"
37    }
38
39    #[mz_ore::instrument(
40        target = "optimizer"
41        level = "trace",
42        fields(path.segment = "will_distinct")
43    )]
44    fn actually_perform_transform(
45        &self,
46        relation: &mut MirRelationExpr,
47        ctx: &mut TransformCtx,
48    ) -> Result<(), TransformError> {
49        // Perform bottom-up equivalence class analysis.
50        use crate::analysis::DerivedBuilder;
51        let mut builder = DerivedBuilder::new(ctx.features);
52        builder.require(NonNegative);
53        let derived = builder.visit(relation);
54        let derived = derived.as_view();
55
56        self.apply(
57            relation,
58            derived,
59            ctx.features.enable_will_distinct_propagation,
60        );
61
62        mz_repr::explain::trace_plan(&*relation);
63        Ok(())
64    }
65}
66
impl WillDistinct {
    /// Removes `Distinct` operators (`Reduce`s with no aggregates) that are shadowed by an ancestor
    /// which already makes the magnitudes of record multiplicities irrelevant.
    ///
    /// `derived` holds the bottom-up analysis results for `expr` and must include `NonNegative`,
    /// whose value is unwrapped in the `Project` and `Union` arms.
    ///
    /// When `generalize` is true, the "will distinct" permission also flows through `Map`,
    /// `Filter`, `FlatMap`, `Threshold`, `Negate`, and (for non-negative inputs) `Project`. A
    /// `TopK` with limit 1 and offset 0 also grants the permission to its input. When `generalize`
    /// is false, only `Reduce` and `Union` propagate it.
    ///
    /// # Panics
    ///
    /// Panics (via `zip_eq`) if the children of a visited expression do not line up one-to-one
    /// with the children recorded in `derived`. Also panics if a required `NonNegative` value is
    /// absent.
    fn apply(&self, expr: &mut MirRelationExpr, derived: DerivedView, generalize: bool) {
        // Maintain a todo list of triples of 1. expression, 2. child analysis results, and 3. a "will distinct" bit.
        // The "will distinct" bit says that a subsequent operator will make the specific multiplicities of each record
        // irrelevant, and the expression only needs to present the correct *multiset* of records, with any positive
        // cardinality allowed.
        let mut todo = vec![(expr, derived, false)];
        while let Some((expr, derived, distinct_by)) = todo.pop() {
            // If we find a `Distinct` expression in the shadow of an ancestor that masks magnitudes (another
            // `Distinct` that will apply to its key columns, or with `generalize` a `TopK` with limit 1 and offset 0),
            // we can remove this `Distinct` operator as the distinctness will be enforced by that ancestor.
            if let (
                MirRelationExpr::Reduce {
                    input,
                    group_key,
                    aggregates,
                    ..
                },
                true,
            ) = (&mut *expr, distinct_by)
            {
                if aggregates.is_empty() {
                    // We can remove the `Distinct`, but we must install a `Map` and a `Project` to implement that
                    // aspect of the operator. We do this by hand so that we can still descend down `input` and
                    // continue to remove shadowed `Distinct` operators.
                    let arity = input.arity();
                    *expr = MirRelationExpr::Project {
                        // The `Map` appends one column per group key expression after the `arity` input columns;
                        // keep exactly those appended columns.
                        outputs: (arity..arity + group_key.len()).collect::<Vec<_>>(),
                        input: Box::new(MirRelationExpr::Map {
                            scalars: group_key.clone(),
                            input: Box::new(input.take_dangerous()),
                        }),
                    };
                    // We are certain to have a specific pattern of AST nodes, which we need to push through so that
                    // we can continue recursively.
                    if let MirRelationExpr::Project { input, .. } = expr {
                        // `input` is a `Map` node, but it has a single input like the `Distinct` it came from.
                        // Although it reads a bit weird, this lines up the child of the distinct with its derived
                        // analysis results.
                        todo.extend(
                            input
                                .children_mut()
                                .rev()
                                .zip_eq(derived.children_rev())
                                .map(|(x, y)| (x, y, true)),
                        );
                    }
                } else {
                    // A `Reduce` with aggregates can observe multiplicities (e.g. a count), so it cannot be removed
                    // and its input must keep exact multiplicities.
                    todo.extend(
                        expr.children_mut()
                            .rev()
                            .zip_eq(derived.children_rev())
                            .map(|(x, y)| (x, y, false)),
                    );
                }
            } else {
                match expr {
                    // Only reached when the bit is false (the `true` case is handled above). A `Distinct` grants the
                    // permission to its input; any other reduction withholds it.
                    MirRelationExpr::Reduce {
                        input, aggregates, ..
                    } => {
                        if aggregates.is_empty() {
                            todo.push((input, derived.last_child(), true));
                        } else {
                            todo.push((input, derived.last_child(), false))
                        }
                    }
                    MirRelationExpr::TopK {
                        input,
                        limit,
                        offset,
                        ..
                    } => {
                        // A `TopK` masks input magnitudes (behaving like a `Distinct`) only when it
                        // returns the single first row per group: `limit == 1` and `offset == 0`.
                        // With a non-zero offset, *which* row survives depends on the cumulative
                        // multiplicities of the rows skipped, so a descendant that alters magnitudes
                        // could shift the offset boundary and select a different row.
                        // Without `generalize`, a `TopK` withholds the permission like any other operator.
                        if generalize
                            && *offset == 0
                            && limit.as_ref().and_then(|e| e.as_literal_int64()) == Some(1)
                        {
                            todo.push((input, derived.last_child(), true));
                        } else {
                            todo.push((input, derived.last_child(), false));
                        }
                    }
                    // The masking operator above depends only on the *signs* of multiplicities,
                    // not their magnitudes. The following operators are sign-preserving: given the
                    // same input signs they produce the same output signs, because they never sum
                    // two distinct input rows into a single output row.
                    //   * `Map`/`Filter`/`Threshold` pass rows through (Filter/Threshold may drop
                    //     rows, but never merge them).
                    //   * `Negate` flips every sign uniformly, so each output sign is still fully
                    //     determined by the corresponding input sign. (The masking operator itself
                    //     is not sign-invariant; it is the determinism that matters.)
                    //   * `FlatMap` retains the input columns, so distinct input rows stay distinct
                    //     in the output; only same-input duplicates are scaled, by a factor the
                    //     table function derives from that input row. Hence no cross-row
                    //     cancellation. NOTE(review): assumes table functions are deterministic per
                    //     input row; confirm for functions that may emit negative multiplicities.
                    // Because none of these can cancel rows against each other, the permission to
                    // alter magnitudes pushes through whenever `generalize` is set. `Project` and
                    // `Union`, by contrast, *can* sum across rows (Project by dropping
                    // distinguishing columns, Union by combining inputs), so they require the
                    // `NonNegative` gate below to rule out sign-changing cancellation.
                    MirRelationExpr::Map { input, .. } => {
                        todo.push((input, derived.last_child(), generalize && distinct_by));
                    }
                    MirRelationExpr::Filter { input, .. } => {
                        todo.push((input, derived.last_child(), generalize && distinct_by));
                    }
                    MirRelationExpr::FlatMap { input, .. } => {
                        todo.push((input, derived.last_child(), generalize && distinct_by));
                    }
                    MirRelationExpr::Threshold { input, .. } => {
                        todo.push((input, derived.last_child(), generalize && distinct_by));
                    }
                    MirRelationExpr::Negate { input, .. } => {
                        todo.push((input, derived.last_child(), generalize && distinct_by));
                    }
                    MirRelationExpr::Project { input, .. } => {
                        // Project needs a non-negative input to ensure output polarity does not change.
                        // Two inputs that collapse to be one output, if their input polarities are different,
                        // could end with either output polarity (or zero).
                        if generalize && *derived.last_child().value::<NonNegative>().unwrap() {
                            todo.push((input, derived.last_child(), distinct_by));
                        } else {
                            todo.push((input, derived.last_child(), false));
                        }
                    }
                    // `Join` deliberately falls through to the catch-all arm below (resetting the
                    // bit to false). Pushing distinct elision through a join would work against
                    // Semijoin elision, which needs to see distinct-ed inputs.
                    //
                    // If all inputs to the union are non-negative, any distinct enforced above the expression can be
                    // communicated on to each input. Unlike the arms above, this is not gated on `generalize`.
                    MirRelationExpr::Union { base, inputs } => {
                        if derived
                            .children_rev()
                            .all(|v| *v.value::<NonNegative>().unwrap())
                        {
                            // Children are visited in reverse (last input first, `base` last) to line up
                            // with `derived.children_rev()`.
                            let children_rev = inputs.iter_mut().rev().chain(Some(&mut **base));
                            todo.extend(
                                children_rev
                                    .zip_eq(derived.children_rev())
                                    .map(|(x, y)| (x, y, distinct_by)),
                            );
                        } else {
                            let children_rev = inputs.iter_mut().rev().chain(Some(&mut **base));
                            todo.extend(
                                children_rev
                                    .zip_eq(derived.children_rev())
                                    .map(|(x, y)| (x, y, false)),
                            );
                        }
                    }
                    // Every remaining operator withholds the permission from all of its children.
                    x => {
                        todo.extend(
                            x.children_mut()
                                .rev()
                                .zip_eq(derived.children_rev())
                                .map(|(x, y)| (x, y, false)),
                        );
                    }
                }
            }
        }
    }
}