// mz_compute_types/plan/reduce.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.
//!
//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent here is that creating a `ReducePlan` should capture all
//! of the decision making about what kind of dataflow we need to render and what each
//! operator needs to do, so that actually rendering the plan can be a relatively simple
//! application of (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction, and when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of commutative Abelian groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//!    they are still commutative and associative, which lets us compute the reduction over subsets
//!    of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
//!    reductions this way, we can maintain the computation in sublinear time with respect to
//!    the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//!    hierarchical reductions correspond to instances of semigroups, in that they are associative,
//!    but in order to benefit from being computed hierarchically, they need to have some reduction
//!    in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//!    hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//!    hierarchically can instead be computed in-place, because we only need to keep the "best" value
//!    (minimal or maximal for min and max) seen so far.
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//!    hierarchical (most likely they are associative but don't involve any data reduction) and so for these
//!    we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly and avoid the extra collation arrangement.
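//!
//! As an illustrative sketch (not run as a doctest; `sum_aggr` and `min_aggr` stand in for
//! hypothetical accumulable and hierarchical `AggregateExpr`s), a mix of aggregation types
//! yields a collation plan, while a single type is planned directly:
//!
//! ```ignore
//! let plan = ReducePlan::create_from(vec![sum_aggr, min_aggr], false, None, false);
//! assert!(matches!(plan, ReducePlan::Collation(_)));
//!
//! let plan = ReducePlan::create_from(vec![min_aggr], false, None, false);
//! assert!(matches!(plan, ReducePlan::Hierarchical(_)));
//! ```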

use std::collections::BTreeMap;

use mz_expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
};
use mz_ore::{assert_none, soft_assert_or_log};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use proptest::prelude::{Arbitrary, BoxedStrategy, any};
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};

include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.reduce.rs"));

/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
    /// Accumulable functions can be subtracted from (are invertible), and are associative.
    /// We can compute these results by moving some data to the diff field under arbitrary
    /// changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic functions, for lack of a better word, are those that are neither accumulable
    /// nor hierarchical. Examples include jsonb_agg.
    Basic,
}

impl columnation::Columnation for ReductionType {
    type InnerRegion = columnation::CopyRegion<ReductionType>;
}

impl RustType<ProtoReductionType> for ReductionType {
    fn into_proto(&self) -> ProtoReductionType {
        use proto_reduction_type::Kind;
        ProtoReductionType {
            kind: Some(match self {
                ReductionType::Accumulable => Kind::Accumulable(()),
                ReductionType::Hierarchical => Kind::Hierarchical(()),
                ReductionType::Basic => Kind::Basic(()),
            }),
        }
    }

    fn from_proto(proto: ProtoReductionType) -> Result<Self, TryFromProtoError> {
        use proto_reduction_type::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("kind"))?;
        Ok(match kind {
            Kind::Accumulable(()) => ReductionType::Accumulable,
            Kind::Hierarchical(()) => ReductionType::Hierarchical,
            Kind::Basic(()) => ReductionType::Basic,
        })
    }
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct` and otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
    /// Plan for computing a mix of different kinds of aggregations.
    /// We need to do extra work here to reassemble results back in the
    /// requested order.
    Collation(CollationPlan),
}

proptest::prop_compose! {
    /// `expected_group_size` is a u64, but instead of a uniform distribution,
    /// we want a logarithmic distribution so that we have an even distribution
    /// in the number of layers of buckets that a hierarchical plan would have.
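    ///
    /// For instance (illustrative values only), a draw of `bits = 3` restricts
    /// `integer` to roughly `7..15`, so each power-of-two magnitude ends up about
    /// equally likely.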
    fn any_group_size()
        (bits in 0..u64::BITS)
        (integer in (((1_u64) << bits) - 1)
            ..(if bits == (u64::BITS - 1){ u64::MAX }
                else { ((1_u64) << (bits + 1)) - 1 }))
    -> u64 {
        integer
    }
}

/// To avoid stack overflow, this limits the arbitrarily-generated test
/// `ReducePlan`s to involve at most 8 aggregations.
///
/// To have better coverage of realistic expected group sizes, the
/// `expected_group_size` has a logarithmic distribution.
impl Arbitrary for ReducePlan {
    type Parameters = ();

    type Strategy = BoxedStrategy<Self>;

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            proptest::collection::vec(any::<AggregateExpr>(), 0..8),
            any::<bool>(),
            any::<bool>(),
            any_group_size(),
            any::<bool>(),
        )
            .prop_map(
                |(
                    exprs,
                    monotonic,
                    any_expected_size,
                    expected_group_size,
                    mut fused_unnest_list,
                )| {
                    let expected_group_size = if any_expected_size {
                        Some(expected_group_size)
                    } else {
                        None
                    };
                    if !(exprs.len() == 1
                        && matches!(reduction_type(&exprs[0].func), ReductionType::Basic))
                    {
                        fused_unnest_list = false;
                    }
                    ReducePlan::create_from(
                        exprs,
                        monotonic,
                        expected_group_size,
                        fused_unnest_list,
                    )
                },
            )
            .boxed()
    }
}

impl RustType<ProtoReducePlan> for ReducePlan {
    fn into_proto(&self) -> ProtoReducePlan {
        use proto_reduce_plan::Kind::*;
        ProtoReducePlan {
            kind: Some(match self {
                ReducePlan::Distinct => Distinct(()),
                ReducePlan::Accumulable(plan) => Accumulable(plan.into_proto()),
                ReducePlan::Hierarchical(plan) => Hierarchical(plan.into_proto()),
                ReducePlan::Basic(plan) => Basic(plan.into_proto()),
                ReducePlan::Collation(plan) => Collation(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoReducePlan) -> Result<Self, TryFromProtoError> {
        use proto_reduce_plan::Kind::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoReducePlan::kind"))?;
        Ok(match kind {
            Distinct(()) => ReducePlan::Distinct,
            Accumulable(plan) => ReducePlan::Accumulable(plan.into_rust()?),
            Hierarchical(plan) => ReducePlan::Hierarchical(plan.into_rust()?),
            Basic(plan) => ReducePlan::Basic(plan.into_rust()?),
            Collation(plan) => ReducePlan::Collation(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values, and apply a
/// distinct operator to those before we combine them with
/// everything else.
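///
/// For example (illustrative only; `sum_aggr` and `count_distinct_aggr` are hypothetical
/// `AggregateExpr`s), planning `SUM(#0)` and `COUNT(DISTINCT #1)` yields roughly:
///
/// ```ignore
/// AccumulablePlan {
///     full_aggrs: vec![sum_aggr, count_distinct_aggr],
///     simple_aggrs: vec![(0, 0, sum_aggr)],
///     distinct_aggrs: vec![(1, 1, count_distinct_aggr)],
/// }
/// ```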
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the aggregation among accumulable aggregations,
    ///  index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}

impl RustType<proto_accumulable_plan::ProtoAggr> for (usize, usize, AggregateExpr) {
    fn into_proto(&self) -> proto_accumulable_plan::ProtoAggr {
        proto_accumulable_plan::ProtoAggr {
            index_agg: self.0.into_proto(),
            index_inp: self.1.into_proto(),
            expr: Some(self.2.into_proto()),
        }
    }

    fn from_proto(proto: proto_accumulable_plan::ProtoAggr) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index_agg.into_rust()?,
            proto.index_inp.into_rust()?,
            proto.expr.into_rust_if_some("ProtoAggr::expr")?,
        ))
    }
}

impl RustType<ProtoAccumulablePlan> for AccumulablePlan {
    fn into_proto(&self) -> ProtoAccumulablePlan {
        ProtoAccumulablePlan {
            full_aggrs: self.full_aggrs.into_proto(),
            simple_aggrs: self.simple_aggrs.into_proto(),
            distinct_aggrs: self.distinct_aggrs.into_proto(),
        }
    }

    fn from_proto(proto: ProtoAccumulablePlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            full_aggrs: proto.full_aggrs.into_rust()?,
            simple_aggrs: proto.simple_aggrs.into_rust()?,
            distinct_aggrs: proto.distinct_aggrs.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan for hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Returns the aggregation functions computed by this plan.
    pub fn aggr_funcs(&self) -> &[AggregateFunc] {
        match self {
            HierarchicalPlan::Monotonic(plan) => &plan.aggr_funcs,
            HierarchicalPlan::Bucketed(plan) => &plan.aggr_funcs,
        }
    }

    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()` but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

impl RustType<ProtoHierarchicalPlan> for HierarchicalPlan {
    fn into_proto(&self) -> ProtoHierarchicalPlan {
        use proto_hierarchical_plan::Kind;
        ProtoHierarchicalPlan {
            kind: Some(match self {
                HierarchicalPlan::Monotonic(plan) => Kind::Monotonic(plan.into_proto()),
                HierarchicalPlan::Bucketed(plan) => Kind::Bucketed(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoHierarchicalPlan) -> Result<Self, TryFromProtoError> {
        use proto_hierarchical_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoHierarchicalPlan::Kind"))?;
        Ok(match kind {
            Kind::Monotonic(plan) => HierarchicalPlan::Monotonic(plan.into_rust()?),
            Kind::Bucketed(plan) => HierarchicalPlan::Bucketed(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append-only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}

impl RustType<ProtoMonotonicPlan> for MonotonicPlan {
    fn into_proto(&self) -> ProtoMonotonicPlan {
        ProtoMonotonicPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            must_consolidate: self.must_consolidate.into_proto(),
        }
    }

    fn from_proto(proto: ProtoMonotonicPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            must_consolidate: proto.must_consolidate.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
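///
/// For example (the bucket counts here are illustrative, not the defaults), with
/// `buckets = [4096, 256, 16]` updates are first reduced within their
/// `hashed() % 4096` bucket, those partial results are reduced again within
/// `hashed() % 256` buckets, then `% 16`, and a final reduce per key combines the
/// survivors.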
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally each a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}

impl BucketedPlan {
    /// Convert to a monotonic plan, indicating whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            skips: self.skips,
            must_consolidate,
        }
    }
}
457
458impl RustType<ProtoBucketedPlan> for BucketedPlan {
459    fn into_proto(&self) -> ProtoBucketedPlan {
460        ProtoBucketedPlan {
461            aggr_funcs: self.aggr_funcs.into_proto(),
462            skips: self.skips.into_proto(),
463            buckets: self.buckets.clone(),
464        }
465    }
466
467    fn from_proto(proto: ProtoBucketedPlan) -> Result<Self, TryFromProtoError> {
468        Ok(Self {
469            aggr_funcs: proto.aggr_funcs.into_rust()?,
470            skips: proto.skips.into_rust()?,
471            buckets: proto.buckets,
472        })
473    }
474}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple reduce aggregates
/// to combine and present results in the appropriate order. If we
/// are only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These need to then be collated together in an additional
    /// reduction. Each element represents:
    /// `(index in the set of inputs that we are aggregating over,
    ///   the aggregation function)`
    Multiple(Vec<(usize, AggregateExpr)>),
}

/// Plan for rendering a single basic aggregation, possibly with a `FlatMap UnnestList`
/// fused into it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The index in the set of inputs that we are aggregating over.
    pub index: usize,
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

impl RustType<proto_basic_plan::ProtoSimpleSingleBasicPlan> for (usize, AggregateExpr) {
    fn into_proto(&self) -> proto_basic_plan::ProtoSimpleSingleBasicPlan {
        proto_basic_plan::ProtoSimpleSingleBasicPlan {
            index: self.0.into_proto(),
            expr: Some(self.1.into_proto()),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSimpleSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index.into_rust()?,
            proto
                .expr
                .into_rust_if_some("ProtoSimpleSingleBasicPlan::expr")?,
        ))
    }
}

impl RustType<proto_basic_plan::ProtoSingleBasicPlan> for SingleBasicPlan {
    fn into_proto(&self) -> proto_basic_plan::ProtoSingleBasicPlan {
        proto_basic_plan::ProtoSingleBasicPlan {
            index: self.index.into_proto(),
            expr: Some(self.expr.into_proto()),
            fused_unnest_list: self.fused_unnest_list.into_proto(),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok(SingleBasicPlan {
            index: proto.index.into_rust()?,
            expr: proto.expr.into_rust_if_some("ProtoSingleBasicPlan::expr")?,
            fused_unnest_list: proto.fused_unnest_list.into_rust()?,
        })
    }
}

impl RustType<ProtoBasicPlan> for BasicPlan {
    fn into_proto(&self) -> ProtoBasicPlan {
        use proto_basic_plan::*;

        ProtoBasicPlan {
            kind: Some(match self {
                BasicPlan::Single(plan) => Kind::Single(plan.into_proto()),
                BasicPlan::Multiple(aggrs) => Kind::Multiple(ProtoMultipleBasicPlan {
                    aggrs: aggrs.into_proto(),
                }),
            }),
        }
    }

    fn from_proto(proto: ProtoBasicPlan) -> Result<Self, TryFromProtoError> {
        use proto_basic_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoBasicPlan::kind"))?;

        Ok(match kind {
            Kind::Single(plan) => BasicPlan::Single(plan.into_rust()?),
            Kind::Multiple(x) => BasicPlan::Multiple(x.aggrs.into_rust()?),
        })
    }
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join
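///
/// For example (illustrative), collating `SUM(#0)`, `MIN(#1)`, and `COUNT(*)` uses
/// `aggregate_types = [Accumulable, Hierarchical, Accumulable]` to interleave the
/// accumulable and hierarchical outputs back into the requested order.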
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence of aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
    pub aggregate_types: Vec<ReductionType>,
}

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        self.hierarchical
            .as_mut()
            .map(|plan| plan.as_monotonic(must_consolidate));
    }
}

impl RustType<ProtoCollationPlan> for CollationPlan {
    fn into_proto(&self) -> ProtoCollationPlan {
        ProtoCollationPlan {
            accumulable: self.accumulable.into_proto(),
            hierarchical: self.hierarchical.into_proto(),
            basic: self.basic.into_proto(),
            aggregate_types: self.aggregate_types.into_proto(),
        }
    }

    fn from_proto(proto: ProtoCollationPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            accumulable: proto.accumulable.into_rust()?,
            hierarchical: proto.hierarchical.into_rust()?,
            basic: proto.basic.into_rust()?,
            aggregate_types: proto.aggregate_types.into_rust()?,
        })
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes what dataflow is to be created
    /// and how the aggregations will be executed.
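    ///
    /// # Example
    ///
    /// ```
    /// use mz_compute_types::plan::reduce::ReducePlan;
    ///
    /// // With no aggregations, the plan is just a distinct over the group key.
    /// let plan = ReducePlan::create_from(vec![], false, None, false);
    /// assert_eq!(plan, ReducePlan::Distinct);
    /// ```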
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // If we don't have any aggregations we are just computing a distinct.
        if aggregates.is_empty() {
            return ReducePlan::Distinct;
        }

        // Otherwise, we need to group aggregations according to their
        // reduction type (accumulable, hierarchical, or basic).
        let mut reduction_types = BTreeMap::new();
        // We need to make sure that each list of aggregates by type forms
        // a subsequence of the overall sequence of aggregates.
        for index in 0..aggregates.len() {
            let typ = reduction_type(&aggregates[index].func);
            let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
            aggregates_list.push((index, aggregates[index].clone()));
        }

        // Convert each grouped list of reductions into a plan.
        let plan: Vec<_> = reduction_types
            .into_iter()
            .map(|(typ, aggregates_list)| {
                ReducePlan::create_inner(
                    typ,
                    aggregates_list,
                    monotonic,
                    expected_group_size,
                    fused_unnest_list,
                )
            })
            .collect();

        // If we only have a single type of aggregation present, we can
        // render that directly.
        if plan.len() == 1 {
            return plan[0].clone();
        }

        // Otherwise, we have to stitch reductions together.

        // First, let's sanity check that we don't have an impossible number
        // of reduction types.
        assert!(plan.len() <= 3);

        let mut collation: CollationPlan = Default::default();

        // Construct a mapping from output_position -> reduction that we can
        // use to reconstruct the output in the correct order.
        let aggregate_types = aggregates
            .iter()
            .map(|a| reduction_type(&a.func))
            .collect::<Vec<_>>();

        collation.aggregate_types = aggregate_types;

        for expr in plan.into_iter() {
            match expr {
                ReducePlan::Accumulable(e) => {
                    assert_none!(collation.accumulable);
                    collation.accumulable = Some(e);
                }
                ReducePlan::Hierarchical(e) => {
                    assert_none!(collation.hierarchical);
                    collation.hierarchical = Some(e);
                }
                ReducePlan::Basic(e) => {
                    assert_none!(collation.basic);
                    collation.basic = Some(e);
                }
                ReducePlan::Distinct | ReducePlan::Collation(_) => {
                    panic!("Inner reduce plan was unsupported type!")
                }
            }
        }

        ReducePlan::Collation(collation)
    }

    /// Generate a plan for computing the specified type of aggregations.
    ///
    /// This function assumes that all of the supplied aggregates are
    /// actually of the correct reduction type, and are a subsequence
    /// of the total list of requested aggregations.
    fn create_inner(
        typ: ReductionType,
        aggregates_list: Vec<(usize, AggregateExpr)>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        if fused_unnest_list {
            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
        }
        assert!(
            aggregates_list.len() > 0,
            "error: tried to render a reduce dataflow with no aggregates"
        );
        match typ {
            ReductionType::Accumulable => {
                let mut simple_aggrs = vec![];
                let mut distinct_aggrs = vec![];
                let full_aggrs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr)
                    .collect();
                for (accumulable_index, (datum_index, aggr)) in
                    aggregates_list.into_iter().enumerate()
                {
                    // Accumulable aggregations need to do extra per-aggregate work
                    // for aggregations with the distinct bit set, so we'll separate
                    // those out now.
                    if aggr.distinct {
                        distinct_aggrs.push((accumulable_index, datum_index, aggr));
                    } else {
                        simple_aggrs.push((accumulable_index, datum_index, aggr));
                    };
                }
                ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs,
                    simple_aggrs,
                    distinct_aggrs,
                })
            }
            ReductionType::Hierarchical => {
                let aggr_funcs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr.func)
                    .collect();
                let indexes: Vec<_> = aggregates_list
                    .into_iter()
                    .map(|(index, _)| index)
                    .collect();

                // We don't have random access over Rows so we can simplify the
                // task of grabbing the inputs we are aggregating over by
                // generating a list of "skips" an iterator over the Row needs
                // to do to get the desired indexes.
                let skips = convert_indexes_to_skips(indexes);
                if monotonic {
                    let monotonic = MonotonicPlan {
                        aggr_funcs,
                        skips,
                        must_consolidate: false,
                    };
                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
                } else {
                    let buckets = bucketing_of_expected_group_size(expected_group_size);
                    let bucketed = BucketedPlan {
                        aggr_funcs,
                        skips,
                        buckets,
                    };

                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
                }
            }
            ReductionType::Basic => {
                if aggregates_list.len() == 1 {
                    ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                        index: aggregates_list[0].0,
                        expr: aggregates_list[0].1.clone(),
                        fused_unnest_list,
                    }))
                } else {
                    ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
                }
            }
        }
    }

    /// Reports all keys of produced arrangements.
    ///
    /// This is likely either an empty vector, for no arrangement,
    /// or a singleton vector containing the list of expressions
    /// that key a single arrangement.
    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
        let key = (0..key_arity)
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        AvailableCollections::new_arranged(vec![(key, permutation, thinning)])
    }

    /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
    /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
    /// one sitting on top of the reduction.
    ///
    /// Non-fusable parts include temporal predicates or any other parts that cannot be
    /// conservatively asserted to not increase the memory requirements of the output
    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
    /// being the identity MFP.
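    ///
    /// A rough sketch of the intent (not run as a doctest; `plan` is a hypothetical
    /// `ReducePlan`): an MFP that only projects a key-prefixed subset of the reduction's
    /// output columns is fully fusable, leaving an identity residual.
    ///
    /// ```ignore
    /// // Key arity 1, reduce output arity 3, projecting columns [0, 2].
    /// let mfp = MapFilterProject::new(3).project(vec![0, 2]);
    /// let (fused, residual, output_arity) = plan.extract_mfp_after(mfp, 1);
    /// assert_eq!(output_arity, 2);
    /// ```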
    pub fn extract_mfp_after(
        &self,
        mut mfp: MapFilterProject,
        key_arity: usize,
    ) -> (MapFilterProject, MapFilterProject, usize) {
        // Extract temporal predicates, as we cannot push them into `Reduce`.
        let temporal_mfp = mfp.extract_temporal();
        let non_temporal = mfp;
        mfp = temporal_mfp;

        // We ensure we do not attempt to project away the key, as we cannot accomplish
        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
        // check if it can be directly absorbed; if it can't, we then default to a general
        // strategy that unpacks the MFP to absorb only the filter and supporting map
        // parts, followed by a post-MFP step.
        let input_arity = non_temporal.input_arity;
        let key = Vec::from_iter(0..key_arity);
        let mut mfp_push;
        let output_arity;

        if non_temporal.projection.len() <= input_arity
            && non_temporal.projection.iter().all(|c| *c < input_arity)
            && non_temporal.projection.starts_with(&key)
        {
            // Special case: The key is preserved as a prefix and the projection is only
            // of output fields from the reduction. So we know that: (a) We can process the
            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
            // and (c) The output projection is at most as wide as the output that would be
            // produced by the reduction, so we are sure to never regress the memory
            // requirements of the output arrangement.
            // Note that this strategy may change the arity of the output arrangement.
            output_arity = non_temporal.projection.len();
            mfp_push = non_temporal;
        } else {
            // General strategy: Unpack MFP as MF followed by P' that removes all M
            // columns, then MP afterwards.
            // Note that this strategy does not result in any changes to the arity of
            // the output arrangement.
            let (m, f, p) = non_temporal.into_map_filter_project();
            mfp_push = MapFilterProject::new(input_arity)
                .map(m.clone())
                .filter(f)
                .project(0..input_arity);
            output_arity = input_arity;

            // We still need to perform the map and projection for the actual output.
            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);

            // Compose the non-pushed MFP components.
            mfp = MapFilterProject::compose(mfp_left, mfp);
        }
        mfp_push.optimize();
        mfp.optimize();
        (mfp_push, mfp, output_arity)
    }
}

/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
    /// Extracts the columns used as the key.
    pub key_plan: mz_expr::SafeMfpPlan,
    /// Extracts the columns used to feed the aggregations.
    pub val_plan: mz_expr::SafeMfpPlan,
}

impl RustType<ProtoKeyValPlan> for KeyValPlan {
    fn into_proto(&self) -> ProtoKeyValPlan {
        ProtoKeyValPlan {
            key_plan: Some(self.key_plan.into_proto()),
            val_plan: Some(self.val_plan.into_proto()),
        }
    }

    fn from_proto(proto: ProtoKeyValPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            key_plan: proto
                .key_plan
                .into_rust_if_some("ProtoKeyValPlan::key_plan")?,
            val_plan: proto
                .val_plan
                .into_rust_if_some("ProtoKeyValPlan::val_plan")?,
        })
    }
}

impl KeyValPlan {
    /// Create a new [KeyValPlan] from aggregation arguments.
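    ///
    /// As an illustrative sketch (not run as a doctest; assumes a two-column input and a
    /// hypothetical `AggregateExpr` named `count_aggr` over column 1), grouping by column 0
    /// yields a key plan of arity 1 and a value plan projecting each aggregate's input:
    ///
    /// ```ignore
    /// let key_val = KeyValPlan::new(2, &[MirScalarExpr::column(0)], &[count_aggr], None);
    /// assert_eq!(key_val.key_arity(), 1);
    /// ```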
    pub fn new(
        input_arity: usize,
        group_key: &[MirScalarExpr],
        aggregates: &[AggregateExpr],
        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
    ) -> Self {
        // Form an operator for evaluating key expressions.
        let mut key_mfp = MapFilterProject::new(input_arity)
            .map(group_key.iter().cloned())
            .project(input_arity..(input_arity + group_key.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        // Form an operator for evaluating value expressions.
        let mut val_mfp = MapFilterProject::new(input_arity)
            .map(aggregates.iter().map(|a| a.expr.clone()))
            .project(input_arity..(input_arity + aggregates.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        key_mfp.optimize();
        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
        val_mfp.optimize();
        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();

        Self { key_plan, val_plan }
    }

    /// The arity of the key plan.
    pub fn key_arity(&self) -> usize {
        self.key_plan.projection.len()
    }
}

/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
    for i in 1..indexes.len() {
        soft_assert_or_log!(
            indexes[i - 1] < indexes[i],
            "convert_indexes_to_skips needs indexes to be strictly increasing. Received: {:?}",
            indexes,
        );
    }

    for i in (1..indexes.len()).rev() {
        indexes[i] -= indexes[i - 1];
        indexes[i] -= 1;
    }

    indexes
}

/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
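///
/// For example, `count` is accumulable while `max` is hierarchical:
///
/// ```
/// use mz_compute_types::plan::reduce::{reduction_type, ReductionType};
/// use mz_expr::AggregateFunc;
///
/// assert_eq!(reduction_type(&AggregateFunc::Count), ReductionType::Accumulable);
/// assert_eq!(reduction_type(&AggregateFunc::MaxInt64), ReductionType::Hierarchical);
/// ```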
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
    match func {
        AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::Any
        | AggregateFunc::All
        | AggregateFunc::Dummy => ReductionType::Accumulable,
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinInterval
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinTime => ReductionType::Hierarchical,
        AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::ArrayConcat { .. }
        | AggregateFunc::ListConcat { .. }
        | AggregateFunc::StringAgg { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. }
        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    // This test causes stack overflows if not run with --release,
    // ignore by default.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn reduce_plan_protobuf_roundtrip(expect in any::<ReducePlan>() ) {
            let actual = protobuf_roundtrip::<_, ProtoReducePlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}