mz_transform/movement/projection_pushdown.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Pushes column removal down through other operators.
11//!
12//! This action improves the quality of the query by
13//! reducing the width of data in the dataflow. It determines the unique
14//! columns an expression depends on, and pushes a projection onto only
15//! those columns down through child operators.
16//!
17//! A `MirRelationExpr::Project` node is actually three transformations in one.
18//! 1) Projection - removes columns.
19//! 2) Permutation - reorders columns.
20//! 3) Repetition - duplicates columns.
21//!
22//! This action handles these three transformations like so:
23//! 1) Projections are pushed as far down as possible.
24//! 2) Permutations are pushed as far down as is convenient.
25//! 3) Repetitions are not pushed down at all.
26//!
27//! Some comments have been inherited from the `Demand` transform.
28//!
29//! Note that this transform is one that can operate across views in a dataflow
30//! and thus currently exists outside of both the physical and logical
31//! optimizers.
32
33use std::collections::{BTreeMap, BTreeSet};
34
35use itertools::{Itertools, zip_eq};
36use mz_expr::{
37 Columns, Id, JoinImplementation, JoinInputMapper, MirRelationExpr, MirScalarExpr,
38 RECURSION_LIMIT, TableFunc, func,
39};
40use mz_ore::assert_none;
41use mz_ore::stack::{CheckedRecursion, RecursionGuard};
42use mz_repr::{Datum, ReprRelationType, ReprScalarType};
43
44use crate::{TransformCtx, TransformError};
45
46/// Pushes projections down through other operators.
47#[derive(Debug)]
48pub struct ProjectionPushdown {
49 recursion_guard: RecursionGuard,
50 include_joins: bool,
51}
52
53impl Default for ProjectionPushdown {
54 fn default() -> Self {
55 Self {
56 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
57 include_joins: true,
58 }
59 }
60}
61
62impl ProjectionPushdown {
63 /// Construct a `ProjectionPushdown` that does not push projections through joins (but does
64 /// descend into join inputs).
65 pub fn skip_joins() -> Self {
66 Self {
67 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
68 include_joins: false,
69 }
70 }
71}
72
73impl CheckedRecursion for ProjectionPushdown {
74 fn recursion_guard(&self) -> &RecursionGuard {
75 &self.recursion_guard
76 }
77}
78
79impl crate::Transform for ProjectionPushdown {
80 fn name(&self) -> &'static str {
81 "ProjectionPushdown"
82 }
83
84 // This method is only used during unit testing.
85 #[mz_ore::instrument(
86 target = "optimizer",
87 level = "debug",
88 fields(path.segment = "projection_pushdown")
89 )]
90 fn actually_perform_transform(
91 &self,
92 relation: &mut MirRelationExpr,
93 _: &mut TransformCtx,
94 ) -> Result<(), crate::TransformError> {
95 let result = self.action(
96 relation,
97 &(0..relation.arity()).collect(),
98 &mut BTreeMap::new(),
99 );
100 mz_repr::explain::trace_plan(&*relation);
101 result
102 }
103}
104
105impl ProjectionPushdown {
106 /// Pushes the `desired_projection` down through `relation`.
107 ///
108 /// This action transforms `relation` to a `MirRelationExpr` equivalent to
109 /// `relation.project(desired_projection)`.
110 ///
111 /// `desired_projection` is expected to consist of unique columns.
112 pub fn action(
113 &self,
114 relation: &mut MirRelationExpr,
115 desired_projection: &Vec<usize>,
116 gets: &mut BTreeMap<Id, BTreeSet<usize>>,
117 ) -> Result<(), TransformError> {
118 self.checked_recur(|_| {
119 // First, try to push the desired projection down through `relation`.
120 // In the process `relation` is transformed to a `MirRelationExpr`
121 // equivalent to `relation.project(actual_projection)`.
122 // There are three reasons why `actual_projection` may differ from
123 // `desired_projection`:
124 // 1) `relation` may need one or more columns that is not contained in
125 // `desired_projection`.
126 // 2) `relation` may not be able to accommodate certain permutations.
127 // For example, `MirRelationExpr::Map` always appends all
128 // newly-created columns to the end.
129 // 3) Nothing can be pushed through a leaf node. If `relation` is a leaf
130 // node, `actual_projection` will always be `(0..relation.arity())`.
131 // Then, if `actual_projection` and `desired_projection` differ, we will
132 // add a project around `relation`.
133 let actual_projection = match relation {
134 MirRelationExpr::Constant { .. } => (0..relation.arity()).collect(),
135 MirRelationExpr::Get { id, .. } => {
136 gets.entry(*id)
137 .or_insert_with(BTreeSet::new)
138 .extend(desired_projection.iter().cloned());
139 (0..relation.arity()).collect()
140 }
141 MirRelationExpr::Let { id, value, body } => {
142 // Let harvests any requirements of get from its body,
143 // and pushes the sorted union of the requirements at its value.
144 let id = Id::Local(*id);
145 let prior = gets.insert(id, BTreeSet::new());
146 self.action(body, desired_projection, gets)?;
147 let desired_value_projection = gets.remove(&id).unwrap();
148 if let Some(prior) = prior {
149 gets.insert(id, prior);
150 }
151 let desired_value_projection =
152 desired_value_projection.into_iter().collect::<Vec<_>>();
153 self.action(value, &desired_value_projection, gets)?;
154 let new_type = value.typ();
155 self.update_projection_around_get(
156 body,
157 &BTreeMap::from_iter(std::iter::once((
158 id,
159 (desired_value_projection, new_type),
160 ))),
161 );
162 desired_projection.clone()
163 }
164 MirRelationExpr::LetRec {
165 ids,
166 values,
167 limits: _,
168 body,
169 } => {
170 // Determine the recursive IDs in this LetRec binding.
171 let rec_ids = MirRelationExpr::recursive_ids(ids, values);
172
173 // Seed the gets map with empty demand for each non-recursive ID.
174 for id in ids.iter().filter(|id| !rec_ids.contains(id)) {
175 let prior = gets.insert(Id::Local(*id), BTreeSet::new());
176 assert_none!(prior);
177 }
178
179 // Descend into the body with the supplied desired_projection.
180 self.action(body, desired_projection, gets)?;
181 // Descend into the values in reverse order.
182 for (id, value) in zip_eq(ids.iter().rev(), values.iter_mut().rev()) {
183 let desired_projection = if rec_ids.contains(id) {
184 // For recursive IDs: request all columns.
185 let columns = 0..value.arity();
186 columns.collect::<Vec<_>>()
187 } else {
188 // For non-recursive IDs: request the gets entry.
189 let columns = gets.get(&Id::Local(*id)).unwrap();
190 columns.iter().cloned().collect::<Vec<_>>()
191 };
192 self.action(value, &desired_projection, gets)?;
193 }
194
195 // Update projections around gets of non-recursive IDs.
196 let mut updates = BTreeMap::new();
197 for (id, value) in zip_eq(ids.iter(), values.iter_mut()) {
198 // Update the current value.
199 self.update_projection_around_get(value, &updates);
200 // If this is a non-recursive ID, add an entry to the
201 // updates map for subsequent values and the body.
202 if !rec_ids.contains(id) {
203 let new_type = value.typ();
204 let new_proj = {
205 let columns = gets.remove(&Id::Local(*id)).unwrap();
206 columns.iter().cloned().collect::<Vec<_>>()
207 };
208 updates.insert(Id::Local(*id), (new_proj, new_type));
209 }
210 }
211 // Update the body.
212 self.update_projection_around_get(body, &updates);
213
214 // Remove the entries for all ids (don't restrict only to
215 // non-recursive IDs here for better hygene).
216 for id in ids.iter() {
217 gets.remove(&Id::Local(*id));
218 }
219
220 // Return the desired projection (leads to a no-op in the
221 // projection handling logic after this match statement).
222 desired_projection.clone()
223 }
224 MirRelationExpr::Join {
225 inputs,
226 equivalences,
227 implementation,
228 } if self.include_joins => {
229 assert!(
230 matches!(implementation, JoinImplementation::Unimplemented),
231 "ProjectionPushdown can't deal with filled in join implementations. Turn off `include_joins` if you'd like to run it after `JoinImplementation`."
232 );
233
234 let input_mapper = JoinInputMapper::new(inputs);
235
236 let mut columns_to_pushdown =
237 desired_projection.iter().cloned().collect::<BTreeSet<_>>();
238 // Each equivalence class imposes internal demand for columns.
239 for equivalence in equivalences.iter() {
240 for expr in equivalence.iter() {
241 expr.support_into(&mut columns_to_pushdown);
242 }
243 }
244
245 // Populate child demands from external and internal demands.
246 let new_columns =
247 input_mapper.split_column_set_by_input(columns_to_pushdown.iter());
248
249 // Recursively indicate the requirements.
250 for (input, inp_columns) in inputs.iter_mut().zip_eq(new_columns) {
251 let inp_columns = inp_columns.into_iter().collect::<Vec<_>>();
252 self.action(input, &inp_columns, gets)?;
253 }
254
255 reverse_permute(
256 equivalences.iter_mut().flat_map(|e| e.iter_mut()),
257 columns_to_pushdown.iter(),
258 );
259
260 columns_to_pushdown.into_iter().collect()
261 }
262 // Skip joins if `self.include_joins` is turned off.
263 MirRelationExpr::Join { inputs, equivalences: _, implementation: _ } => {
264 let input_mapper = JoinInputMapper::new(inputs);
265
266 // Include all columns.
267 let columns_to_pushdown: Vec<_> = (0..input_mapper.total_columns()).collect();
268 let child_columns =
269 input_mapper.split_column_set_by_input(columns_to_pushdown.iter());
270
271 // Recursively indicate the requirements.
272 for (input, inp_columns) in inputs.iter_mut().zip_eq(child_columns) {
273 let inp_columns = inp_columns.into_iter().collect::<Vec<_>>();
274 self.action(input, &inp_columns, gets)?;
275 }
276
277 columns_to_pushdown.into_iter().collect()
278 }
279 MirRelationExpr::FlatMap { input, func, exprs } => {
280 let inner_arity = input.arity();
281 // When the values generated by a `generate_series` are not
282 // demanded, only its cardinality matters. Collapse it into a
283 // `RepeatRowNonNegative`, which encodes that cardinality in a
284 // single row's diff rather than emitting one row per value.
285 if !desired_projection.iter().any(|c| *c >= inner_arity) {
286 collapse_unused_generate_series(func, exprs);
287 }
288 // A FlatMap which returns zero rows acts like a filter
289 // so we always need to execute it
290 let mut columns_to_pushdown =
291 desired_projection.iter().cloned().collect::<BTreeSet<_>>();
292 for expr in exprs.iter() {
293 expr.support_into(&mut columns_to_pushdown);
294 }
295 columns_to_pushdown.retain(|c| *c < inner_arity);
296
297 reverse_permute(exprs.iter_mut(), columns_to_pushdown.iter());
298 let columns_to_pushdown = columns_to_pushdown.into_iter().collect::<Vec<_>>();
299 self.action(input, &columns_to_pushdown, gets)?;
300 // The actual projection always has the newly-created columns at
301 // the end.
302 let mut actual_projection = columns_to_pushdown;
303 for c in 0..func.output_arity() {
304 actual_projection.push(inner_arity + c);
305 }
306 actual_projection
307 }
308 MirRelationExpr::Filter { input, predicates } => {
309 let mut columns_to_pushdown =
310 desired_projection.iter().cloned().collect::<BTreeSet<_>>();
311 for predicate in predicates.iter() {
312 predicate.support_into(&mut columns_to_pushdown);
313 }
314 reverse_permute(predicates.iter_mut(), columns_to_pushdown.iter());
315 let columns_to_pushdown = columns_to_pushdown.into_iter().collect::<Vec<_>>();
316 self.action(input, &columns_to_pushdown, gets)?;
317 columns_to_pushdown
318 }
319 MirRelationExpr::Project { input, outputs } => {
320 // Combine `outputs` with `desired_projection`.
321 *outputs = desired_projection.iter().map(|c| outputs[*c]).collect();
322
323 let unique_outputs = outputs.iter().map(|i| *i).collect::<BTreeSet<_>>();
324 if outputs.len() == unique_outputs.len() {
325 // Push down the project as is.
326 self.action(input, outputs, gets)?;
327 *relation = input.take_dangerous();
328 } else {
329 // Push down only the unique elems in `outputs`.
330 let columns_to_pushdown = unique_outputs.into_iter().collect::<Vec<_>>();
331 reverse_permute_columns(outputs.iter_mut(), columns_to_pushdown.iter());
332 self.action(input, &columns_to_pushdown, gets)?;
333 }
334
335 desired_projection.clone()
336 }
337 MirRelationExpr::Map { input, scalars } => {
338 let arity = input.arity();
339 // contains columns whose supports have yet to be explored
340 let mut actual_projection =
341 desired_projection.iter().cloned().collect::<BTreeSet<_>>();
342 for (i, scalar) in scalars.iter().enumerate().rev() {
343 if actual_projection.contains(&(i + arity)) {
344 scalar.support_into(&mut actual_projection);
345 }
346 }
347 *scalars = (0..scalars.len())
348 .filter_map(|i| {
349 if actual_projection.contains(&(i + arity)) {
350 Some(scalars[i].clone())
351 } else {
352 None
353 }
354 })
355 .collect::<Vec<_>>();
356 reverse_permute(scalars.iter_mut(), actual_projection.iter());
357 self.action(
358 input,
359 &actual_projection
360 .iter()
361 .filter(|c| **c < arity)
362 .map(|c| *c)
363 .collect(),
364 gets,
365 )?;
366 actual_projection.into_iter().collect()
367 }
368 MirRelationExpr::Reduce {
369 input,
370 group_key,
371 aggregates,
372 monotonic: _,
373 expected_group_size: _,
374 } => {
375 let mut columns_to_pushdown = BTreeSet::new();
376 // Group keys determine aggregation granularity and are
377 // each crucial in determining aggregates and even the
378 // multiplicities of other keys.
379 for k in group_key.iter() {
380 k.support_into(&mut columns_to_pushdown)
381 }
382
383 for index in (0..aggregates.len()).rev() {
384 if !desired_projection.contains(&(group_key.len() + index)) {
385 aggregates.remove(index);
386 } else {
387 // No obvious requirements on aggregate columns.
388 // A "non-empty" requirement, I guess?
389 aggregates[index]
390 .expr
391 .support_into(&mut columns_to_pushdown)
392 }
393 }
394
395 reverse_permute(
396 itertools::chain!(
397 group_key.iter_mut(),
398 aggregates.iter_mut().map(|a| &mut a.expr)
399 ),
400 columns_to_pushdown.iter(),
401 );
402
403 self.action(
404 input,
405 &columns_to_pushdown.into_iter().collect::<Vec<_>>(),
406 gets,
407 )?;
408 let mut actual_projection =
409 desired_projection.iter().cloned().collect::<BTreeSet<_>>();
410 actual_projection.extend(0..group_key.len());
411 actual_projection.into_iter().collect()
412 }
413 MirRelationExpr::TopK {
414 input,
415 group_key,
416 order_key,
417 limit,
418 ..
419 } => {
420 // Group and order keys and limit support must be retained, as
421 // they define which rows are retained.
422 let mut columns_to_pushdown =
423 desired_projection.iter().cloned().collect::<BTreeSet<_>>();
424 columns_to_pushdown.extend(group_key.iter().cloned());
425 columns_to_pushdown.extend(order_key.iter().map(|o| o.column));
426 if let Some(limit) = limit.as_ref() {
427 // Strictly speaking not needed because the
428 // `limit` support should be a subset of the
429 // `group_key` support, but we don't want to
430 // take this for granted here.
431 limit.support_into(&mut columns_to_pushdown);
432 }
433 // If the `TopK` does not have any new column demand, just push
434 // down the desired projection. Otherwise, push down the sorted
435 // column demand.
436 let columns_to_pushdown =
437 if columns_to_pushdown.len() == desired_projection.len() {
438 desired_projection.clone()
439 } else {
440 columns_to_pushdown.into_iter().collect::<Vec<_>>()
441 };
442 reverse_permute_columns(
443 itertools::chain!(
444 group_key.iter_mut(),
445 order_key.iter_mut().map(|o| &mut o.column),
446 ),
447 columns_to_pushdown.iter(),
448 );
449 reverse_permute(limit.iter_mut(), columns_to_pushdown.iter());
450 self.action(input, &columns_to_pushdown, gets)?;
451 columns_to_pushdown
452 }
453 MirRelationExpr::Negate { input } => {
454 self.action(input, desired_projection, gets)?;
455 desired_projection.clone()
456 }
457 MirRelationExpr::Union { base, inputs } => {
458 self.action(base, desired_projection, gets)?;
459 for input in inputs {
460 self.action(input, desired_projection, gets)?;
461 }
462 desired_projection.clone()
463 }
464 MirRelationExpr::Threshold { input } => {
465 // Threshold requires all columns, as collapsing any distinct values
466 // has the potential to change how it thresholds counts. This could
467 // be improved with reasoning about distinctness or non-negativity.
468 let arity = input.arity();
469 self.action(input, &(0..arity).collect(), gets)?;
470 (0..arity).collect()
471 }
472 MirRelationExpr::ArrangeBy { input, keys: _ } => {
473 // Do not push the project past the ArrangeBy.
474 // TODO: how do we handle key sets containing column references
475 // that are not demanded upstream?
476 let arity = input.arity();
477 self.action(input, &(0..arity).collect(), gets)?;
478 (0..arity).collect()
479 }
480 };
481 let add_project = desired_projection != &actual_projection;
482 if add_project {
483 let mut projection_to_add = desired_projection.to_owned();
484 reverse_permute_columns(projection_to_add.iter_mut(), actual_projection.iter());
485 *relation = relation.take_dangerous().project(projection_to_add);
486 }
487 Ok(())
488 })
489 }
490
491 /// When we push the `desired_value_projection` at `value`,
492 /// the columns returned by `Get(get_id)` will change, so we need
493 /// to permute `Project`s around `Get(get_id)`.
494 pub fn update_projection_around_get(
495 &self,
496 relation: &mut MirRelationExpr,
497 applied_projections: &BTreeMap<Id, (Vec<usize>, ReprRelationType)>,
498 ) {
499 relation.visit_pre_mut(|e| {
500 if let MirRelationExpr::Project { input, outputs } = e {
501 if let MirRelationExpr::Get {
502 id: inner_id,
503 typ,
504 access_strategy: _,
505 } = &mut **input
506 {
507 if let Some((new_projection, new_type)) = applied_projections.get(inner_id) {
508 typ.clone_from(new_type);
509 reverse_permute_columns(outputs.iter_mut(), new_projection.iter());
510 if outputs.len() == new_projection.len()
511 && outputs.iter().enumerate().all(|(i, o)| i == *o)
512 {
513 *e = input.take_dangerous();
514 }
515 }
516 }
517 }
518 // If there is no `Project` around a Get, all columns of
519 // `Get(get_id)` are required. Thus, the columns returned by
520 // `Get(get_id)` will not have changed, so no action
521 // is necessary.
522 });
523 }
524}
525
526/// If `func` is an integer `generate_series` whose output values are unused,
527/// rewrites it in place into a `RepeatRowNonNegative` that emits the same number
528/// of rows. The caller must have established that the generated column is not
529/// demanded.
530///
531/// `generate_series(start, stop, step)` is inclusive of `stop`, so its
532/// cardinality is `max(0, floor((stop - start) / step) + 1)`. We only rewrite
533/// when `step` is a known non-zero literal, which lets us specialize on its
534/// sign: truncating integer division (what `DivInt64` does) coincides with
535/// floor division exactly when the dividend and divisor share a sign, which the
536/// emptiness guard guarantees. The guard also collapses the empty series to a
537/// count of zero, so the result is always non-negative.
538///
539/// When `start` and `stop` are also literals, the cardinality is computed here,
540/// exactly, in `i128` (where no `i64` inputs can overflow). If it does not fit
541/// in an `i64` we decline to rewrite: the original `FlatMap` enumerates such a
542/// series without error (its iteration only ever visits in-range values), so
543/// its replacement must not error either.
544///
545/// When the bounds are not literals we synthesize the arithmetic, and the width
546/// we synthesize it in depends on `step`:
547///
548/// * `|step| == 1`: `i64`, and the division (the identity or a negation) is
549/// elided, so the count is just `(a - b) + 1`. A literal subtracted bound is
550/// folded into `a + (1 - b)` — `a` itself for `generate_series(1, n)`. Here
551/// `a - b` overflows `i64` only once the span reaches `2^63`, which means the
552/// series has more than `i64::MAX` rows — infeasible, exactly what the literal
553/// path declines. So `i64` arithmetic is exact for every feasible series, and
554/// an overflow only ever stands in for an effectively non-terminating
555/// enumeration.
556///
557/// * `|step| >= 2`: `numeric`. A coarse step over near-opposite `i64` extremes
558/// yields a *feasible* series (few rows) whose span nonetheless overflows
559/// `i64`, so `i64` arithmetic would error where the original does not.
560/// `numeric` holds the full `~2^64` span comfortably and represents the
561/// quotient exactly (it needs at most ~20 significant digits, well under
562/// `numeric`'s 39), so the only place this can error is the final cast back to
563/// `i64` — which happens exactly when the count itself exceeds `i64`, the same
564/// infeasible case the literal path declines.
565///
566/// Null inputs are handled by `RepeatRowNonNegative` itself: like
567/// `generate_series`, it is `empty_on_null_input`, so a null count yields no
568/// rows.
569fn collapse_unused_generate_series(func: &mut TableFunc, exprs: &mut Vec<MirScalarExpr>) {
570 // Only integer `generate_series` is handled here.
571 if !matches!(
572 func,
573 TableFunc::GenerateSeriesInt32 | TableFunc::GenerateSeriesInt64
574 ) {
575 return;
576 }
577 // Extracts a non-null integer literal as an `i64`, accepting any integer
578 // width (an argument's datum type does not necessarily match the series
579 // width).
580 let literal_i64 = |e: &MirScalarExpr| match e.as_literal() {
581 Some(Ok(Datum::Int16(v))) => Some(i64::from(v)),
582 Some(Ok(Datum::Int32(v))) => Some(i64::from(v)),
583 Some(Ok(Datum::Int64(v))) => Some(v),
584 _ => None,
585 };
586 // Bail unless the step is a non-null, non-zero literal.
587 let step = match literal_i64(&exprs[2]) {
588 Some(step) => step,
589 None => return,
590 };
591 if step == 0 {
592 // `generate_series` errors on a zero step; leave it to do so.
593 return;
594 }
595
596 // Literal bounds: compute the exact cardinality now, in `i128`, where no
597 // `i64` inputs can overflow.
598 if let (Some(start), Some(stop)) = (literal_i64(&exprs[0]), literal_i64(&exprs[1])) {
599 let empty = if step > 0 { stop < start } else { stop > start };
600 let count: i128 = if empty {
601 0
602 } else {
603 // Same-sign dividend and divisor, so truncating division is floor
604 // division.
605 (i128::from(stop) - i128::from(start)) / i128::from(step) + 1
606 };
607 if let Ok(count) = i64::try_from(count) {
608 *func = TableFunc::RepeatRowNonNegative;
609 *exprs = vec![MirScalarExpr::literal_ok(
610 Datum::Int64(count),
611 ReprScalarType::Int64,
612 )];
613 }
614 // A count beyond `i64` is left alone rather than collapsed: the
615 // original enumerates it without error, so the rewrite must too.
616 return;
617 }
618
619 let int32 = matches!(func, TableFunc::GenerateSeriesInt32);
620 let lit = |v: i64| MirScalarExpr::literal_ok(Datum::Int64(v), ReprScalarType::Int64);
621
622 // The series is non-empty exactly when `stop` is on the far side of `start`
623 // in the direction of `step`. Comparing the original (un-widened) bounds is
624 // overflow-free at any width.
625 let non_empty = if step > 0 {
626 exprs[1].clone().call_binary(exprs[0].clone(), func::Gte)
627 } else {
628 exprs[1].clone().call_binary(exprs[0].clone(), func::Lte)
629 };
630
631 // floor((stop - start) / step) + 1, valid (== truncating division) under the
632 // `non_empty` guard. With `|step| == 1` an overflow of `stop - start` only
633 // occurs for infeasible series, so cheap `i64` arithmetic is safe; with
634 // `|step| >= 2` a feasible series can still overflow `i64`, so we compute in
635 // `numeric` (see the doc comment).
636 let cardinality = if step.abs() == 1 {
637 // `|step| == 1` makes the division in `floor((stop - start) / step) + 1`
638 // either the identity (`step == 1`) or a negation (`step == -1`), so we
639 // write the count as `(a - b) + 1` directly, with no `DivInt64`:
640 // step == 1: `(a, b) = (stop, start)`
641 // step == -1: `(a, b) = (start, stop)`
642 // Widen 32-bit bounds to the `i64` count `RepeatRowNonNegative` expects.
643 let to_i64 = |e: MirScalarExpr| {
644 if int32 {
645 e.call_unary(func::CastInt32ToInt64)
646 } else {
647 e
648 }
649 };
650 let (a_idx, b_idx) = if step > 0 { (1, 0) } else { (0, 1) };
651 let a = to_i64(exprs[a_idx].clone());
652 // When the subtracted bound `b` is a literal, fold `(a - b) + 1` into the
653 // single constant `a + (1 - b)`, which is just `a` when `b == 1` (the
654 // common `generate_series(1, n)` shape). Decline the fold when `1 - b`
655 // overflows `i64`; the `(a - b) + 1` fallback then carries the same
656 // infeasible-only overflow as any other `|step| == 1` count.
657 match literal_i64(&exprs[b_idx]).and_then(|b| i64::try_from(1i128 - i128::from(b)).ok()) {
658 Some(0) => a,
659 Some(c) => a.call_binary(lit(c), func::AddInt64),
660 None => {
661 let b = to_i64(exprs[b_idx].clone());
662 a.call_binary(b, func::SubInt64)
663 .call_binary(lit(1), func::AddInt64)
664 }
665 }
666 } else {
667 // Cast the bounds to `numeric` so the span can't overflow, then floor the
668 // quotient (exact here) and cast back to the `i64` count.
669 let to_numeric = |e: MirScalarExpr| {
670 if int32 {
671 e.call_unary(func::CastInt32ToNumeric(None))
672 } else {
673 e.call_unary(func::CastInt64ToNumeric(None))
674 }
675 };
676 let numeric_lit = |v: i64| lit(v).call_unary(func::CastInt64ToNumeric(None));
677 let start = to_numeric(exprs[0].clone());
678 let stop = to_numeric(exprs[1].clone());
679 stop.call_binary(start, func::SubNumeric)
680 .call_binary(numeric_lit(step), func::DivNumeric)
681 .call_unary(func::FloorNumeric)
682 .call_binary(numeric_lit(1), func::AddNumeric)
683 .call_unary(func::CastNumericToInt64)
684 };
685
686 *func = TableFunc::RepeatRowNonNegative;
687 *exprs = vec![non_empty.if_then_else(cardinality, lit(0))];
688}
689
690/// Applies the reverse of [MirScalarExpr.permute] on each expression.
691///
692/// `permutation` can be thought of as a mapping of column references from
693/// `stateA` to `stateB`. [MirScalarExpr.permute] assumes that the column
694/// references of the expression are in `stateA` and need to be remapped to
695/// their `stateB` counterparts. This methods assumes that the column
696/// references are in `stateB` and need to be remapped to `stateA`.
697///
698/// The `outputs` field of [MirRelationExpr::Project] is a mapping from "after"
699/// to "before". Thus, when lifting projections, you would permute on `outputs`,
700/// but you need to reverse permute when pushing projections down.
701fn reverse_permute<'a, I, J>(exprs: I, permutation: J)
702where
703 I: Iterator<Item = &'a mut MirScalarExpr>,
704 J: Iterator<Item = &'a usize>,
705{
706 let reverse_col_map = permutation
707 .enumerate()
708 .map(|(idx, c)| (*c, idx))
709 .collect::<BTreeMap<_, _>>();
710 for expr in exprs {
711 expr.permute_map(&reverse_col_map);
712 }
713}
714
/// Same as [`reverse_permute`], but takes column numbers as input.
///
/// Each entry of `columns` is overwritten with the position at which its value
/// occurs in `permutation` (the last position, if it occurs more than once).
///
/// # Panics
///
/// Panics if an entry of `columns` does not occur in `permutation`.
fn reverse_permute_columns<'a, I, J>(columns: I, permutation: J)
where
    I: Iterator<Item = &'a mut usize>,
    J: Iterator<Item = &'a usize>,
{
    let mut inverse = BTreeMap::new();
    for (position, column) in permutation.enumerate() {
        inverse.insert(*column, position);
    }
    columns.for_each(|column| *column = inverse[&*column]);
}