// mz_transform/canonicalization/projection_extraction.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Transform column references in a `Map`, or repeated group keys and
//! aggregations in a `Reduce`, into a `Project`.
use mz_expr::visit::Visit;
use mz_expr::{MirRelationExpr, MirScalarExpr};
use crate::TransformCtx;
/// Transform that moves redundant columns out of `Map` and `Reduce` operators.
///
/// Scalars of a `Map` that are plain column references, as well as group keys
/// and aggregations of a `Reduce` that repeat an earlier entry, are removed
/// from the operator. The removed columns are re-created by applying
/// `MirRelationExpr::project` to the rewritten operator.
#[derive(Debug)]
pub struct ProjectionExtraction;
impl crate::Transform for ProjectionExtraction {
/// Returns the fixed name of this transform, `"ProjectionExtraction"`.
fn name(&self) -> &'static str {
"ProjectionExtraction"
}
/// Runs [`ProjectionExtraction::action`] over `relation` and traces the result.
///
/// `action` is handed to `visit_mut_post`, which presumably applies it to
/// every node of the expression tree, children before parents, as the
/// `_post` suffix suggests (confirm against `mz_expr::visit::Visit`).
/// An error returned by that traversal is propagated to the caller.
/// The `TransformCtx` argument is not used.
#[mz_ore::instrument(
target = "optimizer",
level = "debug",
fields(path.segment = "projection_extraction")
)]
fn actually_perform_transform(
&self,
relation: &mut MirRelationExpr,
_: &mut TransformCtx,
) -> Result<(), crate::TransformError> {
relation.visit_mut_post(&mut Self::action)?;
// Hand the (possibly rewritten) plan to `trace_plan`.
mz_repr::explain::trace_plan(&*relation);
Ok(())
}
}
impl ProjectionExtraction {
    /// Moves redundant columns of a single `Map` or `Reduce` into a `Project`.
    ///
    /// * `Map`: every scalar that is a bare `MirScalarExpr::Column` is removed
    ///   from the map. The remaining scalars are rewritten with `permute` to
    ///   account for the removed columns, and a projection restores the
    ///   original column order.
    /// * `Reduce`: group keys equal to an earlier group key, and aggregates
    ///   equal to an earlier aggregate, are removed. If anything was removed,
    ///   a projection pointing at the first occurrences is applied.
    ///
    /// Any other operator is left unchanged.
    pub fn action(relation: &mut MirRelationExpr) {
        match relation {
            MirRelationExpr::Map { input, scalars } => {
                let has_column_ref = scalars
                    .iter()
                    .any(|scalar| matches!(scalar, MirScalarExpr::Column(_)));
                if !has_column_ref {
                    return;
                }

                // `outputs[i]` is the position, in the rewritten `Map`, of the
                // column that sat at position `i` in the original `Map`.
                let mut outputs: Vec<usize> = (0..input.arity()).collect();
                let mut dropped = 0;
                scalars.retain(|scalar| match scalar {
                    MirScalarExpr::Column(col) => {
                        dropped += 1;
                        // The referenced column may itself have been a removed
                        // reference; `outputs[*col]` already records where its
                        // value ended up, so indirection is resolved here.
                        let source = outputs[*col];
                        outputs.push(source);
                        false // don't retain
                    }
                    _ => {
                        // A retained scalar shifts left by the number of
                        // scalars removed before it.
                        let position = outputs.len() - dropped;
                        outputs.push(position);
                        true // retain
                    }
                });

                // `retain` visits every scalar, and the guard above found at
                // least one column reference, so something was removed and
                // the rewrite below is always required at this point.
                for scalar in scalars.iter_mut() {
                    scalar.permute(&outputs);
                }
                *relation = relation.take_dangerous().project(outputs);
            }
            MirRelationExpr::Reduce {
                input: _,
                group_key,
                aggregates,
                monotonic: _,
                expected_group_size: _,
            } => {
                let mut projection = Vec::new();

                // If any key is an exact duplicate, we can remove it and use
                // a projection.
                Self::dedup_into_projection(group_key, 0, &mut projection);

                // Likewise for aggregates; their output columns are numbered
                // after the (now deduplicated) key columns.
                let key_arity = group_key.len();
                Self::dedup_into_projection(aggregates, key_arity, &mut projection);

                // Only wrap the `Reduce` if the projection actually does something.
                let is_identity = projection.iter().enumerate().all(|(i, p)| i == *p);
                if !is_identity {
                    *relation = relation.take_dangerous().project(projection);
                }
            }
            _ => {}
        }
    }

    /// Removes every element of `items` that equals an earlier element.
    ///
    /// For each original element, in order, pushes onto `projection` the value
    /// `offset` plus the index (within the deduplicated `items`) of the first
    /// element equal to it.
    fn dedup_into_projection<T: PartialEq>(
        items: &mut Vec<T>,
        offset: usize,
        projection: &mut Vec<usize>,
    ) {
        let mut finger = 0;
        while finger < items.len() {
            // Everything before `finger` is already free of duplicates.
            let first_occurrence = items[..finger]
                .iter()
                .position(|earlier| earlier == &items[finger]);
            match first_occurrence {
                Some(first) => {
                    projection.push(offset + first);
                    // Do not advance: the next element slides into `finger`.
                    items.remove(finger);
                }
                None => {
                    projection.push(offset + finger);
                    finger += 1;
                }
            }
        }
    }
}