Skip to main content

mz_transform/movement/
projection_pushdown.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Pushes column removal down through other operators.
11//!
12//! This action improves the quality of the query by
13//! reducing the width of data in the dataflow. It determines the unique
14//! columns an expression depends on, and pushes a projection onto only
15//! those columns down through child operators.
16//!
17//! A `MirRelationExpr::Project` node is actually three transformations in one.
18//! 1) Projection - removes columns.
19//! 2) Permutation - reorders columns.
20//! 3) Repetition - duplicates columns.
21//!
22//! This action handles these three transformations like so:
23//! 1) Projections are pushed as far down as possible.
24//! 2) Permutations are pushed as far down as is convenient.
25//! 3) Repetitions are not pushed down at all.
26//!
27//! Some comments have been inherited from the `Demand` transform.
28//!
29//! Note that this transform is one that can operate across views in a dataflow
30//! and thus currently exists outside of both the physical and logical
31//! optimizers.
32
33use std::collections::{BTreeMap, BTreeSet};
34
35use itertools::{Itertools, zip_eq};
36use mz_expr::{
37    Columns, Id, JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr,
38    RECURSION_LIMIT, TableFunc, func,
39};
40use mz_ore::assert_none;
41use mz_ore::stack::{CheckedRecursion, RecursionGuard};
42use mz_repr::{Datum, ReprRelationType, ReprScalarType};
43
44use crate::{TransformCtx, TransformError};
45
46/// Pushes projections down through other operators.
47#[derive(Debug)]
48pub struct ProjectionPushdown {
49    recursion_guard: RecursionGuard,
50    include_joins: bool,
51}
52
53impl Default for ProjectionPushdown {
54    fn default() -> Self {
55        Self {
56            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
57            include_joins: true,
58        }
59    }
60}
61
62impl ProjectionPushdown {
63    /// Construct a `ProjectionPushdown` that does not push projections through joins (but does
64    /// descend into join inputs).
65    pub fn skip_joins() -> Self {
66        Self {
67            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
68            include_joins: false,
69        }
70    }
71}
72
73impl CheckedRecursion for ProjectionPushdown {
74    fn recursion_guard(&self) -> &RecursionGuard {
75        &self.recursion_guard
76    }
77}
78
79impl crate::Transform for ProjectionPushdown {
80    fn name(&self) -> &'static str {
81        "ProjectionPushdown"
82    }
83
84    // This method is only used during unit testing.
85    #[mz_ore::instrument(
86        target = "optimizer",
87        level = "debug",
88        fields(path.segment = "projection_pushdown")
89    )]
90    fn actually_perform_transform(
91        &self,
92        relation: &mut MirRelationExpr,
93        _: &mut TransformCtx,
94    ) -> Result<(), crate::TransformError> {
95        let result = self.action(
96            relation,
97            &(0..relation.arity()).collect(),
98            &mut BTreeMap::new(),
99        );
100        mz_repr::explain::trace_plan(&*relation);
101        result
102    }
103}
104
105impl ProjectionPushdown {
106    /// Pushes the `desired_projection` down through `relation`.
107    ///
108    /// This action transforms `relation` to a `MirRelationExpr` equivalent to
109    /// `relation.project(desired_projection)`.
110    ///
111    /// `desired_projection` is expected to consist of unique columns.
112    pub fn action(
113        &self,
114        relation: &mut MirRelationExpr,
115        desired_projection: &Vec<usize>,
116        gets: &mut BTreeMap<Id, BTreeSet<usize>>,
117    ) -> Result<(), TransformError> {
118        self.checked_recur(|_| {
119            // First, try to push the desired projection down through `relation`.
120            // In the process `relation` is transformed to a `MirRelationExpr`
121            // equivalent to `relation.project(actual_projection)`.
122            // There are three reasons why `actual_projection` may differ from
123            // `desired_projection`:
124            // 1) `relation` may need one or more columns that is not contained in
125            //    `desired_projection`.
126            // 2) `relation` may not be able to accommodate certain permutations.
127            //    For example, `MirRelationExpr::Map` always appends all
128            //    newly-created columns to the end.
129            // 3) Nothing can be pushed through a leaf node. If `relation` is a leaf
130            //    node, `actual_projection` will always be `(0..relation.arity())`.
131            // Then, if `actual_projection` and `desired_projection` differ, we will
132            // add a project around `relation`.
133            let actual_projection = match relation {
134                MirRelationExpr::Constant { .. } => (0..relation.arity()).collect(),
135                MirRelationExpr::Get { id, .. } => {
136                    gets.entry(*id)
137                        .or_insert_with(BTreeSet::new)
138                        .extend(desired_projection.iter().cloned());
139                    (0..relation.arity()).collect()
140                }
141                MirRelationExpr::Let { id, value, body } => {
142                    // Let harvests any requirements of get from its body,
143                    // and pushes the sorted union of the requirements at its value.
144                    let id = Id::Local(*id);
145                    let prior = gets.insert(id, BTreeSet::new());
146                    self.action(body, desired_projection, gets)?;
147                    let desired_value_projection = gets.remove(&id).unwrap();
148                    if let Some(prior) = prior {
149                        gets.insert(id, prior);
150                    }
151                    let desired_value_projection =
152                        desired_value_projection.into_iter().collect::<Vec<_>>();
153                    self.action(value, &desired_value_projection, gets)?;
154                    let new_type = value.typ();
155                    self.update_projection_around_get(
156                        body,
157                        &BTreeMap::from_iter(std::iter::once((
158                            id,
159                            (desired_value_projection, new_type),
160                        ))),
161                    );
162                    desired_projection.clone()
163                }
164                MirRelationExpr::LetRec {
165                    ids,
166                    values,
167                    limits: _,
168                    body,
169                } => {
170                    // Determine the recursive IDs in this LetRec binding.
171                    let rec_ids = MirRelationExpr::recursive_ids(ids, values);
172
173                    // Seed the gets map with empty demand for each non-recursive ID.
174                    for id in ids.iter().filter(|id| !rec_ids.contains(id)) {
175                        let prior = gets.insert(Id::Local(*id), BTreeSet::new());
176                        assert_none!(prior);
177                    }
178
179                    // Descend into the body with the supplied desired_projection.
180                    self.action(body, desired_projection, gets)?;
181                    // Descend into the values in reverse order.
182                    for (id, value) in zip_eq(ids.iter().rev(), values.iter_mut().rev()) {
183                        let desired_projection = if rec_ids.contains(id) {
184                            // For recursive IDs: request all columns.
185                            let columns = 0..value.arity();
186                            columns.collect::<Vec<_>>()
187                        } else {
188                            // For non-recursive IDs: request the gets entry.
189                            let columns = gets.get(&Id::Local(*id)).unwrap();
190                            columns.iter().cloned().collect::<Vec<_>>()
191                        };
192                        self.action(value, &desired_projection, gets)?;
193                    }
194
195                    // Update projections around gets of non-recursive IDs.
196                    let mut updates = BTreeMap::new();
197                    for (id, value) in zip_eq(ids.iter(), values.iter_mut()) {
198                        // Update the current value.
199                        self.update_projection_around_get(value, &updates);
200                        // If this is a non-recursive ID, add an entry to the
201                        // updates map for subsequent values and the body.
202                        if !rec_ids.contains(id) {
203                            let new_type = value.typ();
204                            let new_proj = {
205                                let columns = gets.remove(&Id::Local(*id)).unwrap();
206                                columns.iter().cloned().collect::<Vec<_>>()
207                            };
208                            updates.insert(Id::Local(*id), (new_proj, new_type));
209                        }
210                    }
211                    // Update the body.
212                    self.update_projection_around_get(body, &updates);
213
214                    // Remove the entries for all ids (don't restrict only to
215                    // non-recursive IDs here for better hygene).
216                    for id in ids.iter() {
217                        gets.remove(&Id::Local(*id));
218                    }
219
220                    // Return the desired projection (leads to a no-op in the
221                    // projection handling logic after this match statement).
222                    desired_projection.clone()
223                }
224                MirRelationExpr::Join {
225                    inputs,
226                    equivalences,
227                    implementation,
228                } if self.include_joins => {
229                    assert!(
230                        matches!(implementation, JoinImplementation::Unimplemented),
231                        "ProjectionPushdown can't deal with filled in join implementations. Turn off `include_joins` if you'd like to run it after `JoinImplementation`."
232                    );
233
234                    let input_mapper = JoinInputMapper::new(inputs);
235
236                    let mut columns_to_pushdown =
237                        desired_projection.iter().cloned().collect::<BTreeSet<_>>();
238                    // Each equivalence class imposes internal demand for columns.
239                    for equivalence in equivalences.iter() {
240                        for expr in equivalence.iter() {
241                            expr.support_into(&mut columns_to_pushdown);
242                        }
243                    }
244
245                    // Populate child demands from external and internal demands.
246                    let new_columns =
247                        input_mapper.split_column_set_by_input(columns_to_pushdown.iter());
248
249                    // Recursively indicate the requirements.
250                    for (input, inp_columns) in inputs.iter_mut().zip_eq(new_columns) {
251                        let inp_columns = inp_columns.into_iter().collect::<Vec<_>>();
252                        self.action(input, &inp_columns, gets)?;
253                    }
254
255                    reverse_permute(
256                        equivalences.iter_mut().flat_map(|e| e.iter_mut()),
257                        columns_to_pushdown.iter(),
258                    );
259
260                    columns_to_pushdown.into_iter().collect()
261                }
262                // Skip joins if `self.include_joins` is turned off.
263                MirRelationExpr::Join { inputs, equivalences: _, implementation: _ } => {
264                    let input_mapper = JoinInputMapper::new(inputs);
265
266                    // Include all columns.
267                    let columns_to_pushdown: Vec<_> = (0..input_mapper.total_columns()).collect();
268                    let child_columns =
269                        input_mapper.split_column_set_by_input(columns_to_pushdown.iter());
270
271                    // Recursively indicate the requirements.
272                    for (input, inp_columns) in inputs.iter_mut().zip_eq(child_columns) {
273                        let inp_columns = inp_columns.into_iter().collect::<Vec<_>>();
274                        self.action(input, &inp_columns, gets)?;
275                    }
276
277                    columns_to_pushdown.into_iter().collect()
278                }
279                MirRelationExpr::FlatMap { input, func, exprs } => {
280                    let inner_arity = input.arity();
281                    // When the values generated by a `generate_series` are not
282                    // demanded, only its cardinality matters. Collapse it into a
283                    // `RepeatRowNonNegative`, which encodes that cardinality in a
284                    // single row's diff rather than emitting one row per value.
285                    if !desired_projection.iter().any(|c| *c >= inner_arity) {
286                        collapse_unused_generate_series(func, exprs);
287                    }
288                    // A FlatMap which returns zero rows acts like a filter
289                    // so we always need to execute it
290                    let mut columns_to_pushdown =
291                        desired_projection.iter().cloned().collect::<BTreeSet<_>>();
292                    for expr in exprs.iter() {
293                        expr.support_into(&mut columns_to_pushdown);
294                    }
295                    columns_to_pushdown.retain(|c| *c < inner_arity);
296
297                    reverse_permute(exprs.iter_mut(), columns_to_pushdown.iter());
298                    let columns_to_pushdown = columns_to_pushdown.into_iter().collect::<Vec<_>>();
299                    self.action(input, &columns_to_pushdown, gets)?;
300                    // The actual projection always has the newly-created columns at
301                    // the end.
302                    let mut actual_projection = columns_to_pushdown;
303                    for c in 0..func.output_arity() {
304                        actual_projection.push(inner_arity + c);
305                    }
306                    actual_projection
307                }
308                MirRelationExpr::Filter { input, predicates } => {
309                    let mut columns_to_pushdown =
310                        desired_projection.iter().cloned().collect::<BTreeSet<_>>();
311                    for predicate in predicates.iter() {
312                        predicate.support_into(&mut columns_to_pushdown);
313                    }
314                    reverse_permute(predicates.iter_mut(), columns_to_pushdown.iter());
315                    let columns_to_pushdown = columns_to_pushdown.into_iter().collect::<Vec<_>>();
316                    self.action(input, &columns_to_pushdown, gets)?;
317                    columns_to_pushdown
318                }
319                MirRelationExpr::Project { input, outputs } => {
320                    // Combine `outputs` with `desired_projection`.
321                    *outputs = desired_projection.iter().map(|c| outputs[*c]).collect();
322
323                    let unique_outputs = outputs.iter().map(|i| *i).collect::<BTreeSet<_>>();
324                    if outputs.len() == unique_outputs.len() {
325                        // Push down the project as is.
326                        self.action(input, outputs, gets)?;
327                        *relation = input.take_dangerous();
328                    } else {
329                        // Push down only the unique elems in `outputs`.
330                        let columns_to_pushdown = unique_outputs.into_iter().collect::<Vec<_>>();
331                        reverse_permute_columns(outputs.iter_mut(), columns_to_pushdown.iter());
332                        self.action(input, &columns_to_pushdown, gets)?;
333                    }
334
335                    desired_projection.clone()
336                }
337                MirRelationExpr::Map { input, scalars } => {
338                    let arity = input.arity();
339                    // contains columns whose supports have yet to be explored
340                    let mut actual_projection =
341                        desired_projection.iter().cloned().collect::<BTreeSet<_>>();
342                    for (i, scalar) in scalars.iter().enumerate().rev() {
343                        if actual_projection.contains(&(i + arity)) {
344                            scalar.support_into(&mut actual_projection);
345                        }
346                    }
347                    *scalars = (0..scalars.len())
348                        .filter_map(|i| {
349                            if actual_projection.contains(&(i + arity)) {
350                                Some(scalars[i].clone())
351                            } else {
352                                None
353                            }
354                        })
355                        .collect::<Vec<_>>();
356                    reverse_permute(scalars.iter_mut(), actual_projection.iter());
357                    self.action(
358                        input,
359                        &actual_projection
360                            .iter()
361                            .filter(|c| **c < arity)
362                            .map(|c| *c)
363                            .collect(),
364                        gets,
365                    )?;
366                    actual_projection.into_iter().collect()
367                }
368                MirRelationExpr::Reduce {
369                    input,
370                    group_key,
371                    aggregates,
372                    monotonic: _,
373                    expected_group_size: _,
374                } => {
375                    let mut columns_to_pushdown = BTreeSet::new();
376                    // Group keys determine aggregation granularity and are
377                    // each crucial in determining aggregates and even the
378                    // multiplicities of other keys.
379                    for k in group_key.iter() {
380                        k.support_into(&mut columns_to_pushdown)
381                    }
382
383                    for index in (0..aggregates.len()).rev() {
384                        if !desired_projection.contains(&(group_key.len() + index)) {
385                            aggregates.remove(index);
386                        } else {
387                            // No obvious requirements on aggregate columns.
388                            // A "non-empty" requirement, I guess?
389                            aggregates[index]
390                                .expr
391                                .support_into(&mut columns_to_pushdown)
392                        }
393                    }
394
395                    reverse_permute(
396                        itertools::chain!(
397                            group_key.iter_mut(),
398                            aggregates.iter_mut().map(|a| &mut a.expr)
399                        ),
400                        columns_to_pushdown.iter(),
401                    );
402
403                    self.action(
404                        input,
405                        &columns_to_pushdown.into_iter().collect::<Vec<_>>(),
406                        gets,
407                    )?;
408                    let mut actual_projection =
409                        desired_projection.iter().cloned().collect::<BTreeSet<_>>();
410                    actual_projection.extend(0..group_key.len());
411                    actual_projection.into_iter().collect()
412                }
413                MirRelationExpr::TopK {
414                    input,
415                    group_key,
416                    order_key,
417                    limit,
418                    ..
419                } => {
420                    // Group and order keys and limit support must be retained, as
421                    // they define which rows are retained.
422                    let mut columns_to_pushdown =
423                        desired_projection.iter().cloned().collect::<BTreeSet<_>>();
424                    columns_to_pushdown.extend(group_key.iter().cloned());
425                    columns_to_pushdown.extend(order_key.iter().map(|o| o.column));
426                    if let Some(limit) = limit.as_ref() {
427                        // Strictly speaking not needed because the
428                        // `limit` support should be a subset of the
429                        // `group_key` support, but we don't want to
430                        // take this for granted here.
431                        limit.support_into(&mut columns_to_pushdown);
432                    }
433                    // If the `TopK` does not have any new column demand, just push
434                    // down the desired projection. Otherwise, push down the sorted
435                    // column demand.
436                    let columns_to_pushdown =
437                        if columns_to_pushdown.len() == desired_projection.len() {
438                            desired_projection.clone()
439                        } else {
440                            columns_to_pushdown.into_iter().collect::<Vec<_>>()
441                        };
442                    reverse_permute_columns(
443                        itertools::chain!(
444                            group_key.iter_mut(),
445                            order_key.iter_mut().map(|o| &mut o.column),
446                        ),
447                        columns_to_pushdown.iter(),
448                    );
449                    reverse_permute(limit.iter_mut(), columns_to_pushdown.iter());
450                    self.action(input, &columns_to_pushdown, gets)?;
451                    columns_to_pushdown
452                }
453                MirRelationExpr::Negate { input } => {
454                    self.action(input, desired_projection, gets)?;
455                    desired_projection.clone()
456                }
457                MirRelationExpr::Union { base, inputs } => {
458                    self.action(base, desired_projection, gets)?;
459                    for input in inputs {
460                        self.action(input, desired_projection, gets)?;
461                    }
462                    desired_projection.clone()
463                }
464                MirRelationExpr::Threshold { input } => {
465                    // Threshold requires all columns, as collapsing any distinct values
466                    // has the potential to change how it thresholds counts. This could
467                    // be improved with reasoning about distinctness or non-negativity.
468                    let arity = input.arity();
469                    self.action(input, &(0..arity).collect(), gets)?;
470                    (0..arity).collect()
471                }
472                MirRelationExpr::ArrangeBy { input, keys: _ } => {
473                    // Do not push the project past the ArrangeBy.
474                    // TODO: how do we handle key sets containing column references
475                    // that are not demanded upstream?
476                    let arity = input.arity();
477                    self.action(input, &(0..arity).collect(), gets)?;
478                    (0..arity).collect()
479                }
480            };
481            let add_project = desired_projection != &actual_projection;
482            if add_project {
483                let mut projection_to_add = desired_projection.to_owned();
484                reverse_permute_columns(projection_to_add.iter_mut(), actual_projection.iter());
485                *relation = relation.take_dangerous().project(projection_to_add);
486            }
487            Ok(())
488        })
489    }
490
491    /// When we push the `desired_value_projection` at `value`,
492    /// the columns returned by `Get(get_id)` will change, so we need
493    /// to permute `Project`s around `Get(get_id)`.
494    pub fn update_projection_around_get(
495        &self,
496        relation: &mut MirRelationExpr,
497        applied_projections: &BTreeMap<Id, (Vec<usize>, ReprRelationType)>,
498    ) {
499        relation.visit_pre_mut(|e| {
500            if let MirRelationExpr::Project { input, outputs } = e {
501                if let MirRelationExpr::Get {
502                    id: inner_id,
503                    typ,
504                    access_strategy: _,
505                } = &mut **input
506                {
507                    if let Some((new_projection, new_type)) = applied_projections.get(inner_id) {
508                        typ.clone_from(new_type);
509                        reverse_permute_columns(outputs.iter_mut(), new_projection.iter());
510                        if outputs.len() == new_projection.len()
511                            && outputs.iter().enumerate().all(|(i, o)| i == *o)
512                        {
513                            *e = input.take_dangerous();
514                        }
515                    }
516                }
517            }
518            // If there is no `Project` around a Get, all columns of
519            // `Get(get_id)` are required. Thus, the columns returned by
520            // `Get(get_id)` will not have changed, so no action
521            // is necessary.
522        });
523    }
524}
525
526/// If `func` is an integer `generate_series` whose output values are unused,
527/// rewrites it in place into a `RepeatRowNonNegative` that emits the same number
528/// of rows. The caller must have established that the generated column is not
529/// demanded.
530///
531/// `generate_series(start, stop, step)` is inclusive of `stop`, so its
532/// cardinality is `max(0, floor((stop - start) / step) + 1)`. We only rewrite
533/// when `step` is a known non-zero literal, which lets us specialize on its
534/// sign: truncating integer division (what `DivInt64` does) coincides with
535/// floor division exactly when the dividend and divisor share a sign, which the
536/// emptiness guard guarantees. The guard also collapses the empty series to a
537/// count of zero, so the result is always non-negative.
538///
539/// When `start` and `stop` are also literals, the cardinality is computed here,
540/// exactly, in `i128` (where no `i64` inputs can overflow). If it does not fit
541/// in an `i64` we decline to rewrite: the original `FlatMap` enumerates such a
542/// series without error (its iteration only ever visits in-range values), so
543/// its replacement must not error either.
544///
545/// When the bounds are not literals we synthesize the arithmetic, and the width
546/// we synthesize it in depends on `step`:
547///
548/// * `|step| == 1`: `i64`, and the division (the identity or a negation) is
549///   elided, so the count is just `(a - b) + 1`. A literal subtracted bound is
550///   folded into `a + (1 - b)` — `a` itself for `generate_series(1, n)`. Here
551///   `a - b` overflows `i64` only once the span reaches `2^63`, which means the
552///   series has more than `i64::MAX` rows — infeasible, exactly what the literal
553///   path declines. So `i64` arithmetic is exact for every feasible series, and
554///   an overflow only ever stands in for an effectively non-terminating
555///   enumeration.
556///
557/// * `|step| >= 2`: `numeric`. A coarse step over near-opposite `i64` extremes
558///   yields a *feasible* series (few rows) whose span nonetheless overflows
559///   `i64`, so `i64` arithmetic would error where the original does not.
560///   `numeric` holds the full `~2^64` span comfortably and represents the
561///   quotient exactly (it needs at most ~20 significant digits, well under
562///   `numeric`'s 39), so the only place this can error is the final cast back to
563///   `i64` — which happens exactly when the count itself exceeds `i64`, the same
564///   infeasible case the literal path declines.
565///
566/// Null inputs are handled by `RepeatRowNonNegative` itself: like
567/// `generate_series`, it is `empty_on_null_input`, so a null count yields no
568/// rows.
569fn collapse_unused_generate_series(func: &mut TableFunc, exprs: &mut Vec<MirScalarExpr>) {
570    // Only integer `generate_series` is handled here.
571    if !matches!(
572        func,
573        TableFunc::GenerateSeriesInt32 | TableFunc::GenerateSeriesInt64
574    ) {
575        return;
576    }
577    // Extracts a non-null integer literal as an `i64`, accepting any integer
578    // width (an argument's datum type does not necessarily match the series
579    // width).
580    let literal_i64 = |e: &MirScalarExpr| match e.as_literal() {
581        Some(Ok(Datum::Int16(v))) => Some(i64::from(v)),
582        Some(Ok(Datum::Int32(v))) => Some(i64::from(v)),
583        Some(Ok(Datum::Int64(v))) => Some(v),
584        _ => None,
585    };
586    // Bail unless the step is a non-null, non-zero literal.
587    let step = match literal_i64(&exprs[2]) {
588        Some(step) => step,
589        None => return,
590    };
591    if step == 0 {
592        // `generate_series` errors on a zero step; leave it to do so.
593        return;
594    }
595
596    // Literal bounds: compute the exact cardinality now, in `i128`, where no
597    // `i64` inputs can overflow.
598    if let (Some(start), Some(stop)) = (literal_i64(&exprs[0]), literal_i64(&exprs[1])) {
599        let empty = if step > 0 { stop < start } else { stop > start };
600        let count: i128 = if empty {
601            0
602        } else {
603            // Same-sign dividend and divisor, so truncating division is floor
604            // division.
605            (i128::from(stop) - i128::from(start)) / i128::from(step) + 1
606        };
607        if let Ok(count) = i64::try_from(count) {
608            *func = TableFunc::RepeatRowNonNegative;
609            *exprs = vec![MirScalarExpr::literal_ok(
610                Datum::Int64(count),
611                ReprScalarType::Int64,
612            )];
613        }
614        // A count beyond `i64` is left alone rather than collapsed: the
615        // original enumerates it without error, so the rewrite must too.
616        return;
617    }
618
619    let int32 = matches!(func, TableFunc::GenerateSeriesInt32);
620    let lit = |v: i64| MirScalarExpr::literal_ok(Datum::Int64(v), ReprScalarType::Int64);
621
622    // The series is non-empty exactly when `stop` is on the far side of `start`
623    // in the direction of `step`. Comparing the original (un-widened) bounds is
624    // overflow-free at any width.
625    let non_empty = if step > 0 {
626        exprs[1].clone().call_binary(exprs[0].clone(), func::Gte)
627    } else {
628        exprs[1].clone().call_binary(exprs[0].clone(), func::Lte)
629    };
630
631    // floor((stop - start) / step) + 1, valid (== truncating division) under the
632    // `non_empty` guard. With `|step| == 1` an overflow of `stop - start` only
633    // occurs for infeasible series, so cheap `i64` arithmetic is safe; with
634    // `|step| >= 2` a feasible series can still overflow `i64`, so we compute in
635    // `numeric` (see the doc comment).
636    let cardinality = if step.abs() == 1 {
637        // `|step| == 1` makes the division in `floor((stop - start) / step) + 1`
638        // either the identity (`step == 1`) or a negation (`step == -1`), so we
639        // write the count as `(a - b) + 1` directly, with no `DivInt64`:
640        //   step ==  1: `(a, b) = (stop, start)`
641        //   step == -1: `(a, b) = (start, stop)`
642        // Widen 32-bit bounds to the `i64` count `RepeatRowNonNegative` expects.
643        let to_i64 = |e: MirScalarExpr| {
644            if int32 {
645                e.call_unary(func::CastInt32ToInt64)
646            } else {
647                e
648            }
649        };
650        let (a_idx, b_idx) = if step > 0 { (1, 0) } else { (0, 1) };
651        let a = to_i64(exprs[a_idx].clone());
652        // When the subtracted bound `b` is a literal, fold `(a - b) + 1` into the
653        // single constant `a + (1 - b)`, which is just `a` when `b == 1` (the
654        // common `generate_series(1, n)` shape). Decline the fold when `1 - b`
655        // overflows `i64`; the `(a - b) + 1` fallback then carries the same
656        // infeasible-only overflow as any other `|step| == 1` count.
657        match literal_i64(&exprs[b_idx]).and_then(|b| i64::try_from(1i128 - i128::from(b)).ok()) {
658            Some(0) => a,
659            Some(c) => a.call_binary(lit(c), func::AddInt64),
660            None => {
661                let b = to_i64(exprs[b_idx].clone());
662                a.call_binary(b, func::SubInt64)
663                    .call_binary(lit(1), func::AddInt64)
664            }
665        }
666    } else {
667        // Cast the bounds to `numeric` so the span can't overflow, then floor the
668        // quotient (exact here) and cast back to the `i64` count.
669        let to_numeric = |e: MirScalarExpr| {
670            if int32 {
671                e.call_unary(func::CastInt32ToNumeric(None))
672            } else {
673                e.call_unary(func::CastInt64ToNumeric(None))
674            }
675        };
676        let numeric_lit = |v: i64| lit(v).call_unary(func::CastInt64ToNumeric(None));
677        let start = to_numeric(exprs[0].clone());
678        let stop = to_numeric(exprs[1].clone());
679        stop.call_binary(start, func::SubNumeric)
680            .call_binary(numeric_lit(step), func::DivNumeric)
681            .call_unary(func::FloorNumeric)
682            .call_binary(numeric_lit(1), func::AddNumeric)
683            .call_unary(func::CastNumericToInt64)
684    };
685
686    *func = TableFunc::RepeatRowNonNegative;
687    *exprs = vec![non_empty.if_then_else(cardinality, lit(0))];
688}
689
690/// Applies the reverse of [MirScalarExpr.permute] on each expression.
691///
692/// `permutation` can be thought of as a mapping of column references from
693/// `stateA` to `stateB`. [MirScalarExpr.permute] assumes that the column
694/// references of the expression are in `stateA` and need to be remapped to
695/// their `stateB` counterparts. This methods assumes that the column
696/// references are in `stateB` and need to be remapped to `stateA`.
697///
698/// The `outputs` field of [MirRelationExpr::Project] is a mapping from "after"
699/// to "before". Thus, when lifting projections, you would permute on `outputs`,
700/// but you need to reverse permute when pushing projections down.
701fn reverse_permute<'a, I, J>(exprs: I, permutation: J)
702where
703    I: Iterator<Item = &'a mut MirScalarExpr>,
704    J: Iterator<Item = &'a usize>,
705{
706    let reverse_col_map = permutation
707        .enumerate()
708        .map(|(idx, c)| (*c, idx))
709        .collect::<BTreeMap<_, _>>();
710    for expr in exprs {
711        expr.permute_map(&reverse_col_map);
712    }
713}
714
715/// Same as [reverse_permute], but takes column numbers as input
716fn reverse_permute_columns<'a, I, J>(columns: I, permutation: J)
717where
718    I: Iterator<Item = &'a mut usize>,
719    J: Iterator<Item = &'a usize>,
720{
721    let reverse_col_map = permutation
722        .enumerate()
723        .map(|(idx, c)| (*c, idx))
724        .collect::<BTreeMap<_, _>>();
725    for c in columns {
726        *c = reverse_col_map[c];
727    }
728}