1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Breaks complex `Reduce` variants into a join of simpler variants.
//!
//! Specifically, any `Reduce` that contains two different "types" of aggregation,
//! in the sense of `ReductionType`, will be broken into one `Reduce` for each
//! type of aggregation, each containing the aggregations of that type,
//! and the results are then joined back together.
//! Basic aggregations are additionally split apart from one another, so that
//! each basic aggregation ends up in a `Reduce` of its own.
use crate::TransformCtx;
use mz_compute_types::plan::reduce::reduction_type;
use mz_expr::MirRelationExpr;
/// Breaks complex `Reduce` variants into a join of simpler variants.
///
/// This is a stateless marker type: the rewrite itself lives in
/// `ReduceReduction::action`, and the `crate::Transform` implementation
/// applies that rewrite across a whole expression.
#[derive(Debug)]
pub struct ReduceReduction;
impl crate::Transform for ReduceReduction {
    /// Returns the fixed, human-readable name of this transform.
    fn name(&self) -> &'static str {
        "ReduceReduction"
    }

    /// Applies [`ReduceReduction::action`] throughout `relation`.
    ///
    /// The rewrite only runs when `ctx.features.enable_reduce_reduction` is set;
    /// otherwise `relation` is left untouched. After the traversal the resulting
    /// plan is handed to `mz_repr::explain::trace_plan`.
    ///
    /// This implementation always returns `Ok(())`.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "reduce_reduction")
    )]
    fn actually_perform_transform(
        &self,
        relation: &mut MirRelationExpr,
        ctx: &mut TransformCtx,
    ) -> Result<(), crate::TransformError> {
        // Feature-gated: when the flag is off this transform is a no-op.
        if !ctx.features.enable_reduce_reduction {
            return Ok(());
        }

        relation.visit_pre_mut(&mut Self::action);
        mz_repr::explain::trace_plan(&*relation);

        Ok(())
    }
}
impl ReduceReduction {
    /// Rewrites a `Reduce` with several independently rendered aggregate bundles
    /// into a join of one simpler `Reduce` per bundle.
    ///
    /// Aggregates are bundled by their `ReductionType`: all accumulable aggregates
    /// form one bundle, all hierarchical aggregates form another, and every basic
    /// aggregate forms a bundle of its own. If this yields fewer than two bundles,
    /// or if `relation` is not a `Reduce` at all, `relation` is left unchanged.
    ///
    /// Otherwise `relation` is replaced by a join of the per-bundle `Reduce`s on
    /// their grouping key columns, followed by a projection that restores the
    /// original column order (key columns first, then aggregates in their
    /// original positions) and drops the repeated key columns.
    pub fn action(relation: &mut MirRelationExpr) {
        use mz_compute_types::plan::reduce::ReductionType;

        // A bundle pairs the aggregates of one simpler `Reduce` with the output
        // column each of them occupies in the original `Reduce`. The two lists
        // are kept aligned by always pushing to both together.
        type Bundle = (Vec<mz_expr::AggregateExpr>, Vec<usize>);

        let MirRelationExpr::Reduce {
            input,
            group_key,
            aggregates,
            monotonic,
            expected_group_size,
        } = relation
        else {
            return;
        };

        let key_arity = group_key.len();

        // Rendering currently produces independent dataflow paths for 1. all
        // accumulable aggregations, 2. all hierarchical aggregations, and
        // 3. *each* basic aggregation, so that is how the aggregates are bundled.
        let mut bundles: Vec<Bundle> = Vec::new();
        let mut accumulable: Bundle = (Vec::new(), Vec::new());
        let mut hierarchical: Bundle = (Vec::new(), Vec::new());
        for (offset, aggregate) in aggregates.iter().enumerate() {
            // Aggregate outputs follow the key columns in the original `Reduce`.
            let output_column = key_arity + offset;
            let shared = match reduction_type(&aggregate.func) {
                ReductionType::Accumulable => &mut accumulable,
                ReductionType::Hierarchical => &mut hierarchical,
                ReductionType::Basic => {
                    // Each basic aggregate is a singleton bundle.
                    bundles.push((vec![aggregate.clone()], vec![output_column]));
                    continue;
                }
            };
            shared.0.push(aggregate.clone());
            shared.1.push(output_column);
        }

        // Append the shared bundles (hierarchical first, then accumulable),
        // skipping any that received no aggregates.
        for bundle in [hierarchical, accumulable] {
            if !bundle.0.is_empty() {
                bundles.push(bundle);
            }
        }
        // Order the bundles by their (aggregates, columns) contents.
        bundles.sort();

        // A single bundle (or none) is already as simple as it gets.
        if bundles.len() < 2 {
            return;
        }

        // Plan one `Reduce` per bundle over a clone of the input, recording for
        // every column of the eventual join output which original column it holds.
        // TODO: Perhaps we should introduce a `Let` stage rather than clone the input?
        let mut joined_columns: Vec<usize> = Vec::new();
        let reduces: Vec<MirRelationExpr> = bundles
            .into_iter()
            .map(|(bundle_aggregates, output_columns)| {
                // Each reduce emits its key columns followed by its aggregates.
                joined_columns.extend((0..key_arity).chain(output_columns));
                MirRelationExpr::Reduce {
                    input: input.clone(),
                    group_key: group_key.clone(),
                    aggregates: bundle_aggregates,
                    monotonic: *monotonic,
                    expected_group_size: *expected_group_size,
                }
            })
            .collect();

        // Equate each key column across all of the reduces.
        let equivalences: Vec<Vec<(usize, usize)>> = (0..key_arity)
            .map(|key_column| {
                (0..reduces.len())
                    .map(|reduce_index| (reduce_index, key_column))
                    .collect()
            })
            .collect();

        // Build the projection that puts every original column back in place.
        // `position` returns the first match, so each key column is read from the
        // first reduce and the later copies are projected away. Every column up
        // to the maximum is present (keys plus all aggregates), so the lookup
        // cannot fail.
        let max_column = *joined_columns
            .iter()
            .max()
            .expect("Non-empty aggregates expected");
        let projection: Vec<usize> = (0..=max_column)
            .map(|wanted| {
                joined_columns
                    .iter()
                    .position(|&held| held == wanted)
                    .unwrap()
            })
            .collect();

        // Swap the original `Reduce` for the join of simpler reduces.
        *relation = MirRelationExpr::join(reduces, equivalences).project(projection);
    }
}