mz_transform/
demand.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Transformation based on pushing demand information about columns toward sources.
11
12use itertools::Itertools;
13use mz_ore::assert_none;
14use std::collections::{BTreeMap, BTreeSet};
15
16use mz_expr::{
17    AggregateExpr, AggregateFunc, Id, JoinInputMapper, MirRelationExpr, MirScalarExpr,
18    RECURSION_LIMIT,
19};
20use mz_ore::stack::{CheckedRecursion, RecursionGuard};
21use mz_repr::{Datum, Row};
22
23use crate::TransformCtx;
24
25/// Drive demand from the root through operators.
26///
27/// This transform alerts operators to their columns that influence the
28/// ultimate output of the expression, and gives them permission to swap
29/// other columns for dummy values. As part of this, operators should not
30/// actually use any of these dummy values, lest they run-time error.
31///
32/// This transformation primarily informs the `Join` operator, which can
33/// simplify its intermediate state when it knows that certain columns are
34/// not observed in its output. Internal arrangements need not maintain
35/// columns that are no longer required in the join pipeline, which are
36/// those columns not required by the output nor any further equalities.
37///
38/// Nowadays, this transform is mostly obsoleted by `ProjectionPushdown`.
39/// However, I know of one thing that it still does that `ProjectionPushdown`
40/// doesn't do (there might be more such things):
41/// if you have something like
42/// ```code
43///     Project (#0, #1)
44///       Join on=(#0 = #1)
45/// ```
46/// then this is turned into
47/// ```code
48///     Project (#0, #0)
49///       Join on=(#0 = #1)
50/// ```
51/// This can be beneficial for projecting out some columns earlier inside a complex join (by the LIR
52/// planning), and then recovering them after the join (if needed) by duplicating existing columns.
53///
54/// After the last run of `Demand`, there should always be a `ProjectionPushdown`, so that dummies
55/// are eliminated from plans.
56#[derive(Debug)]
57pub struct Demand {
58    recursion_guard: RecursionGuard,
59}
60
61impl Default for Demand {
62    fn default() -> Demand {
63        Demand {
64            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
65        }
66    }
67}
68
69impl CheckedRecursion for Demand {
70    fn recursion_guard(&self) -> &RecursionGuard {
71        &self.recursion_guard
72    }
73}
74
75impl crate::Transform for Demand {
76    fn name(&self) -> &'static str {
77        "Demand"
78    }
79
80    #[mz_ore::instrument(
81        target = "optimizer",
82        level = "debug",
83        fields(path.segment = "demand")
84    )]
85    fn actually_perform_transform(
86        &self,
87        relation: &mut MirRelationExpr,
88        _: &mut TransformCtx,
89    ) -> Result<(), crate::TransformError> {
90        let result = self.action(
91            relation,
92            (0..relation.arity()).collect(),
93            &mut BTreeMap::new(),
94        );
95        mz_repr::explain::trace_plan(&*relation);
96        result
97    }
98}
99
100impl Demand {
101    /// Columns to be produced.
102    fn action(
103        &self,
104        relation: &mut MirRelationExpr,
105        mut columns: BTreeSet<usize>,
106        gets: &mut BTreeMap<Id, BTreeSet<usize>>,
107    ) -> Result<(), crate::TransformError> {
108        self.checked_recur(|_| {
109            // A valid relation type is only needed for Maps, but we can't borrow
110            // the relation in the corresponding branch of the match statement, since
111            // it is already borrowed mutably.
112            let relation_type = if matches!(relation, MirRelationExpr::Map { .. }) {
113                Some(relation.typ())
114            } else {
115                None
116            };
117            match relation {
118                MirRelationExpr::Constant { .. } => {
119                    // Nothing clever to do with constants, that I can think of.
120                    Ok(())
121                }
122                MirRelationExpr::Get { id, .. } => {
123                    gets.entry(*id)
124                        .or_insert_with(BTreeSet::new)
125                        .extend(columns);
126                    Ok(())
127                }
128                MirRelationExpr::Let { id, value, body } => {
129                    // Let harvests any requirements of get from its body,
130                    // and pushes the union of the requirements at its value.
131                    let id = Id::Local(*id);
132                    let prior = gets.insert(id, BTreeSet::new());
133                    assert_none!(prior); // no shadowing
134                    self.action(body, columns, gets)?;
135                    let needs = gets.remove(&id).expect("existing gets entry");
136                    if let Some(prior) = prior {
137                        gets.insert(id, prior);
138                    }
139
140                    self.action(value, needs, gets)
141                }
142                MirRelationExpr::LetRec {
143                    ids,
144                    values,
145                    limits: _,
146                    body,
147                } => {
148                    let ids_used_across_iterations = MirRelationExpr::recursive_ids(ids, values)
149                        .iter()
150                        .map(|id| Id::Local(*id))
151                        .collect::<BTreeSet<_>>();
152                    let ids = ids.iter().map(|id| Id::Local(*id)).collect_vec();
153                    for id in ids.iter() {
154                        let prior = gets.insert(id.clone(), BTreeSet::new());
155                        assert_none!(prior); // no shadowing
156                    }
157                    self.action(body, columns, gets)?;
158                    for (id, value) in ids.iter().rev().zip_eq(values.iter_mut().rev()) {
159                        let needs = if !ids_used_across_iterations.contains(id) {
160                            gets.remove(id).expect("existing gets entry")
161                        } else {
162                            // Remove, but ignore the collected needs
163                            gets.remove(id).expect("existing gets entry");
164                            // Instead of using `gets`, we'll say we need all columns for a
165                            // recursive id
166                            (0..value.arity()).collect::<BTreeSet<_>>()
167                        };
168                        self.action(value, needs, gets)?;
169                    }
170                    Ok(())
171                }
172                MirRelationExpr::Project { input, outputs } => self.action(
173                    input,
174                    columns.into_iter().map(|c| outputs[c]).collect(),
175                    gets,
176                ),
177                MirRelationExpr::Map { input, scalars } => {
178                    let relation_type = relation_type.as_ref().unwrap();
179                    let arity = relation_type.arity() - scalars.len();
180                    // contains columns whose supports have yet to be explored
181                    let mut new_columns = columns.clone();
182                    new_columns.retain(|c| *c >= arity);
183                    while !new_columns.is_empty() {
184                        // explore supports
185                        new_columns = new_columns
186                            .iter()
187                            .flat_map(|c| scalars[*c - arity].support())
188                            .filter(|c| !columns.contains(c))
189                            .collect();
190                        // add those columns to the seen list
191                        columns.extend(new_columns.clone());
192                        new_columns.retain(|c| *c >= arity);
193                    }
194
195                    // Replace un-read expressions with literals to prevent evaluation.
196                    for (index, scalar) in scalars.iter_mut().enumerate() {
197                        if !columns.contains(&(arity + index)) {
198                            // Leave literals as they are, to benefit explain.
199                            if !scalar.is_literal() {
200                                let typ = relation_type.column_types[arity + index].clone();
201                                *scalar = MirScalarExpr::Literal(
202                                    Ok(Row::pack_slice(&[Datum::Dummy])),
203                                    typ,
204                                );
205                            }
206                        }
207                    }
208
209                    columns.retain(|c| *c < arity);
210                    self.action(input, columns, gets)
211                }
212                MirRelationExpr::FlatMap {
213                    input,
214                    func: _,
215                    exprs,
216                } => {
217                    // A FlatMap which returns zero rows acts like a filter
218                    // so we always need to execute it
219                    for expr in exprs {
220                        expr.support_into(&mut columns);
221                    }
222                    let arity = input.arity();
223                    columns.retain(|c| *c < arity);
224                    self.action(input, columns, gets)
225                }
226                MirRelationExpr::Filter { input, predicates } => {
227                    for predicate in predicates {
228                        predicate.support_into(&mut columns)
229                    }
230                    self.action(input, columns, gets)
231                }
232                MirRelationExpr::Join {
233                    inputs,
234                    equivalences,
235                    implementation: _,
236                } => {
237                    let input_mapper = JoinInputMapper::new(inputs);
238
239                    // Each produced column that is equivalent to a prior column should be remapped
240                    // so that upstream uses depend only on the first column, simplifying the demand
241                    // analysis. In principle we could choose any representative, if it turns out
242                    // that some other column would have been more helpful, but we don't have a great
243                    // reason to do that at the moment.
244                    let mut permutation: Vec<usize> = (0..input_mapper.total_columns()).collect();
245                    for equivalence in equivalences.iter() {
246                        let mut first_column = None;
247                        for expr in equivalence.iter() {
248                            if let MirScalarExpr::Column(c) = expr {
249                                if let Some(prior) = &first_column {
250                                    permutation[*c] = *prior;
251                                } else {
252                                    first_column = Some(*c);
253                                }
254                            }
255                        }
256                    }
257
258                    let should_permute = columns.iter().any(|c| permutation[*c] != *c);
259
260                    // Each equivalence class imposes internal demand for columns.
261                    for equivalence in equivalences.iter() {
262                        for expr in equivalence.iter() {
263                            expr.support_into(&mut columns);
264                        }
265                    }
266
267                    // Populate child demands from external and internal demands.
268                    let new_columns = input_mapper.split_column_set_by_input(columns.iter());
269
270                    // Recursively indicate the requirements.
271                    for (input, columns) in inputs.iter_mut().zip(new_columns) {
272                        self.action(input, columns, gets)?;
273                    }
274
275                    // Install a permutation if any demanded column is not the
276                    // canonical column.
277                    if should_permute {
278                        *relation = relation.take_dangerous().project(permutation);
279                    }
280
281                    Ok(())
282                }
283                MirRelationExpr::Reduce {
284                    input,
285                    group_key,
286                    aggregates,
287                    monotonic: _,
288                    expected_group_size: _,
289                } => {
290                    let mut new_columns = BTreeSet::new();
291                    // Group keys determine aggregation granularity and are
292                    // each crucial in determining aggregates and even the
293                    // multiplicities of other keys.
294                    for k in group_key.iter() {
295                        k.support_into(&mut new_columns)
296                    }
297                    for column in columns.iter() {
298                        // No obvious requirements on aggregate columns.
299                        // A "non-empty" requirement, I guess?
300                        if *column >= group_key.len() {
301                            aggregates[*column - group_key.len()]
302                                .expr
303                                .support_into(&mut new_columns);
304                        }
305                    }
306
307                    // Replace un-demanded aggregations with dummies.
308                    let input_type = input.typ();
309                    for index in (0..aggregates.len()).rev() {
310                        if !columns.contains(&(group_key.len() + index)) {
311                            let typ = aggregates[index].typ(&input_type.column_types);
312                            aggregates[index] = AggregateExpr {
313                                func: AggregateFunc::Dummy,
314                                expr: MirScalarExpr::literal_ok(Datum::Dummy, typ.scalar_type),
315                                distinct: false,
316                            };
317                        }
318                    }
319
320                    self.action(input, new_columns, gets)
321                }
322                MirRelationExpr::TopK {
323                    input,
324                    group_key,
325                    order_key,
326                    limit,
327                    ..
328                } => {
329                    // Group and order keys and limit must be retained, as they
330                    // define which rows are retained.
331                    columns.extend(group_key.iter().cloned());
332                    columns.extend(order_key.iter().map(|o| o.column));
333                    if let Some(limit) = limit {
334                        // Strictly speaking not needed because the
335                        // `limit` support should be a subset of the
336                        // `group_key` support, but we don't want to
337                        // take this for granted here.
338                        limit.support_into(&mut columns)
339                    }
340                    self.action(input, columns, gets)
341                }
342                MirRelationExpr::Negate { input } => self.action(input, columns, gets),
343                MirRelationExpr::Threshold { input } => {
344                    // Threshold requires all columns, as collapsing any distinct values
345                    // has the potential to change how it thresholds counts. This could
346                    // be improved with reasoning about distinctness or non-negativity.
347                    let arity = input.arity();
348                    self.action(input, (0..arity).collect(), gets)
349                }
350                MirRelationExpr::Union { base, inputs } => {
351                    self.action(base, columns.clone(), gets)?;
352                    for input in inputs {
353                        self.action(input, columns.clone(), gets)?;
354                    }
355                    Ok(())
356                }
357                MirRelationExpr::ArrangeBy { input, keys } => {
358                    for key_set in keys {
359                        for key in key_set {
360                            key.support_into(&mut columns);
361                        }
362                    }
363                    self.action(input, columns, gets)
364                }
365            }
366        })
367    }
368}