mz_transform/will_distinct.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Transform that allows expressions to arbitrarily vary the magnitude of the multiplicity of each row.
//!
//! This permission most commonly comes from a `Distinct` operation, which flattens all positive multiplicities to one.
//! When a `Distinct` will certainly be applied, its input expressions are allowed to change the magnitudes
//! of their record multiplicities, for example by removing other `Distinct` operations that are redundant.
15
16use itertools::Itertools;
17use mz_expr::MirRelationExpr;
18
19use crate::analysis::{DerivedView, NonNegative};
20
21use crate::{TransformCtx, TransformError};
22
/// Transform that informs a sub-plan that a `Distinct` will be applied on top of it.
///
/// The motivating shape is the plan SQL produces for stacked `UNION`s: a `Distinct` over a
/// `Union` whose inputs are themselves `Distinct` expressions. In that shape the inner
/// `Distinct`s are redundant and can be removed. Operators sitting between the outer and the
/// inner `Distinct` may hide the pattern and keep the inner `Distinct` in place.
///
/// Further optimizations are conceivable but not attempted here, for example fusing several
/// reads of the same source into a single read when distinctness makes them interchangeable
/// (subject to additional requirements).
#[derive(Default, Debug)]
pub struct WillDistinct;
33
34impl crate::Transform for WillDistinct {
35 fn name(&self) -> &'static str {
36 "WillDistinct"
37 }
38
39 #[mz_ore::instrument(
40 target = "optimizer"
41 level = "trace",
42 fields(path.segment = "will_distinct")
43 )]
44 fn actually_perform_transform(
45 &self,
46 relation: &mut MirRelationExpr,
47 ctx: &mut TransformCtx,
48 ) -> Result<(), TransformError> {
49 // Perform bottom-up equivalence class analysis.
50 use crate::analysis::DerivedBuilder;
51 let mut builder = DerivedBuilder::new(ctx.features);
52 builder.require(NonNegative);
53 let derived = builder.visit(relation);
54 let derived = derived.as_view();
55
56 self.apply(
57 relation,
58 derived,
59 ctx.features.enable_will_distinct_propagation,
60 );
61
62 mz_repr::explain::trace_plan(&*relation);
63 Ok(())
64 }
65}
66
67impl WillDistinct {
68 fn apply(&self, expr: &mut MirRelationExpr, derived: DerivedView, generalize: bool) {
69 // Maintain a todo list of triples of 1. expression, 2. child analysis results, and 3. a "will distinct" bit.
70 // The "will distinct" bit says that a subsequent operator will make the specific multiplicities of each record
71 // irrelevant, and the expression only needs to present the correct *multiset* of records, with any positive
72 // cardinality allowed.
73 let mut todo = vec![(expr, derived, false)];
74 while let Some((expr, derived, distinct_by)) = todo.pop() {
75 // If we find a `Distinct` expression in the shadow of another `Distinct` that will apply to its key columns,
76 // we can remove this `Distinct` operator as the distinctness will be enforced by the other expression.
77 if let (
78 MirRelationExpr::Reduce {
79 input,
80 group_key,
81 aggregates,
82 ..
83 },
84 true,
85 ) = (&mut *expr, distinct_by)
86 {
87 if aggregates.is_empty() {
88 // We can remove the `Distinct`, but we must install a `Map` and a `Project` to implement that
89 // aspect of the operator. We do this by hand so that we can still descend down `input` and
90 // continue to remove shadowed `Distinct` operators.
91 let arity = input.arity();
92 *expr = MirRelationExpr::Project {
93 outputs: (arity..arity + group_key.len()).collect::<Vec<_>>(),
94 input: Box::new(MirRelationExpr::Map {
95 scalars: group_key.clone(),
96 input: Box::new(input.take_dangerous()),
97 }),
98 };
99 // We are certain to have a specific pattern of AST nodes, which we need to push through so that
100 // we can continue recursively.
101 if let MirRelationExpr::Project { input, .. } = expr {
102 // `input` is a `Map` node, but it has a single input like the `Distinct` it came from.
103 // Although it reads a bit weird, this lines up the child of the distinct with its derived
104 // analysis results.
105 todo.extend(
106 input
107 .children_mut()
108 .rev()
109 .zip_eq(derived.children_rev())
110 .map(|(x, y)| (x, y, true)),
111 );
112 }
113 } else {
114 todo.extend(
115 expr.children_mut()
116 .rev()
117 .zip_eq(derived.children_rev())
118 .map(|(x, y)| (x, y, false)),
119 );
120 }
121 } else {
122 match expr {
123 MirRelationExpr::Reduce {
124 input, aggregates, ..
125 } => {
126 if aggregates.is_empty() {
127 todo.push((input, derived.last_child(), true));
128 } else {
129 todo.push((input, derived.last_child(), false))
130 }
131 }
132 MirRelationExpr::TopK {
133 input,
134 limit,
135 offset,
136 ..
137 } => {
138 // A `TopK` masks input magnitudes (behaving like a `Distinct`) only when it
139 // returns the single first row per group: `limit == 1` and `offset == 0`.
140 // With a non-zero offset, *which* row survives depends on the cumulative
141 // multiplicities of the rows skipped, so a descendant that alters magnitudes
142 // could shift the offset boundary and select a different row.
143 if generalize
144 && *offset == 0
145 && limit.as_ref().and_then(|e| e.as_literal_int64()) == Some(1)
146 {
147 todo.push((input, derived.last_child(), true));
148 } else {
149 todo.push((input, derived.last_child(), false));
150 }
151 }
152 // The masking `Distinct` above depends only on the *signs* of multiplicities,
153 // not their magnitudes. The following operators are sign-preserving: given the
154 // same input signs they produce the same output signs, because they never sum
155 // two distinct input rows into a single output row.
156 // * `Map`/`Filter`/`Threshold` pass rows through (Filter/Threshold may drop
157 // rows, but never merge them).
158 // * `Negate` flips every sign uniformly, which the magnitude-masking
159 // `Distinct` is invariant to.
160 // * `FlatMap` retains the input columns, so distinct input rows stay distinct
161 // in the output; only same-input duplicates are scaled, by a non-negative
162 // count. Hence no cross-row cancellation.
163 // Because none of these can cancel rows against each other, the permission to
164 // alter magnitudes pushes through unconditionally. `Project` and `Union`, by
165 // contrast, *can* sum across rows (Project by dropping distinguishing columns,
166 // Union by combining inputs), so they require the `NonNegative` gate below to
167 // rule out sign-changing cancellation.
168 MirRelationExpr::Map { input, .. } => {
169 todo.push((input, derived.last_child(), generalize && distinct_by));
170 }
171 MirRelationExpr::Filter { input, .. } => {
172 todo.push((input, derived.last_child(), generalize && distinct_by));
173 }
174 MirRelationExpr::FlatMap { input, .. } => {
175 todo.push((input, derived.last_child(), generalize && distinct_by));
176 }
177 MirRelationExpr::Threshold { input, .. } => {
178 todo.push((input, derived.last_child(), generalize && distinct_by));
179 }
180 MirRelationExpr::Negate { input, .. } => {
181 todo.push((input, derived.last_child(), generalize && distinct_by));
182 }
183 MirRelationExpr::Project { input, .. } => {
184 // Project needs a non-negative input to ensure output polarity does not change.
185 // Two inputs that collapse to be one output, if their input polarities are different,
186 // could end with either output polarity (or zero).
187 if generalize && *derived.last_child().value::<NonNegative>().unwrap() {
188 todo.push((input, derived.last_child(), distinct_by));
189 } else {
190 todo.push((input, derived.last_child(), false));
191 }
192 }
193 // `Join` deliberately falls through to the catch-all arm below (resetting the
194 // bit to false). Pushing distinct elision through a join would work against
195 // Semijoin elision, which needs to see distinct-ed inputs.
196 //
197 // If all inputs to the union are non-negative, any distinct enforced above the expression can be
198 // communicated on to each input.
199 MirRelationExpr::Union { base, inputs } => {
200 if derived
201 .children_rev()
202 .all(|v| *v.value::<NonNegative>().unwrap())
203 {
204 let children_rev = inputs.iter_mut().rev().chain(Some(&mut **base));
205 todo.extend(
206 children_rev
207 .zip_eq(derived.children_rev())
208 .map(|(x, y)| (x, y, distinct_by)),
209 );
210 } else {
211 let children_rev = inputs.iter_mut().rev().chain(Some(&mut **base));
212 todo.extend(
213 children_rev
214 .zip_eq(derived.children_rev())
215 .map(|(x, y)| (x, y, false)),
216 );
217 }
218 }
219 x => {
220 todo.extend(
221 x.children_mut()
222 .rev()
223 .zip_eq(derived.children_rev())
224 .map(|(x, y)| (x, y, false)),
225 );
226 }
227 }
228 }
229 }
230 }
231}