mz_compute_types/plan/reduce.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.

//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent here is that creating a `ReducePlan` should capture
//! all of the decision making about what kind of dataflow we need to render and what each
//! operator needs to do, so that actually rendering the plan can be a relatively simple
//! application of (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction, and when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of commutative Abelian groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//!    they are still commutative and associative, which lets us compute the reduction over subsets
//!    of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
//!    reductions this way, we can maintain the computation in sublinear time with respect to
//!    the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//!    hierarchical reductions correspond to instances of semigroups, in that they are associative,
//!    but in order to benefit from being computed hierarchically, they need to have some reduction
//!    in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//!    hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//!    hierarchically can instead be computed in-place, because we only need to keep the value that's
//!    better than the "best" (minimal or maximal for min and max) seen so far.
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//!    hierarchical (most likely they are associative but don't involve any data reduction) and so for these
//!    we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly and avoid the extra collation arrangement.

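As a standalone illustration of the hierarchical strategy described above (a sketch, not code from this module), reducing `min` within buckets and then reducing the bucket results gives the same answer as reducing the whole input at once, because `min` is associative and commutative:

```rust
// Sketch: hierarchical evaluation of `min` in two layers. Bucketing here is by
// index for simplicity; the real dataflow distributes values by hash.
fn hierarchical_min(input: &[u64], buckets: usize) -> Option<u64> {
    if input.is_empty() {
        return None;
    }
    // Layer 1: reduce each bucket independently.
    let mut layer: Vec<Option<u64>> = vec![None; buckets];
    for (i, &v) in input.iter().enumerate() {
        let slot = &mut layer[i % buckets];
        *slot = Some(slot.map_or(v, |cur| cur.min(v)));
    }
    // Layer 2: reduce the per-bucket results.
    layer.into_iter().flatten().min()
}
```

Because each layer only re-reduces the buckets an update touched, a change to one input value never forces re-reducing the whole collection.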
use std::collections::BTreeMap;

use mz_expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
};
use mz_ore::{assert_none, soft_assert_or_log};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use proptest::prelude::{Arbitrary, BoxedStrategy, any};
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};

include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.reduce.rs"));

/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
    /// Accumulable functions can be subtracted from (are invertible), and are associative.
    /// We can compute these results by moving some data to the diff field under arbitrary
    /// changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic, for lack of a better word, are functions that are neither accumulable
    /// nor hierarchical. Examples include jsonb_agg.
    Basic,
}

impl columnation::Columnation for ReductionType {
    type InnerRegion = columnation::CopyRegion<ReductionType>;
}

impl RustType<ProtoReductionType> for ReductionType {
    fn into_proto(&self) -> ProtoReductionType {
        use proto_reduction_type::Kind;
        ProtoReductionType {
            kind: Some(match self {
                ReductionType::Accumulable => Kind::Accumulable(()),
                ReductionType::Hierarchical => Kind::Hierarchical(()),
                ReductionType::Basic => Kind::Basic(()),
            }),
        }
    }

    fn from_proto(proto: ProtoReductionType) -> Result<Self, TryFromProtoError> {
        use proto_reduction_type::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("kind"))?;
        Ok(match kind {
            Kind::Accumulable(()) => ReductionType::Accumulable,
            Kind::Hierarchical(()) => ReductionType::Hierarchical,
            Kind::Basic(()) => ReductionType::Basic,
        })
    }
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct`, and otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
    /// Plan for computing a mix of different kinds of aggregations.
    /// We need to do extra work here to reassemble results back in the
    /// requested order.
    Collation(CollationPlan),
}

proptest::prop_compose! {
    /// `expected_group_size` is a u64, but instead of a uniform distribution,
    /// we want a logarithmic distribution so that we have an even distribution
    /// in the number of layers of buckets that a hierarchical plan would have.
    fn any_group_size()
        (bits in 0..u64::BITS)
        (integer in (((1_u64) << bits) - 1)
            ..(if bits == (u64::BITS - 1) { u64::MAX }
                else { ((1_u64) << (bits + 1)) - 1 }))
    -> u64 {
        integer
    }
}

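To make the two-stage strategy concrete (a standalone sketch, not this module's API): for a uniformly chosen bit width `bits`, the second stage draws uniformly from a range covering values of roughly that magnitude, so each possible bucket-layer count is about equally likely:

```rust
// Sketch: the half-open range the second stage samples from for a given bit
// width. The top width saturates at u64::MAX to avoid shift overflow.
fn group_size_range(bits: u32) -> (u64, u64) {
    let lo = (1u64 << bits) - 1;
    let hi = if bits == u64::BITS - 1 {
        u64::MAX
    } else {
        (1u64 << (bits + 1)) - 1
    };
    (lo, hi) // sampled as lo..hi
}
```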
/// To avoid stack overflow, this limits the arbitrarily-generated test
/// `ReducePlan`s to involve at most 8 aggregations.
///
/// To have better coverage of realistic expected group sizes, the
/// `expected group size` has a logarithmic distribution.
impl Arbitrary for ReducePlan {
    type Parameters = ();

    type Strategy = BoxedStrategy<Self>;

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            proptest::collection::vec(any::<AggregateExpr>(), 0..8),
            any::<bool>(),
            any::<bool>(),
            any_group_size(),
            any::<bool>(),
        )
            .prop_map(
                |(
                    exprs,
                    monotonic,
                    any_expected_size,
                    expected_group_size,
                    mut fused_unnest_list,
                )| {
                    let expected_group_size = if any_expected_size {
                        Some(expected_group_size)
                    } else {
                        None
                    };
                    if !(exprs.len() == 1
                        && matches!(reduction_type(&exprs[0].func), ReductionType::Basic))
                    {
                        fused_unnest_list = false;
                    }
                    ReducePlan::create_from(
                        exprs,
                        monotonic,
                        expected_group_size,
                        fused_unnest_list,
                    )
                },
            )
            .boxed()
    }
}

impl RustType<ProtoReducePlan> for ReducePlan {
    fn into_proto(&self) -> ProtoReducePlan {
        use proto_reduce_plan::Kind::*;
        ProtoReducePlan {
            kind: Some(match self {
                ReducePlan::Distinct => Distinct(()),
                ReducePlan::Accumulable(plan) => Accumulable(plan.into_proto()),
                ReducePlan::Hierarchical(plan) => Hierarchical(plan.into_proto()),
                ReducePlan::Basic(plan) => Basic(plan.into_proto()),
                ReducePlan::Collation(plan) => Collation(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoReducePlan) -> Result<Self, TryFromProtoError> {
        use proto_reduce_plan::Kind::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoReducePlan::kind"))?;
        Ok(match kind {
            Distinct(()) => ReducePlan::Distinct,
            Accumulable(plan) => ReducePlan::Accumulable(plan.into_rust()?),
            Hierarchical(plan) => ReducePlan::Hierarchical(plan.into_rust()?),
            Basic(plan) => ReducePlan::Basic(plan.into_rust()?),
            Collation(plan) => ReducePlan::Collation(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values, and to
/// apply a distinct operator to those before we
/// combine them with everything else.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the aggregation among accumulable aggregations,
    ///  index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}

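The "move data into the diff field" idea behind accumulable plans can be sketched outside the dataflow machinery (a standalone, hypothetical helper, not this module's API): a sum over a changing collection is just the sum of `value * diff` across all updates, so retractions need no special handling:

```rust
// Sketch: maintaining `sum` from a stream of (value, diff) updates, where
// diff = +1 inserts a value and diff = -1 retracts it. The running total can be
// updated in O(1) per change, which is what makes this class "accumulable".
fn maintain_sum(updates: &[(i64, i64)]) -> i64 {
    updates.iter().map(|(value, diff)| value * diff).sum()
}
```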
impl RustType<proto_accumulable_plan::ProtoAggr> for (usize, usize, AggregateExpr) {
    fn into_proto(&self) -> proto_accumulable_plan::ProtoAggr {
        proto_accumulable_plan::ProtoAggr {
            index_agg: self.0.into_proto(),
            index_inp: self.1.into_proto(),
            expr: Some(self.2.into_proto()),
        }
    }

    fn from_proto(proto: proto_accumulable_plan::ProtoAggr) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index_agg.into_rust()?,
            proto.index_inp.into_rust()?,
            proto.expr.into_rust_if_some("ProtoAggr::expr")?,
        ))
    }
}

impl RustType<ProtoAccumulablePlan> for AccumulablePlan {
    fn into_proto(&self) -> ProtoAccumulablePlan {
        ProtoAccumulablePlan {
            full_aggrs: self.full_aggrs.into_proto(),
            simple_aggrs: self.simple_aggrs.into_proto(),
            distinct_aggrs: self.distinct_aggrs.into_proto(),
        }
    }

    fn from_proto(proto: ProtoAccumulablePlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            full_aggrs: proto.full_aggrs.into_rust()?,
            simple_aggrs: proto.simple_aggrs.into_rust()?,
            distinct_aggrs: proto.distinct_aggrs.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan for hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()` but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

impl RustType<ProtoHierarchicalPlan> for HierarchicalPlan {
    fn into_proto(&self) -> ProtoHierarchicalPlan {
        use proto_hierarchical_plan::Kind;
        ProtoHierarchicalPlan {
            kind: Some(match self {
                HierarchicalPlan::Monotonic(plan) => Kind::Monotonic(plan.into_proto()),
                HierarchicalPlan::Bucketed(plan) => Kind::Bucketed(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoHierarchicalPlan) -> Result<Self, TryFromProtoError> {
        use proto_hierarchical_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoHierarchicalPlan::kind"))?;
        Ok(match kind {
            Kind::Monotonic(plan) => HierarchicalPlan::Monotonic(plan.into_rust()?),
            Kind::Bucketed(plan) => HierarchicalPlan::Bucketed(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}

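The in-place strategy this plan enables can be sketched as a plain fold (standalone sketch, not this module's API): with an append-only input there are no retractions, so only the best value seen so far needs to be retained:

```rust
// Sketch: monotonic `min` keeps a single running best instead of a reduction
// tree, giving constant state per group.
fn monotonic_min(stream: impl IntoIterator<Item = u64>) -> Option<u64> {
    stream
        .into_iter()
        .fold(None, |best, v| Some(best.map_or(v, |b: u64| b.min(v))))
}
```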
impl RustType<ProtoMonotonicPlan> for MonotonicPlan {
    fn into_proto(&self) -> ProtoMonotonicPlan {
        ProtoMonotonicPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            must_consolidate: self.must_consolidate.into_proto(),
        }
    }

    fn from_proto(proto: ProtoMonotonicPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            must_consolidate: proto.must_consolidate.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally, a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}

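The reduction tree can be sketched as repeated bucketing (standalone sketch; the real code distributes by `value.hashed() % buckets[layer]`, here we bucket by the value itself for readability):

```rust
use std::collections::BTreeMap;

// Sketch: one reduce per layer, each keeping a single minimum per bucket.
// `buckets` should be decreasing, e.g. [16, 4], so layers get coarser.
fn bucketed_min(mut values: Vec<u64>, buckets: &[u64]) -> Option<u64> {
    for &n in buckets {
        let mut layer: BTreeMap<u64, u64> = BTreeMap::new();
        for v in values {
            let slot = layer.entry(v % n).or_insert(u64::MAX);
            *slot = (*slot).min(v);
        }
        values = layer.into_values().collect();
    }
    // Final step: a single reduction over the few remaining values.
    values.into_iter().min()
}
```

An update only disturbs one bucket in each layer, so the work to refresh the result is proportional to the number of layers rather than the input size.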
impl BucketedPlan {
    /// Convert to a monotonic plan, indicating whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            skips: self.skips,
            must_consolidate,
        }
    }
}

impl RustType<ProtoBucketedPlan> for BucketedPlan {
    fn into_proto(&self) -> ProtoBucketedPlan {
        ProtoBucketedPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            buckets: self.buckets.clone(),
        }
    }

    fn from_proto(proto: ProtoBucketedPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            buckets: proto.buckets,
        })
    }
}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple reduce aggregates
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These need to then be collated together in an additional
    /// reduction. Each element represents the:
    /// `(index of the set of the input we are aggregating over,
    ///   the aggregation function)`
    Multiple(Vec<(usize, AggregateExpr)>),
}

/// Plan for rendering a single basic aggregation, possibly with a
/// `FlatMap UnnestList` fused into it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The index in the set of inputs that we are aggregating over.
    pub index: usize,
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

impl RustType<proto_basic_plan::ProtoSimpleSingleBasicPlan> for (usize, AggregateExpr) {
    fn into_proto(&self) -> proto_basic_plan::ProtoSimpleSingleBasicPlan {
        proto_basic_plan::ProtoSimpleSingleBasicPlan {
            index: self.0.into_proto(),
            expr: Some(self.1.into_proto()),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSimpleSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index.into_rust()?,
            proto
                .expr
                .into_rust_if_some("ProtoSimpleSingleBasicPlan::expr")?,
        ))
    }
}

impl RustType<proto_basic_plan::ProtoSingleBasicPlan> for SingleBasicPlan {
    fn into_proto(&self) -> proto_basic_plan::ProtoSingleBasicPlan {
        proto_basic_plan::ProtoSingleBasicPlan {
            index: self.index.into_proto(),
            expr: Some(self.expr.into_proto()),
            fused_unnest_list: self.fused_unnest_list.into_proto(),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok(SingleBasicPlan {
            index: proto.index.into_rust()?,
            expr: proto.expr.into_rust_if_some("ProtoSingleBasicPlan::expr")?,
            fused_unnest_list: proto.fused_unnest_list.into_rust()?,
        })
    }
}

impl RustType<ProtoBasicPlan> for BasicPlan {
    fn into_proto(&self) -> ProtoBasicPlan {
        use proto_basic_plan::*;

        ProtoBasicPlan {
            kind: Some(match self {
                BasicPlan::Single(plan) => Kind::Single(plan.into_proto()),
                BasicPlan::Multiple(aggrs) => Kind::Multiple(ProtoMultipleBasicPlan {
                    aggrs: aggrs.into_proto(),
                }),
            }),
        }
    }

    fn from_proto(proto: ProtoBasicPlan) -> Result<Self, TryFromProtoError> {
        use proto_basic_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoBasicPlan::kind"))?;

        Ok(match kind {
            Kind::Single(plan) => BasicPlan::Single(plan.into_rust()?),
            Kind::Multiple(x) => BasicPlan::Multiple(x.aggrs.into_rust()?),
        })
    }
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence of aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
    pub aggregate_types: Vec<ReductionType>,
}

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        if let Some(plan) = self.hierarchical.as_mut() {
            plan.as_monotonic(must_consolidate);
        }
    }
}

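The role of `aggregate_types` can be sketched with hypothetical types (not this module's API): each per-type result list is a subsequence of the original order, so restoring the requested order is a single pass that pops from the matching list per output position:

```rust
// Sketch: collating per-type result subsequences back into request order.
#[derive(Copy, Clone)]
enum RType {
    Accumulable,
    Hierarchical,
    Basic,
}

fn collate(aggregate_types: &[RType], acc: Vec<i64>, hier: Vec<i64>, basic: Vec<i64>) -> Vec<i64> {
    let (mut acc, mut hier, mut basic) = (acc.into_iter(), hier.into_iter(), basic.into_iter());
    aggregate_types
        .iter()
        .map(|t| match t {
            // Pull the next result from whichever type produced this position.
            RType::Accumulable => acc.next().unwrap(),
            RType::Hierarchical => hier.next().unwrap(),
            RType::Basic => basic.next().unwrap(),
        })
        .collect()
}
```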
impl RustType<ProtoCollationPlan> for CollationPlan {
    fn into_proto(&self) -> ProtoCollationPlan {
        ProtoCollationPlan {
            accumulable: self.accumulable.into_proto(),
            hierarchical: self.hierarchical.into_proto(),
            basic: self.basic.into_proto(),
            aggregate_types: self.aggregate_types.into_proto(),
        }
    }

    fn from_proto(proto: ProtoCollationPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            accumulable: proto.accumulable.into_rust()?,
            hierarchical: proto.hierarchical.into_rust()?,
            basic: proto.basic.into_rust()?,
            aggregate_types: proto.aggregate_types.into_rust()?,
        })
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes the dataflow to be created
    /// and how the aggregations will be executed.
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // If we don't have any aggregations we are just computing a distinct.
        if aggregates.is_empty() {
            return ReducePlan::Distinct;
        }

        // Otherwise, we need to group aggregations according to their
        // reduction type (accumulable, hierarchical, or basic).
        let mut reduction_types = BTreeMap::new();
        // We need to make sure that each list of aggregates by type forms
        // a subsequence of the overall sequence of aggregates.
        for index in 0..aggregates.len() {
            let typ = reduction_type(&aggregates[index].func);
            let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
            aggregates_list.push((index, aggregates[index].clone()));
        }

        // Convert each grouped list of reductions into a plan.
        let plan: Vec<_> = reduction_types
            .into_iter()
            .map(|(typ, aggregates_list)| {
                ReducePlan::create_inner(
                    typ,
                    aggregates_list,
                    monotonic,
                    expected_group_size,
                    fused_unnest_list,
                )
            })
            .collect();

        // If we only have a single type of aggregation present we can
        // render that directly.
        if plan.len() == 1 {
            return plan[0].clone();
        }

        // Otherwise, we have to stitch reductions together.

        // First, let's sanity check that we don't have an impossible number
        // of reduction types.
        assert!(plan.len() <= 3);

        let mut collation: CollationPlan = Default::default();

        // Construct a mapping from output_position -> reduction that we can
        // use to reconstruct the output in the correct order.
        let aggregate_types = aggregates
            .iter()
            .map(|a| reduction_type(&a.func))
            .collect::<Vec<_>>();

        collation.aggregate_types = aggregate_types;

        for expr in plan.into_iter() {
            match expr {
                ReducePlan::Accumulable(e) => {
                    assert_none!(collation.accumulable);
                    collation.accumulable = Some(e);
                }
                ReducePlan::Hierarchical(e) => {
                    assert_none!(collation.hierarchical);
                    collation.hierarchical = Some(e);
                }
                ReducePlan::Basic(e) => {
                    assert_none!(collation.basic);
                    collation.basic = Some(e);
                }
                ReducePlan::Distinct | ReducePlan::Collation(_) => {
                    panic!("Inner reduce plan was unsupported type!")
                }
            }
        }

        ReducePlan::Collation(collation)
    }

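The grouping step in `create_from` can be sketched in isolation (standalone sketch with strings standing in for aggregate types): pairing each aggregate with its original index before grouping keeps every per-type list a subsequence of the requested order, which is what collation later relies on:

```rust
use std::collections::BTreeMap;

// Sketch: group positions by type while preserving relative order within each
// group, mirroring the index-tagging done in `create_from`.
fn group_by_type(types: &[&'static str]) -> BTreeMap<&'static str, Vec<usize>> {
    let mut groups: BTreeMap<&'static str, Vec<usize>> = BTreeMap::new();
    for (index, t) in types.iter().enumerate() {
        groups.entry(*t).or_default().push(index);
    }
    groups
}
```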
709    /// Generate a plan for computing the specified type of aggregations.
710    ///
711    /// This function assumes that all of the supplied aggregates are
712    /// actually of the correct reduction type, and are a subsequence
713    /// of the total list of requested aggregations.
714    fn create_inner(
715        typ: ReductionType,
716        aggregates_list: Vec<(usize, AggregateExpr)>,
717        monotonic: bool,
718        expected_group_size: Option<u64>,
719        fused_unnest_list: bool,
720    ) -> Self {
721        if fused_unnest_list {
722            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
723        }
724        assert!(
725            !aggregates_list.is_empty(),
726            "error: tried to render a reduce dataflow with no aggregates"
727        );
728        match typ {
729            ReductionType::Accumulable => {
730                let mut simple_aggrs = vec![];
731                let mut distinct_aggrs = vec![];
732                let full_aggrs: Vec<_> = aggregates_list
733                    .iter()
734                    .cloned()
735                    .map(|(_, aggr)| aggr)
736                    .collect();
737                for (accumulable_index, (datum_index, aggr)) in
738                    aggregates_list.into_iter().enumerate()
739                {
740                    // Accumulable aggregations need to do extra per-aggregate work
741                    // for aggregations with the distinct bit set, so we'll separate
742                    // those out now.
743                    if aggr.distinct {
744                        distinct_aggrs.push((accumulable_index, datum_index, aggr));
745                    } else {
746                        simple_aggrs.push((accumulable_index, datum_index, aggr));
747                    }
748                }
749                ReducePlan::Accumulable(AccumulablePlan {
750                    full_aggrs,
751                    simple_aggrs,
752                    distinct_aggrs,
753                })
754            }
755            ReductionType::Hierarchical => {
756                let aggr_funcs: Vec<_> = aggregates_list
757                    .iter()
758                    .cloned()
759                    .map(|(_, aggr)| aggr.func)
760                    .collect();
761                let indexes: Vec<_> = aggregates_list
762                    .into_iter()
763                    .map(|(index, _)| index)
764                    .collect();
765
766        // We don't have random access over Rows, so we simplify the task
767        // of grabbing the inputs we are aggregating over by generating a
768        // list of "skips" that an iterator over the Row needs to perform
769        // to reach the desired indexes.
770                let skips = convert_indexes_to_skips(indexes);
771                if monotonic {
772                    let monotonic = MonotonicPlan {
773                        aggr_funcs,
774                        skips,
775                        must_consolidate: false,
776                    };
777                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
778                } else {
779                    let buckets = bucketing_of_expected_group_size(expected_group_size);
780                    let bucketed = BucketedPlan {
781                        aggr_funcs,
782                        skips,
783                        buckets,
784                    };
785
786                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
787                }
788            }
789            ReductionType::Basic => {
790                if aggregates_list.len() == 1 {
791                    ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
792                        index: aggregates_list[0].0,
793                        expr: aggregates_list[0].1.clone(),
794                        fused_unnest_list,
795                    }))
796                } else {
797                    ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
798                }
799            }
800        }
801    }
802
803    /// Reports all keys of produced arrangements.
804    ///
805    /// This is likely either an empty vector, for no arrangement,
806    /// or a singleton vector containing the list of expressions
807    /// that key a single arrangement.
808    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
809        let key = (0..key_arity)
810            .map(MirScalarExpr::Column)
811            .collect::<Vec<_>>();
812        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
813        AvailableCollections::new_arranged(vec![(key, permutation, thinning)], None)
814    }
815
816    /// Extracts a fusable MFP for the reduction from the given `mfp`, along with a residual
817    /// non-fusable MFP and a potentially revised output arity. The provided `mfp` must be the
818    /// one sitting on top of the reduction.
819    ///
820    /// Non-fusable parts include temporal predicates or any other parts that cannot be
821    /// conservatively asserted to not increase the memory requirements of the output
822    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
823    /// being the identity MFP.
824    pub fn extract_mfp_after(
825        &self,
826        mut mfp: MapFilterProject,
827        key_arity: usize,
828    ) -> (MapFilterProject, MapFilterProject, usize) {
829        // Extract temporal predicates, as we cannot push them into `Reduce`.
830        let temporal_mfp = mfp.extract_temporal();
831        let non_temporal = mfp;
832        mfp = temporal_mfp;
833
834        // We ensure we do not attempt to project away the key, as we cannot accomplish
835        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
836        // check if it can be directly absorbed; if it can't, we then default to a general
837        // strategy that unpacks the MFP to absorb only the filter and supporting map
838        // parts, followed by a post-MFP step.
839        let input_arity = non_temporal.input_arity;
840        let key = Vec::from_iter(0..key_arity);
841        let mut mfp_push;
842        let output_arity;
843
844        if non_temporal.projection.len() <= input_arity
845            && non_temporal.projection.iter().all(|c| *c < input_arity)
846            && non_temporal.projection.starts_with(&key)
847        {
848            // Special case: The key is preserved as a prefix and the projection is only
849            // of output fields from the reduction. So we know that: (a) We can process the
850            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
851            // and (c) The output projection is at most as wide as the output that would be
852            // produced by the reduction, so we are sure to never regress the memory
853            // requirements of the output arrangement.
854            // Note that this strategy may change the arity of the output arrangement.
855            output_arity = non_temporal.projection.len();
856            mfp_push = non_temporal;
857        } else {
858            // General strategy: Unpack MFP as MF followed by P' that removes all M
859            // columns, then MP afterwards.
860            // Note that this strategy does not result in any changes to the arity of
861            // the output arrangement.
862            let (m, f, p) = non_temporal.into_map_filter_project();
863            mfp_push = MapFilterProject::new(input_arity)
864                .map(m.clone())
865                .filter(f)
866                .project(0..input_arity);
867            output_arity = input_arity;
868
869            // We still need to perform the map and projection for the actual output.
870            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);
871
872            // Compose the non-pushed MFP components.
873            mfp = MapFilterProject::compose(mfp_left, mfp);
874        }
875        mfp_push.optimize();
876        mfp.optimize();
877        (mfp_push, mfp, output_arity)
878    }
879}
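The special case in `extract_mfp_after` can be isolated as a small standalone predicate. This is an illustrative sketch under simplified assumptions (the helper name `projection_is_absorbable` is hypothetical, not part of this module): the projection must reference only reduction output columns and must keep the key columns as a prefix.

```rust
/// Simplified standalone version of the special-case check: the projection
/// must be no wider than the reduction output, reference only reduction
/// output columns (below `input_arity`), and keep the key as a prefix.
/// The name `projection_is_absorbable` is hypothetical.
fn projection_is_absorbable(projection: &[usize], input_arity: usize, key_arity: usize) -> bool {
    let key: Vec<usize> = (0..key_arity).collect();
    projection.len() <= input_arity
        && projection.iter().all(|c| *c < input_arity)
        && projection.starts_with(&key)
}

fn main() {
    // Key columns 0..2 kept as a prefix; only input columns referenced: absorbable.
    assert!(projection_is_absorbable(&[0, 1, 3], 4, 2));
    // Key not preserved as a prefix: falls back to the general strategy.
    assert!(!projection_is_absorbable(&[1, 0, 3], 4, 2));
    // References a mapped column (>= input_arity): falls back as well.
    assert!(!projection_is_absorbable(&[0, 1, 5], 4, 2));
}
```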
880
881/// Plan for extracting keys and values in preparation for a reduction.
882#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
883pub struct KeyValPlan {
884    /// Extracts the columns used as the key.
885    pub key_plan: mz_expr::SafeMfpPlan,
886    /// Extracts the columns used to feed the aggregations.
887    pub val_plan: mz_expr::SafeMfpPlan,
888}
889
890impl RustType<ProtoKeyValPlan> for KeyValPlan {
891    fn into_proto(&self) -> ProtoKeyValPlan {
892        ProtoKeyValPlan {
893            key_plan: Some(self.key_plan.into_proto()),
894            val_plan: Some(self.val_plan.into_proto()),
895        }
896    }
897
898    fn from_proto(proto: ProtoKeyValPlan) -> Result<Self, TryFromProtoError> {
899        Ok(Self {
900            key_plan: proto
901                .key_plan
902                .into_rust_if_some("ProtoKeyValPlan::key_plan")?,
903            val_plan: proto
904                .val_plan
905                .into_rust_if_some("ProtoKeyValPlan::val_plan")?,
906        })
907    }
908}
909
910impl KeyValPlan {
911    /// Create a new [KeyValPlan] from aggregation arguments.
912    pub fn new(
913        input_arity: usize,
914        group_key: &[MirScalarExpr],
915        aggregates: &[AggregateExpr],
916        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
917    ) -> Self {
918        // Form an operator for evaluating key expressions.
919        let mut key_mfp = MapFilterProject::new(input_arity)
920            .map(group_key.iter().cloned())
921            .project(input_arity..(input_arity + group_key.len()));
922        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
923            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
924        }
925
926        // Form an operator for evaluating value expressions.
927        let mut val_mfp = MapFilterProject::new(input_arity)
928            .map(aggregates.iter().map(|a| a.expr.clone()))
929            .project(input_arity..(input_arity + aggregates.len()));
930        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
931            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
932        }
933
934        key_mfp.optimize();
935        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
936        val_mfp.optimize();
937        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();
938
939        Self { key_plan, val_plan }
940    }
941
942    /// The arity of the key plan.
943    pub fn key_arity(&self) -> usize {
944        self.key_plan.projection.len()
945    }
946}
947
948/// Transforms a vector containing indexes of needed columns into one containing
949/// the "skips" an iterator over a Row would need to perform to see those values.
950///
951/// This function requires that all of the elements in `indexes` are strictly
952/// increasing.
953///
954/// # Examples
955///
956/// ```
957/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
958/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
959/// ```
960pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
961    for i in 1..indexes.len() {
962        soft_assert_or_log!(
963            indexes[i - 1] < indexes[i],
964            "convert_indexes_to_skips needs indexes to be strictly increasing. Received: {:?}",
965            indexes,
966        );
967    }
968
969    for i in (1..indexes.len()).rev() {
970        indexes[i] -= indexes[i - 1];
971        indexes[i] -= 1;
972    }
973
974    indexes
975}
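To see how the computed skips are consumed, here is a minimal standalone sketch using a plain slice iterator in place of a `Row` iterator (the helper name `apply_skips` is hypothetical): `Iterator::nth(n)` consumes `n + 1` elements and returns the last one, which is exactly a "skip then read".

```rust
/// Read the values at the original indexes by performing each "skip" before
/// taking the next element. Hypothetical helper, for illustration only.
fn apply_skips(row: &[i64], skips: &[usize]) -> Vec<i64> {
    let mut iter = row.iter().copied();
    skips
        .iter()
        .map(|&s| iter.nth(s).expect("index within row"))
        .collect()
}

fn main() {
    // Indexes [3, 6, 10, 15] become skips [3, 2, 3, 4], as in the doctest above.
    let row: Vec<i64> = (0..20).collect();
    assert_eq!(apply_skips(&row, &[3, 2, 3, 4]), vec![3, 6, 10, 15]);
}
```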
976
977/// Determines whether a function can be accumulated in an update's "difference" field,
978/// and whether it can be subjected to recursive (hierarchical) aggregation.
979///
980/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
981/// which can be accumulated in-place using the addition operation on the type. Aggregations
982/// that indicate they are accumulable will still need to provide an action that takes their
983/// data and introduces it as a difference, and the post-processing when the accumulated value
984/// is presented as data.
985///
986/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
987/// increasingly large subsets of each key. This has the intended property that no invocation
988/// is on a significantly large set of values (and so, no incremental update needs to reform
989/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
990/// input stream is append-only as then we only need to retain the "currently winning" value.
991/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
992pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
993    match func {
994        AggregateFunc::SumInt16
995        | AggregateFunc::SumInt32
996        | AggregateFunc::SumInt64
997        | AggregateFunc::SumUInt16
998        | AggregateFunc::SumUInt32
999        | AggregateFunc::SumUInt64
1000        | AggregateFunc::SumFloat32
1001        | AggregateFunc::SumFloat64
1002        | AggregateFunc::SumNumeric
1003        | AggregateFunc::Count
1004        | AggregateFunc::Any
1005        | AggregateFunc::All
1006        | AggregateFunc::Dummy => ReductionType::Accumulable,
1007        AggregateFunc::MaxNumeric
1008        | AggregateFunc::MaxInt16
1009        | AggregateFunc::MaxInt32
1010        | AggregateFunc::MaxInt64
1011        | AggregateFunc::MaxUInt16
1012        | AggregateFunc::MaxUInt32
1013        | AggregateFunc::MaxUInt64
1014        | AggregateFunc::MaxMzTimestamp
1015        | AggregateFunc::MaxFloat32
1016        | AggregateFunc::MaxFloat64
1017        | AggregateFunc::MaxBool
1018        | AggregateFunc::MaxString
1019        | AggregateFunc::MaxDate
1020        | AggregateFunc::MaxTimestamp
1021        | AggregateFunc::MaxTimestampTz
1022        | AggregateFunc::MaxInterval
1023        | AggregateFunc::MaxTime
1024        | AggregateFunc::MinNumeric
1025        | AggregateFunc::MinInt16
1026        | AggregateFunc::MinInt32
1027        | AggregateFunc::MinInt64
1028        | AggregateFunc::MinUInt16
1029        | AggregateFunc::MinUInt32
1030        | AggregateFunc::MinUInt64
1031        | AggregateFunc::MinMzTimestamp
1032        | AggregateFunc::MinInterval
1033        | AggregateFunc::MinFloat32
1034        | AggregateFunc::MinFloat64
1035        | AggregateFunc::MinBool
1036        | AggregateFunc::MinString
1037        | AggregateFunc::MinDate
1038        | AggregateFunc::MinTimestamp
1039        | AggregateFunc::MinTimestampTz
1040        | AggregateFunc::MinTime => ReductionType::Hierarchical,
1041        AggregateFunc::JsonbAgg { .. }
1042        | AggregateFunc::JsonbObjectAgg { .. }
1043        | AggregateFunc::MapAgg { .. }
1044        | AggregateFunc::ArrayConcat { .. }
1045        | AggregateFunc::ListConcat { .. }
1046        | AggregateFunc::StringAgg { .. }
1047        | AggregateFunc::RowNumber { .. }
1048        | AggregateFunc::Rank { .. }
1049        | AggregateFunc::DenseRank { .. }
1050        | AggregateFunc::LagLead { .. }
1051        | AggregateFunc::FirstValue { .. }
1052        | AggregateFunc::LastValue { .. }
1053        | AggregateFunc::WindowAggregate { .. }
1054        | AggregateFunc::FusedValueWindowFunc { .. }
1055        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
1056    }
1057}
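As a minimal illustration of the distinction drawn above (a sketch, not code from this module): retracting a value from an accumulable aggregate like `sum` only requires the retracted value itself, while retracting the current `max` may require consulting the remaining values, which is why min/max use the hierarchical strategy.

```rust
/// Accumulable: retraction of `x` is just accumulation of `-x` via the
/// difference field; the running sum never revisits other inputs.
/// Hypothetical helper over (value, diff) updates, for illustration only.
fn accumulate(updates: &[(i64, i64)]) -> i64 {
    updates.iter().map(|(value, diff)| value * diff).sum()
}

fn main() {
    // Insert 5 and 3, then retract 5: the sum is maintained purely from updates.
    assert_eq!(accumulate(&[(5, 1), (3, 1), (5, -1)]), 3);

    // Not accumulable: after retracting the current max (5), the new max (3)
    // can only be found by re-examining the remaining values, so min/max are
    // computed with repeated (hierarchical) aggregation instead.
    let remaining = [3i64];
    assert_eq!(remaining.iter().max().copied(), Some(3));
}
```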
1058
1059#[cfg(test)]
1060mod tests {
1061    use mz_ore::assert_ok;
1062    use mz_proto::protobuf_roundtrip;
1063    use proptest::prelude::*;
1064
1065    use super::*;
1066
1067    // This test causes stack overflows if not run with --release,
1068    // ignore by default.
1069    proptest! {
1070        #[mz_ore::test]
1071        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
1072        fn reduce_plan_protobuf_roundtrip(expect in any::<ReducePlan>() ) {
1073            let actual = protobuf_roundtrip::<_, ProtoReducePlan>(&expect);
1074            assert_ok!(actual);
1075            assert_eq!(actual.unwrap(), expect);
1076        }
1077    }
1078}