// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.

//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent here is that creating a `ReducePlan` should capture
//! all of the decision making about what kind of dataflow we need to render and what each
//! operator needs to do, so that actually rendering the plan can be a relatively simple
//! application of (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction, and when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of commutative Abelian groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//!    they are still commutative and associative, which lets us compute the reduction over subsets
//!    of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
//!    reductions this way, we can maintain the computation in sublinear time with respect to
//!    the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//!    hierarchical reductions correspond to instances of semigroups, in that they are associative,
//!    but in order to benefit from being computed hierarchically, they need to have some reduction
//!    in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//!    hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//!    hierarchically can instead be computed in-place, because we only need to keep the value that's
//!    better than the "best" (minimal or maximal for min and max) seen so far.
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//!    hierarchical (most likely they are associative but don't involve any data reduction) and so for these
//!    we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately, and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly, avoiding the extra collation arrangement.
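//!
//! As a minimal standalone sketch of the accumulable idea (plain Rust, not the
//! actual rendering code), a sum can be maintained by folding signed updates,
//! with a retraction carried as a negative diff:
//!
//! ```
//! // Each update is (value, diff); retracting a previously seen 5 is (5, -1).
//! let updates: Vec<(i64, i64)> = vec![(2, 1), (5, 1), (10, 1), (5, -1)];
//! // Accumulating `value * diff` maintains the sum under arbitrary changes,
//! // without retaining the individual input values.
//! let sum: i64 = updates.iter().map(|(v, d)| v * d).sum();
//! assert_eq!(sum, 12);
//! ```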

use mz_expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
};
use mz_ore::soft_assert_or_log;
use serde::{Deserialize, Serialize};

use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};

/// This enum represents the three potential types of aggregations.
#[derive(
    Copy,
    Clone,
    Debug,
    Deserialize,
    Eq,
    Hash,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub enum ReductionType {
    /// Accumulable functions can be subtracted from (are invertible) and are
    /// associative. We can compute these results by moving some data to the diff
    /// field under arbitrary changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic, for lack of a better word, are functions that are neither accumulable
    /// nor hierarchical. Examples include jsonb_agg.
    Basic,
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct`; otherwise
/// it's composed of a combination of accumulable, hierarchical, and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler-verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values, and to
/// apply a distinct operator to those before we
/// combine them with everything else.
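///
/// As a standalone sketch (plain Rust, not this crate's API) of why the
/// `DISTINCT` aggregations are separated out: duplicates must be suppressed
/// before the values are folded into the accumulation:
///
/// ```
/// use std::collections::BTreeSet;
/// let values = vec![3, 3, 5, 7, 7, 7];
/// // A plain accumulable count sees every update...
/// let count = values.len();
/// // ...while a DISTINCT count must first reduce the input to its distinct set.
/// let distinct_count = values.iter().collect::<BTreeSet<_>>().len();
/// assert_eq!((count, distinct_count), (6, 3));
/// ```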
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, AggregateExpr)>,
}

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan for hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Returns the set of aggregation functions computed by this plan.
    pub fn aggr_funcs(&self) -> &[AggregateFunc] {
        match self {
            HierarchicalPlan::Monotonic(plan) => &plan.aggr_funcs,
            HierarchicalPlan::Bucketed(plan) => &plan.aggr_funcs,
        }
    }

    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()`, but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append-only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
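///
/// As an illustrative sketch (plain Rust, with `x % buckets` standing in for
/// `value.hashed() % buckets[layer]`), one layer reduces each bucket
/// independently and the next layer reduces the per-bucket results, so no
/// single operator invocation touches a large group:
///
/// ```
/// let input = vec![2u64, 5, 1, 10, 4, 8];
/// let buckets = 4u64;
/// // Layer 1: compute the min within each bucket.
/// let mut layer = vec![u64::MAX; buckets as usize];
/// for x in &input {
///     let b = (x % buckets) as usize;
///     layer[b] = layer[b].min(*x);
/// }
/// // Layer 2: reduce the per-bucket minima to the final result.
/// let min = layer.into_iter().min().unwrap();
/// assert_eq!(min, 1);
/// ```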
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally, a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}

impl BucketedPlan {
    /// Convert to a monotonic plan, indicating whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            must_consolidate,
        }
    }
}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple reduce aggregates
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These then need to be collated together in an additional
    /// reduction. Each element is the aggregation expression to compute,
    /// stored in output order.
    Multiple(Vec<AggregateExpr>),
}

/// Plan for rendering a single basic aggregation, possibly with a `FlatMap UnnestList`
/// fused into the aggregation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join?
#[derive(
    Clone,
    Debug,
    Default,
    Serialize,
    Deserialize,
    Eq,
    PartialEq,
    Ord,
    PartialOrd
)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence of aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
    pub aggregate_types: Vec<ReductionType>,
}

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        self.hierarchical
            .as_mut()
            .map(|plan| plan.as_monotonic(must_consolidate));
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes the dataflow to be created
    /// and how the aggregations will be executed.
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // We need to make sure that all aggregates have the same type.
        let mut aggregates_list = Vec::with_capacity(aggregates.len());
        let mut aggregates = aggregates.into_iter();
        if let Some(aggregate) = aggregates.next() {
            let typ = reduction_type(&aggregate.func);
            aggregates_list.push(aggregate);

            for aggregate in aggregates {
                assert_eq!(
                    typ,
                    reduction_type(&aggregate.func),
                    "Multiple reduction types detected"
                );
                aggregates_list.push(aggregate);
            }
            ReducePlan::create_inner(
                typ,
                aggregates_list,
                monotonic,
                expected_group_size,
                fused_unnest_list,
            )
        } else {
            // If we don't have any aggregations, we are just computing a distinct.
            ReducePlan::Distinct
        }
    }

    /// Generate a plan for computing the specified type of aggregations.
    ///
    /// This function assumes that all of the supplied aggregates are
    /// actually of the correct reduction type.
    fn create_inner(
        typ: ReductionType,
        aggregates_list: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        if fused_unnest_list {
            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
        }
        assert!(
            !aggregates_list.is_empty(),
            "error: tried to render a reduce dataflow with no aggregates"
        );
        match typ {
            ReductionType::Accumulable => {
                let mut simple_aggrs = vec![];
                let mut distinct_aggrs = vec![];
                let full_aggrs = aggregates_list.clone();
                for (datum_index, aggr) in aggregates_list.into_iter().enumerate() {
                    // Accumulable aggregations need to do extra per-aggregate work
                    // for aggregations with the distinct bit set, so we'll separate
                    // those out now.
                    if aggr.distinct {
                        distinct_aggrs.push((datum_index, aggr));
                    } else {
                        simple_aggrs.push((datum_index, aggr));
                    };
                }
                ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs,
                    simple_aggrs,
                    distinct_aggrs,
                })
            }
            ReductionType::Hierarchical => {
                let aggr_funcs = aggregates_list
                    .iter()
                    .map(|aggr| aggr.func.clone())
                    .collect();

                if monotonic {
                    let monotonic = MonotonicPlan {
                        aggr_funcs,
                        must_consolidate: false,
                    };
                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
                } else {
                    let buckets = bucketing_of_expected_group_size(expected_group_size);
                    let bucketed = BucketedPlan {
                        aggr_funcs,
                        buckets,
                    };

                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
                }
            }
            ReductionType::Basic => match <_ as TryInto<[_; 1]>>::try_into(aggregates_list) {
                Ok([expr]) => ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                    expr,
                    fused_unnest_list,
                })),
                Err(aggregates_list) => ReducePlan::Basic(BasicPlan::Multiple(aggregates_list)),
            },
        }
    }

    /// Reports all keys of produced arrangements.
    ///
    /// This is likely either an empty vector, for no arrangement,
    /// or a singleton vector containing the list of expressions
    /// that key a single arrangement.
    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
        let key = (0..key_arity)
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        AvailableCollections::new_arranged(vec![(key, permutation, thinning)])
    }

    /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
    /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
    /// one sitting on top of the reduction.
    ///
    /// Non-fusable parts include temporal predicates or any other parts that cannot be
    /// conservatively asserted to not increase the memory requirements of the output
    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
    /// being the identity MFP.
    pub fn extract_mfp_after(
        &self,
        mut mfp: MapFilterProject,
        key_arity: usize,
    ) -> (MapFilterProject, MapFilterProject, usize) {
        // Extract temporal predicates, as we cannot push them into `Reduce`.
        let temporal_mfp = mfp.extract_temporal();
        let non_temporal = mfp;
        mfp = temporal_mfp;

        // We ensure we do not attempt to project away the key, as we cannot accomplish
        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
        // check if it can be directly absorbed; if it can't, we then default to a general
        // strategy that unpacks the MFP to absorb only the filter and supporting map
        // parts, followed by a post-MFP step.
        let input_arity = non_temporal.input_arity;
        let key = Vec::from_iter(0..key_arity);
        let mut mfp_push;
        let output_arity;

        if non_temporal.projection.len() <= input_arity
            && non_temporal.projection.iter().all(|c| *c < input_arity)
            && non_temporal.projection.starts_with(&key)
        {
            // Special case: The key is preserved as a prefix and the projection is only
            // of output fields from the reduction. So we know that: (a) We can process the
            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
            // and (c) The output projection is at most as wide as the output that would be
            // produced by the reduction, so we are sure to never regress the memory
            // requirements of the output arrangement.
            // Note that this strategy may change the arity of the output arrangement.
            output_arity = non_temporal.projection.len();
            mfp_push = non_temporal;
        } else {
            // General strategy: Unpack the MFP as MF followed by a projection P' that
            // removes all M columns, then apply MP afterwards.
            // Note that this strategy does not result in any changes to the arity of
            // the output arrangement.
            let (m, f, p) = non_temporal.into_map_filter_project();
            mfp_push = MapFilterProject::new(input_arity)
                .map(m.clone())
                .filter(f)
                .project(0..input_arity);
            output_arity = input_arity;

            // We still need to perform the map and projection for the actual output.
            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);

            // Compose the non-pushed MFP components.
            mfp = MapFilterProject::compose(mfp_left, mfp);
        }
        mfp_push.optimize();
        mfp.optimize();
        (mfp_push, mfp, output_arity)
    }
}

/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
    /// Extracts the columns used as the key.
    pub key_plan: mz_expr::SafeMfpPlan,
    /// Extracts the columns used to feed the aggregations.
    pub val_plan: mz_expr::SafeMfpPlan,
}

impl KeyValPlan {
    /// Create a new [KeyValPlan] from aggregation arguments.
    pub fn new(
        input_arity: usize,
        group_key: &[MirScalarExpr],
        aggregates: &[AggregateExpr],
        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
    ) -> Self {
        // Form an operator for evaluating key expressions.
        let mut key_mfp = MapFilterProject::new(input_arity)
            .map(group_key.iter().cloned())
            .project(input_arity..(input_arity + group_key.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        // Form an operator for evaluating value expressions.
        let mut val_mfp = MapFilterProject::new(input_arity)
            .map(aggregates.iter().map(|a| a.expr.clone()))
            .project(input_arity..(input_arity + aggregates.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        key_mfp.optimize();
        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
        val_mfp.optimize();
        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();

        Self { key_plan, val_plan }
    }

    /// The arity of the key plan.
    pub fn key_arity(&self) -> usize {
        self.key_plan.projection.len()
    }
}

/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a `Row` would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
    for i in 1..indexes.len() {
        soft_assert_or_log!(
            indexes[i - 1] < indexes[i],
            "convert_indexes_to_skips needs indexes to be strictly increasing. Received: {:?}",
            indexes,
        );
    }

    for i in (1..indexes.len()).rev() {
        indexes[i] -= indexes[i - 1];
        indexes[i] -= 1;
    }

    indexes
}

/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only, as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
    match func {
        AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::Any
        | AggregateFunc::All
        | AggregateFunc::Dummy => ReductionType::Accumulable,
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinInterval
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinTime => ReductionType::Hierarchical,
        AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::ArrayConcat { .. }
        | AggregateFunc::ListConcat { .. }
        | AggregateFunc::StringAgg { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. }
        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
    }
}