mz_transform/fusion/
reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Fuses reduce operators with parent operators if possible.
11use mz_expr::{MirRelationExpr, MirScalarExpr};
12
13use crate::{TransformCtx, TransformError};
14
/// Fuses reduce operators with parent operators if possible.
///
/// The rewrite implemented by [`Reduce::action`] applies to a `Reduce` whose
/// direct input is another `Reduce`, when neither of the two has aggregates
/// and the outer group key only references columns produced by the inner
/// group key. In that case the inner `Reduce` is replaced by a map + project
/// over its input; the outer `Reduce` is kept.
#[derive(Debug)]
pub struct Reduce;
18
19impl crate::Transform for Reduce {
20    fn name(&self) -> &'static str {
21        "ReduceFusion"
22    }
23
24    #[mz_ore::instrument(
25        target = "optimizer",
26        level = "debug",
27        fields(path.segment = "reduce_fusion")
28    )]
29    fn actually_perform_transform(
30        &self,
31        relation: &mut MirRelationExpr,
32        _: &mut TransformCtx,
33    ) -> Result<(), TransformError> {
34        let result = relation.visit_pre_mut(|e| self.action(e));
35        mz_repr::explain::trace_plan(&*relation);
36        Ok(result)
37    }
38}
39
40impl Reduce {
41    /// Fuses reduce operators with parent operators if possible.
42    pub fn action(&self, relation: &mut MirRelationExpr) {
43        if let MirRelationExpr::Reduce {
44            input,
45            group_key,
46            aggregates,
47            monotonic: _,
48            expected_group_size: _,
49        } = relation
50        {
51            if let MirRelationExpr::Reduce {
52                input: inner_input,
53                group_key: inner_group_key,
54                aggregates: inner_aggregates,
55                monotonic: _,
56                expected_group_size: _,
57            } = &mut **input
58            {
59                // Collect all columns referenced by outer
60                let mut outer_cols = vec![];
61                for expr in group_key.iter() {
62                    expr.visit_pre(|e| {
63                        if let MirScalarExpr::Column(i) = e {
64                            outer_cols.push(*i);
65                        }
66                    });
67                }
68
69                // We can fuse reduce operators as long as the outer one doesn't
70                // group by an aggregation performed by the inner one.
71                if outer_cols.iter().any(|c| *c >= inner_group_key.len()) {
72                    return;
73                }
74
75                if aggregates.is_empty() && inner_aggregates.is_empty() {
76                    // Replace inner reduce with map + project (no grouping)
77                    let mut outputs = vec![];
78                    let mut scalars = vec![];
79
80                    let arity = inner_input.arity();
81                    for e in inner_group_key {
82                        if let MirScalarExpr::Column(i) = e {
83                            outputs.push(*i);
84                        } else {
85                            outputs.push(arity + scalars.len());
86                            scalars.push(e.clone());
87                        }
88                    }
89
90                    **input = inner_input.take_dangerous().map(scalars).project(outputs);
91                }
92            }
93        }
94    }
95}