mz_transform/fusion/
reduce.rs
1use mz_expr::{MirRelationExpr, MirScalarExpr};
12
13use crate::{TransformCtx, TransformError};
14
/// Optimizer transform that rewrites a `Reduce` whose input is another
/// `Reduce`.
///
/// The type carries no state; the rewrite itself lives in `Reduce::action`,
/// and the `crate::Transform` implementation applies that action to the nodes
/// of a relation expression.
#[derive(Debug)]
pub struct Reduce;
18
19impl crate::Transform for Reduce {
20 fn name(&self) -> &'static str {
21 "ReduceFusion"
22 }
23
24 #[mz_ore::instrument(
25 target = "optimizer",
26 level = "debug",
27 fields(path.segment = "reduce_fusion")
28 )]
29 fn actually_perform_transform(
30 &self,
31 relation: &mut MirRelationExpr,
32 _: &mut TransformCtx,
33 ) -> Result<(), TransformError> {
34 let result = relation.visit_pre_mut(|e| self.action(e));
35 mz_repr::explain::trace_plan(&*relation);
36 Ok(result)
37 }
38}
39
40impl Reduce {
41 pub fn action(&self, relation: &mut MirRelationExpr) {
43 if let MirRelationExpr::Reduce {
44 input,
45 group_key,
46 aggregates,
47 monotonic: _,
48 expected_group_size: _,
49 } = relation
50 {
51 if let MirRelationExpr::Reduce {
52 input: inner_input,
53 group_key: inner_group_key,
54 aggregates: inner_aggregates,
55 monotonic: _,
56 expected_group_size: _,
57 } = &mut **input
58 {
59 let mut outer_cols = vec![];
61 for expr in group_key.iter() {
62 expr.visit_pre(|e| {
63 if let MirScalarExpr::Column(i) = e {
64 outer_cols.push(*i);
65 }
66 });
67 }
68
69 if outer_cols.iter().any(|c| *c >= inner_group_key.len()) {
72 return;
73 }
74
75 if aggregates.is_empty() && inner_aggregates.is_empty() {
76 let mut outputs = vec![];
78 let mut scalars = vec![];
79
80 let arity = inner_input.arity();
81 for e in inner_group_key {
82 if let MirScalarExpr::Column(i) = e {
83 outputs.push(*i);
84 } else {
85 outputs.push(arity + scalars.len());
86 scalars.push(e.clone());
87 }
88 }
89
90 **input = inner_input.take_dangerous().map(scalars).project(outputs);
91 }
92 }
93 }
94 }
95}