mz_transform/demand.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Transformation based on pushing demand information about columns toward sources.
11
12use itertools::Itertools;
13use mz_ore::assert_none;
14use std::collections::{BTreeMap, BTreeSet};
15
16use mz_expr::{
17 AggregateExpr, AggregateFunc, Id, JoinInputMapper, MirRelationExpr, MirScalarExpr,
18 RECURSION_LIMIT,
19};
20use mz_ore::stack::{CheckedRecursion, RecursionGuard};
21use mz_repr::{Datum, Row};
22
23use crate::TransformCtx;
24
/// Drive demand from the root through operators.
///
/// This transform alerts operators to their columns that influence the
/// ultimate output of the expression, and gives them permission to swap
/// other columns for dummy values. As part of this, operators should not
/// actually use any of these dummy values, lest they run-time error.
///
/// This transformation primarily informs the `Join` operator, which can
/// simplify its intermediate state when it knows that certain columns are
/// not observed in its output. Internal arrangements need not maintain
/// columns that are no longer required in the join pipeline, which are
/// those columns not required by the output nor any further equalities.
///
/// Nowadays, this transform is mostly obsoleted by `ProjectionPushdown`.
/// However, I know of one thing that it still does that `ProjectionPushdown`
/// doesn't do (there might be more such things):
/// if you have something like
/// ```code
/// Project (#0, #1)
///   Join on=(#0 = #1)
/// ```
/// then this is turned into
/// ```code
/// Project (#0, #0)
///   Join on=(#0 = #1)
/// ```
/// This can be beneficial for projecting out some columns earlier inside a complex join (by the LIR
/// planning), and then recovering them after the join (if needed) by duplicating existing columns.
///
/// After the last run of `Demand`, there should always be a `ProjectionPushdown`, so that dummies
/// are eliminated from plans.
#[derive(Debug)]
pub struct Demand {
    /// Guards `action` against stack overflow on deeply nested expressions.
    recursion_guard: RecursionGuard,
}
60
61impl Default for Demand {
62 fn default() -> Demand {
63 Demand {
64 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
65 }
66 }
67}
68
impl CheckedRecursion for Demand {
    // Expose the guard so `checked_recur` in `action` can enforce the
    // stack-depth limit while recursing through the expression tree.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
74
75impl crate::Transform for Demand {
76 fn name(&self) -> &'static str {
77 "Demand"
78 }
79
80 #[mz_ore::instrument(
81 target = "optimizer",
82 level = "debug",
83 fields(path.segment = "demand")
84 )]
85 fn actually_perform_transform(
86 &self,
87 relation: &mut MirRelationExpr,
88 _: &mut TransformCtx,
89 ) -> Result<(), crate::TransformError> {
90 let result = self.action(
91 relation,
92 (0..relation.arity()).collect(),
93 &mut BTreeMap::new(),
94 );
95 mz_repr::explain::trace_plan(&*relation);
96 result
97 }
98}
99
impl Demand {
    /// Pushes the demanded `columns` of `relation` toward its inputs.
    ///
    /// `columns` is the set of output columns of `relation` that consumers
    /// observe; `gets` accumulates, per `Get` identifier, the union of
    /// columns demanded across every use of that identifier seen so far.
    /// Undemanded `Map` scalars and `Reduce` aggregates are replaced with
    /// dummy literals, and a `Join` may be wrapped in a `Project` that
    /// remaps equivalent columns onto a canonical representative.
    ///
    /// # Errors
    ///
    /// Returns a `TransformError` if the recursion limit is exceeded.
    fn action(
        &self,
        relation: &mut MirRelationExpr,
        mut columns: BTreeSet<usize>,
        gets: &mut BTreeMap<Id, BTreeSet<usize>>,
    ) -> Result<(), crate::TransformError> {
        self.checked_recur(|_| {
            // A valid relation type is only needed for Maps, but we can't borrow
            // the relation in the corresponding branch of the match statement, since
            // it is already borrowed mutably.
            let relation_type = if matches!(relation, MirRelationExpr::Map { .. }) {
                Some(relation.typ())
            } else {
                None
            };
            match relation {
                MirRelationExpr::Constant { .. } => {
                    // Nothing clever to do with constants, that I can think of.
                    Ok(())
                }
                MirRelationExpr::Get { id, .. } => {
                    // Record this use's demand; it is unioned with demand from
                    // any other uses of the same identifier.
                    gets.entry(*id)
                        .or_insert_with(BTreeSet::new)
                        .extend(columns);
                    Ok(())
                }
                MirRelationExpr::Let { id, value, body } => {
                    // Let harvests any requirements of get from its body,
                    // and pushes the union of the requirements at its value.
                    let id = Id::Local(*id);
                    let prior = gets.insert(id, BTreeSet::new());
                    assert_none!(prior); // no shadowing
                    self.action(body, columns, gets)?;
                    let needs = gets.remove(&id).expect("existing gets entry");
                    // NOTE: `prior` is always `None` here (asserted above), so this
                    // restore is vacuous; kept for symmetry with the insert.
                    if let Some(prior) = prior {
                        gets.insert(id, prior);
                    }

                    self.action(value, needs, gets)
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits: _,
                    body,
                } => {
                    // Ids referenced from a later (or the same) binding feed back
                    // across iterations, so their demand cannot be tracked precisely.
                    let ids_used_across_iterations = MirRelationExpr::recursive_ids(ids, values)
                        .iter()
                        .map(|id| Id::Local(*id))
                        .collect::<BTreeSet<_>>();
                    let ids = ids.iter().map(|id| Id::Local(*id)).collect_vec();
                    for id in ids.iter() {
                        let prior = gets.insert(id.clone(), BTreeSet::new());
                        assert_none!(prior); // no shadowing
                    }
                    self.action(body, columns, gets)?;
                    // Visit bindings in reverse so each value sees the demand
                    // gathered from the body and all later bindings.
                    for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
                        let needs = if !ids_used_across_iterations.contains(id) {
                            gets.remove(id).expect("existing gets entry")
                        } else {
                            // Remove, but ignore the collected needs
                            gets.remove(id).expect("existing gets entry");
                            // Instead of using `gets`, we'll say we need all columns for a
                            // recursive id
                            (0..value.arity()).collect::<BTreeSet<_>>()
                        };
                        self.action(value, needs, gets)?;
                    }
                    Ok(())
                }
                // Demand for output column `c` becomes demand for input
                // column `outputs[c]`.
                MirRelationExpr::Project { input, outputs } => self.action(
                    input,
                    columns.into_iter().map(|c| outputs[c]).collect(),
                    gets,
                ),
                MirRelationExpr::Map { input, scalars } => {
                    let relation_type = relation_type.as_ref().unwrap();
                    let arity = relation_type.arity() - scalars.len();
                    // contains columns whose supports have yet to be explored
                    let mut new_columns = columns.clone();
                    new_columns.retain(|c| *c >= arity);
                    // Fixpoint: a demanded mapped column demands its support,
                    // which may include other mapped columns, and so on.
                    while !new_columns.is_empty() {
                        // explore supports
                        new_columns = new_columns
                            .iter()
                            .flat_map(|c| scalars[*c - arity].support())
                            .filter(|c| !columns.contains(c))
                            .collect();
                        // add those columns to the seen list
                        columns.extend(new_columns.clone());
                        new_columns.retain(|c| *c >= arity);
                    }

                    // Replace un-read expressions with literals to prevent evaluation.
                    for (index, scalar) in scalars.iter_mut().enumerate() {
                        if !columns.contains(&(arity + index)) {
                            // Leave literals as they are, to benefit explain.
                            if !scalar.is_literal() {
                                let typ = relation_type.column_types[arity + index].clone();
                                *scalar = MirScalarExpr::Literal(
                                    Ok(Row::pack_slice(&[Datum::Dummy])),
                                    typ,
                                );
                            }
                        }
                    }

                    // Only the input's own columns are pushed further down.
                    columns.retain(|c| *c < arity);
                    self.action(input, columns, gets)
                }
                MirRelationExpr::FlatMap {
                    input,
                    func: _,
                    exprs,
                } => {
                    // A FlatMap which returns zero rows acts like a filter
                    // so we always need to execute it
                    for expr in exprs {
                        expr.support_into(&mut columns);
                    }
                    let arity = input.arity();
                    columns.retain(|c| *c < arity);
                    self.action(input, columns, gets)
                }
                MirRelationExpr::Filter { input, predicates } => {
                    // Predicates must be evaluable, so their support is demanded.
                    for predicate in predicates {
                        predicate.support_into(&mut columns)
                    }
                    self.action(input, columns, gets)
                }
                MirRelationExpr::Join {
                    inputs,
                    equivalences,
                    implementation: _,
                } => {
                    let input_mapper = JoinInputMapper::new(inputs);

                    // Each produced column that is equivalent to a prior column should be remapped
                    // so that upstream uses depend only on the first column, simplifying the demand
                    // analysis. In principle we could choose any representative, if it turns out
                    // that some other column would have been more helpful, but we don't have a great
                    // reason to do that at the moment.
                    let mut permutation: Vec<usize> = (0..input_mapper.total_columns()).collect();
                    for equivalence in equivalences.iter() {
                        let mut first_column = None;
                        for expr in equivalence.iter() {
                            if let MirScalarExpr::Column(c) = expr {
                                if let Some(prior) = &first_column {
                                    permutation[*c] = *prior;
                                } else {
                                    first_column = Some(*c);
                                }
                            }
                        }
                    }

                    // Decide before extending `columns` with internal demand:
                    // only externally demanded columns warrant the projection.
                    let should_permute = columns.iter().any(|c| permutation[*c] != *c);

                    // Each equivalence class imposes internal demand for columns.
                    for equivalence in equivalences.iter() {
                        for expr in equivalence.iter() {
                            expr.support_into(&mut columns);
                        }
                    }

                    // Populate child demands from external and internal demands.
                    let new_columns = input_mapper.split_column_set_by_input(columns.iter());

                    // Recursively indicate the requirements.
                    for (input, columns) in inputs.iter_mut().zip(new_columns) {
                        self.action(input, columns, gets)?;
                    }

                    // Install a permutation if any demanded column is not the
                    // canonical column.
                    if should_permute {
                        *relation = relation.take_dangerous().project(permutation);
                    }

                    Ok(())
                }
                MirRelationExpr::Reduce {
                    input,
                    group_key,
                    aggregates,
                    monotonic: _,
                    expected_group_size: _,
                } => {
                    let mut new_columns = BTreeSet::new();
                    // Group keys determine aggregation granularity and are
                    // each crucial in determining aggregates and even the
                    // multiplicities of other keys.
                    for k in group_key.iter() {
                        k.support_into(&mut new_columns)
                    }
                    for column in columns.iter() {
                        // No obvious requirements on aggregate columns.
                        // A "non-empty" requirement, I guess?
                        if *column >= group_key.len() {
                            aggregates[*column - group_key.len()]
                                .expr
                                .support_into(&mut new_columns);
                        }
                    }

                    // Replace un-demanded aggregations with dummies.
                    let input_type = input.typ();
                    for index in (0..aggregates.len()).rev() {
                        if !columns.contains(&(group_key.len() + index)) {
                            let typ = aggregates[index].typ(&input_type.column_types);
                            aggregates[index] = AggregateExpr {
                                func: AggregateFunc::Dummy,
                                expr: MirScalarExpr::literal_ok(Datum::Dummy, typ.scalar_type),
                                distinct: false,
                            };
                        }
                    }

                    self.action(input, new_columns, gets)
                }
                MirRelationExpr::TopK {
                    input,
                    group_key,
                    order_key,
                    limit,
                    ..
                } => {
                    // Group and order keys and limit must be retained, as they
                    // define which rows are retained.
                    columns.extend(group_key.iter().cloned());
                    columns.extend(order_key.iter().map(|o| o.column));
                    if let Some(limit) = limit {
                        // Strictly speaking not needed because the
                        // `limit` support should be a subset of the
                        // `group_key` support, but we don't want to
                        // take this for granted here.
                        limit.support_into(&mut columns)
                    }
                    self.action(input, columns, gets)
                }
                // Negate passes demand through unchanged.
                MirRelationExpr::Negate { input } => self.action(input, columns, gets),
                MirRelationExpr::Threshold { input } => {
                    // Threshold requires all columns, as collapsing any distinct values
                    // has the potential to change how it thresholds counts. This could
                    // be improved with reasoning about distinctness or non-negativity.
                    let arity = input.arity();
                    self.action(input, (0..arity).collect(), gets)
                }
                MirRelationExpr::Union { base, inputs } => {
                    // Every branch receives the same demand.
                    self.action(base, columns.clone(), gets)?;
                    for input in inputs {
                        self.action(input, columns.clone(), gets)?;
                    }
                    Ok(())
                }
                MirRelationExpr::ArrangeBy { input, keys } => {
                    // Arrangement keys must remain evaluable.
                    for key_set in keys {
                        for key in key_set {
                            key.support_into(&mut columns);
                        }
                    }
                    self.action(input, columns, gets)
                }
            }
        })
    }
}
368}