mz_compute_types/plan/reduce.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reduction execution planning and dataflow construction.
11
12//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
13//! given reduce expression. The intent here is that each creating a `ReducePlan` should capture
14//! all of the decision making about what kind of dataflow do we need to render and what each
15//! operator needs to do, and then actually rendering the plan can be a relatively simple application
16//! of (as much as possible) straight line code.
17//!
18//! Materialize needs to be able to maintain reductions incrementally (roughly, using
19//! time proportional to the number of changes in the input) and ideally, with a
20//! memory footprint proportional to the number of reductions being computed. We have to employ
21//! several tricks to achieve that, and these tricks constitute most of the complexity involved
22//! with planning and rendering reduce expressions. There's some additional complexity involved
23//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
24//! duplicate updates.
25//!
26//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
27//! into three distinct types. Each type gets rendered separately, with its own specialized plan
28//! and dataflow. The three types are as follows:
29//!
30//! 1. Accumulable:
31//! Accumulable reductions can be computed inline in a Differential update's `difference`
32//! field because they basically boil down to tracking counts of things. `sum()` is an
33//! example of an accumulable reduction, and when some element `x` is removed from the set
34//! of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
35//! formally, accumulable reductions correspond to instances of commutative Abelian groups.
36//! 2. Hierarchical:
37//! Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
38//! they are still commutative and associative, which lets us compute the reduction over subsets
39//! of the input, and then compute the reduction again on those results. For example:
40//! `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
41//! reductions this way, we can maintain the computation in sublinear time with respect to
42//! the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
43//! hierarchical reductions correspond to instances of semigroups, in that they are associative,
44//! but in order to benefit from being computed hierarchically, they need to have some reduction
45//! in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
46//! hierarchical evaluation.
47//!
48//! When the input is append-only, or monotonic, reductions that would otherwise have to be computed
49//! hierarchically can instead be computed in-place, because we only need to keep the value that's
50//! better than the "best" (minimal or maximal for min and max) seen so far.
51//! 3. Basic:
52//! Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
53//! hierarchical (most likely they are associative but don't involve any data reduction) and so for these
54//! we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
55//!
56//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
57//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
58//! requested aggregations by type, render each type separately and then take those results and collate them
59//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
60//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
61//! return the output arrangement directly and avoid the extra collation arrangement.
62
63use mz_expr::{
64 AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
65};
66use mz_ore::soft_assert_or_log;
67use serde::{Deserialize, Serialize};
68
69use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};
70
71/// This enum represents the three potential types of aggregations.
72#[derive(
73 Copy,
74 Clone,
75 Debug,
76 Deserialize,
77 Eq,
78 Hash,
79 Ord,
80 PartialEq,
81 PartialOrd,
82 Serialize
83)]
84pub enum ReductionType {
85 /// Accumulable functions can be subtracted from (are invertible), and associative.
86 /// We can compute these results by moving some data to the diff field under arbitrary
87 /// changes to inputs. Examples include sum or count.
88 Accumulable,
89 /// Hierarchical functions are associative, which means we can split up the work of
90 /// computing them across subsets. Note that hierarchical reductions should also
91 /// reduce the data in some way, as otherwise rendering them hierarchically is not
92 /// worth it. Examples include min or max.
93 Hierarchical,
94 /// Basic, for lack of a better word, are functions that are neither accumulable
95 /// nor hierarchical. Examples include jsonb_agg.
96 Basic,
97}
98
99impl TryFrom<&ReducePlan> for ReductionType {
100 type Error = ();
101
102 fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
103 match plan {
104 ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
105 ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
106 ReducePlan::Basic(_) => Ok(ReductionType::Basic),
107 _ => Err(()),
108 }
109 }
110}
111
112/// A `ReducePlan` provides a concise description for how we will
113/// execute a given reduce expression.
114///
115/// The provided reduce expression can have no
116/// aggregations, in which case its just a `Distinct` and otherwise
117/// it's composed of a combination of accumulable, hierarchical and
118/// basic aggregations.
119///
120/// We want to try to centralize as much decision making about the
121/// shape / general computation of the rendered dataflow graph
122/// in this plan, and then make actually rendering the graph
123/// be as simple (and compiler verifiable) as possible.
124#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
125pub enum ReducePlan {
126 /// Plan for not computing any aggregations, just determining the set of
127 /// distinct keys.
128 Distinct,
129 /// Plan for computing only accumulable aggregations.
130 Accumulable(AccumulablePlan),
131 /// Plan for computing only hierarchical aggregations.
132 Hierarchical(HierarchicalPlan),
133 /// Plan for computing only basic aggregations.
134 Basic(BasicPlan),
135}
136
137/// Plan for computing a set of accumulable aggregations.
138///
139/// We fuse all of the accumulable aggregations together
140/// and compute them with one dataflow fragment. We need to
141/// be careful to separate out the aggregations that
142/// apply only to the distinct set of values. We need
143/// to apply a distinct operator to those before we
144/// combine them with everything else.
145#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
146pub struct AccumulablePlan {
147 /// All of the aggregations we were asked to compute, stored
148 /// in order.
149 pub full_aggrs: Vec<AggregateExpr>,
150 /// All of the non-distinct accumulable aggregates.
151 /// Each element represents:
152 /// (index of the datum among inputs, aggregation expr)
153 /// These will all be rendered together in one dataflow fragment.
154 pub simple_aggrs: Vec<(usize, AggregateExpr)>,
155 /// Same as above but for all of the `DISTINCT` accumulable aggregations.
156 pub distinct_aggrs: Vec<(usize, AggregateExpr)>,
157}
158
159/// Plan for computing a set of hierarchical aggregations.
160///
161/// In the append-only setting we can render them in-place
162/// with monotonic plans, but otherwise, we need to render
163/// them with a reduction tree that splits the inputs into
164/// small, and then progressively larger, buckets
165#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
166pub enum HierarchicalPlan {
167 /// Plan hierarchical aggregations under monotonic inputs.
168 Monotonic(MonotonicPlan),
169 /// Plan for hierarchical aggregations under non-monotonic inputs.
170 Bucketed(BucketedPlan),
171}
172
173impl HierarchicalPlan {
174 /// Returns the set of aggregations computed by this plan.
175 pub fn aggr_funcs(&self) -> &[AggregateFunc] {
176 match self {
177 HierarchicalPlan::Monotonic(plan) => &plan.aggr_funcs,
178 HierarchicalPlan::Bucketed(plan) => &plan.aggr_funcs,
179 }
180 }
181
182 /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
183 /// and sets consolidation requirements.
184 pub fn as_monotonic(&mut self, must_consolidate: bool) {
185 match self {
186 HierarchicalPlan::Bucketed(bucketed) => {
187 // TODO: ideally we would not have the `clone()` but ownership
188 // seems fraught here as we are behind a `&mut self` reference.
189 *self =
190 HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
191 }
192 HierarchicalPlan::Monotonic(monotonic) => {
193 monotonic.must_consolidate = must_consolidate;
194 }
195 }
196 }
197}
198
199/// Plan for computing a set of hierarchical aggregations with a
200/// monotonic input.
201///
202/// Here, the aggregations will be rendered in place. We don't
203/// need to worry about retractions because the inputs are
204/// append only, so we can change our computation to
205/// only retain the "best" value in the diff field, instead
206/// of holding onto all values.
207#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
208pub struct MonotonicPlan {
209 /// All of the aggregations we were asked to compute.
210 pub aggr_funcs: Vec<AggregateFunc>,
211 /// True if the input is not physically monotonic, and the operator must perform
212 /// consolidation to remove potential negations. The operator implementation is
213 /// free to consolidate as late as possible while ensuring correctness, so it is
214 /// not a requirement that the input be directly subjected to consolidation.
215 /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
216 ///
217 /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
218 pub must_consolidate: bool,
219}
220
221/// Plan for computing a set of hierarchical aggregations
222/// with non-monotonic inputs.
223///
224/// To perform hierarchical aggregations with stable runtimes
225/// under updates we'll subdivide the group key into buckets, compute
226/// the reduction in each of those subdivided buckets and then combine
227/// the results into a coarser bucket (one that represents a larger
228/// fraction of the original input) and redo the reduction in another
229/// layer. Effectively, we'll construct a min / max heap out of a series
230/// of reduce operators (each one is a separate layer).
231#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
232pub struct BucketedPlan {
233 /// All of the aggregations we were asked to compute.
234 pub aggr_funcs: Vec<AggregateFunc>,
235 /// The number of buckets in each layer of the reduction tree. Should
236 /// be decreasing, and ideally, a power of two so that we can easily
237 /// distribute values to buckets with `value.hashed() % buckets[layer]`.
238 pub buckets: Vec<u64>,
239}
240
241impl BucketedPlan {
242 /// Convert to a monotonic plan, indicate whether the operator must apply
243 /// consolidation to its input.
244 fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
245 MonotonicPlan {
246 aggr_funcs: self.aggr_funcs,
247 must_consolidate,
248 }
249 }
250}
251
252/// Plan for computing a set of basic aggregations.
253///
254/// There's much less complexity when rendering basic aggregations.
255/// Each aggregation corresponds to one Differential reduce operator.
256/// That's it. However, we still want to present one final arrangement
257/// so basic aggregations present results with the same interface
258/// (one arrangement containing a row with all results) that accumulable
259/// and hierarchical aggregations do. To provide that, we render an
260/// additional reduce operator whenever we have multiple reduce aggregates
261/// to combine and present results in the appropriate order. If we
262/// were only asked to compute a single aggregation, we can skip
263/// that step and return the arrangement provided by computing the aggregation
264/// directly.
265#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
266pub enum BasicPlan {
267 /// Plan for rendering a single basic aggregation.
268 Single(SingleBasicPlan),
269 /// Plan for rendering multiple basic aggregations.
270 /// These need to then be collated together in an additional
271 /// reduction. Each element represents the:
272 /// `(index of the set of the input we are aggregating over,
273 /// the aggregation function)`
274 Multiple(Vec<AggregateExpr>),
275}
276
277/// Plan for rendering a single basic aggregation, with possibly fusing a `FlatMap UnnestList` with
278/// this aggregation.
279#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
280pub struct SingleBasicPlan {
281 /// The aggregation that we should perform.
282 pub expr: AggregateExpr,
283 /// Whether we fused a `FlatMap UnnestList` with this aggregation.
284 pub fused_unnest_list: bool,
285}
286
287/// Plan for collating the results of computing multiple aggregation
288/// types.
289///
290/// TODO: could we express this as a delta join
291#[derive(
292 Clone,
293 Debug,
294 Default,
295 Serialize,
296 Deserialize,
297 Eq,
298 PartialEq,
299 Ord,
300 PartialOrd
301)]
302pub struct CollationPlan {
303 /// Accumulable aggregation results to collate, if any.
304 pub accumulable: Option<AccumulablePlan>,
305 /// Hierarchical aggregation results to collate, if any.
306 pub hierarchical: Option<HierarchicalPlan>,
307 /// Basic aggregation results to collate, if any.
308 pub basic: Option<BasicPlan>,
309 /// When we get results back from each of the different
310 /// aggregation types, they will be subsequences of
311 /// the sequence aggregations in the original reduce expression.
312 /// We keep a map from output position -> reduction type
313 /// to easily merge results back into the requested order.
314 pub aggregate_types: Vec<ReductionType>,
315}
316
317impl CollationPlan {
318 /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
319 /// and sets consolidation requirements.
320 pub fn as_monotonic(&mut self, must_consolidate: bool) {
321 self.hierarchical
322 .as_mut()
323 .map(|plan| plan.as_monotonic(must_consolidate));
324 }
325}
326
327impl ReducePlan {
328 /// Generate a plan for computing the supplied aggregations.
329 ///
330 /// The resulting plan summarizes what the dataflow to be created
331 /// and how the aggregations will be executed.
332 pub fn create_from(
333 aggregates: Vec<AggregateExpr>,
334 monotonic: bool,
335 expected_group_size: Option<u64>,
336 fused_unnest_list: bool,
337 ) -> Self {
338 // We need to make sure that all aggregates have the same type.
339 let mut aggregates_list = Vec::with_capacity(aggregates.len());
340 let mut aggregates = aggregates.into_iter();
341 if let Some(aggregate) = aggregates.next() {
342 let typ = reduction_type(&aggregate.func);
343 aggregates_list.push(aggregate);
344
345 for aggregate in aggregates {
346 assert_eq!(
347 typ,
348 reduction_type(&aggregate.func),
349 "Multiple reduction types detected"
350 );
351 aggregates_list.push(aggregate);
352 }
353 ReducePlan::create_inner(
354 typ,
355 aggregates_list,
356 monotonic,
357 expected_group_size,
358 fused_unnest_list,
359 )
360 } else {
361 // If we don't have any aggregations we are just computing a distinct.
362 ReducePlan::Distinct
363 }
364 }
365
366 /// Generate a plan for computing the specified type of aggregations.
367 ///
368 /// This function assumes that all of the supplied aggregates are
369 /// actually of the correct reduction type.
370 fn create_inner(
371 typ: ReductionType,
372 aggregates_list: Vec<AggregateExpr>,
373 monotonic: bool,
374 expected_group_size: Option<u64>,
375 fused_unnest_list: bool,
376 ) -> Self {
377 if fused_unnest_list {
378 assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
379 }
380 assert!(
381 aggregates_list.len() > 0,
382 "error: tried to render a reduce dataflow with no aggregates"
383 );
384 match typ {
385 ReductionType::Accumulable => {
386 let mut simple_aggrs = vec![];
387 let mut distinct_aggrs = vec![];
388 let full_aggrs = aggregates_list.clone();
389 for (datum_index, aggr) in aggregates_list.into_iter().enumerate() {
390 // Accumulable aggregations need to do extra per-aggregate work
391 // for aggregations with the distinct bit set, so we'll separate
392 // those out now.
393 if aggr.distinct {
394 distinct_aggrs.push((datum_index, aggr));
395 } else {
396 simple_aggrs.push((datum_index, aggr));
397 };
398 }
399 ReducePlan::Accumulable(AccumulablePlan {
400 full_aggrs,
401 simple_aggrs,
402 distinct_aggrs,
403 })
404 }
405 ReductionType::Hierarchical => {
406 let aggr_funcs = aggregates_list
407 .iter()
408 .map(|aggr| aggr.func.clone())
409 .collect();
410
411 if monotonic {
412 let monotonic = MonotonicPlan {
413 aggr_funcs,
414 must_consolidate: false,
415 };
416 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
417 } else {
418 let buckets = bucketing_of_expected_group_size(expected_group_size);
419 let bucketed = BucketedPlan {
420 aggr_funcs,
421 buckets,
422 };
423
424 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
425 }
426 }
427 ReductionType::Basic => match <_ as TryInto<[_; 1]>>::try_into(aggregates_list) {
428 Ok([expr]) => ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
429 expr,
430 fused_unnest_list,
431 })),
432 Err(aggregates_list) => ReducePlan::Basic(BasicPlan::Multiple(aggregates_list)),
433 },
434 }
435 }
436
437 /// Reports all keys of produced arrangements.
438 ///
439 /// This is likely either an empty vector, for no arrangement,
440 /// or a singleton vector containing the list of expressions
441 /// that key a single arrangement.
442 pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
443 let key = (0..key_arity)
444 .map(MirScalarExpr::column)
445 .collect::<Vec<_>>();
446 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
447 AvailableCollections::new_arranged(vec![(key, permutation, thinning)])
448 }
449
450 /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
451 /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
452 /// one sitting on top of the reduction.
453 ///
454 /// Non-fusable parts include temporal predicates or any other parts that cannot be
455 /// conservatively asserted to not increase the memory requirements of the output
456 /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
457 /// being the identity MFP.
458 pub fn extract_mfp_after(
459 &self,
460 mut mfp: MapFilterProject,
461 key_arity: usize,
462 ) -> (MapFilterProject, MapFilterProject, usize) {
463 // Extract temporal predicates, as we cannot push them into `Reduce`.
464 let temporal_mfp = mfp.extract_temporal();
465 let non_temporal = mfp;
466 mfp = temporal_mfp;
467
468 // We ensure we do not attempt to project away the key, as we cannot accomplish
469 // this. This is done by a simple analysis of the non-temporal part of `mfp` to
470 // check if can be directly absorbed; if it can't, we then default to a general
471 // strategy that unpacks the MFP to absorb only the filter and supporting map
472 // parts, followed by a post-MFP step.
473 let input_arity = non_temporal.input_arity;
474 let key = Vec::from_iter(0..key_arity);
475 let mut mfp_push;
476 let output_arity;
477
478 if non_temporal.projection.len() <= input_arity
479 && non_temporal.projection.iter().all(|c| *c < input_arity)
480 && non_temporal.projection.starts_with(&key)
481 {
482 // Special case: The key is preserved as a prefix and the projection is only
483 // of output fields from the reduction. So we know that: (a) We can process the
484 // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
485 // and (c) The output projection is at most as wide as the output that would be
486 // produced by the reduction, so we are sure to never regress the memory
487 // requirements of the output arrangement.
488 // Note that this strategy may change the arity of the output arrangement.
489 output_arity = non_temporal.projection.len();
490 mfp_push = non_temporal;
491 } else {
492 // General strategy: Unpack MFP as MF followed by P' that removes all M
493 // columns, then MP afterwards.
494 // Note that this strategy does not result in any changes to the arity of
495 // the output arrangement.
496 let (m, f, p) = non_temporal.into_map_filter_project();
497 mfp_push = MapFilterProject::new(input_arity)
498 .map(m.clone())
499 .filter(f)
500 .project(0..input_arity);
501 output_arity = input_arity;
502
503 // We still need to perform the map and projection for the actual output.
504 let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);
505
506 // Compose the non-pushed MFP components.
507 mfp = MapFilterProject::compose(mfp_left, mfp);
508 }
509 mfp_push.optimize();
510 mfp.optimize();
511 (mfp_push, mfp, output_arity)
512 }
513}
514
515/// Plan for extracting keys and values in preparation for a reduction.
516#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
517pub struct KeyValPlan {
518 /// Extracts the columns used as the key.
519 pub key_plan: mz_expr::SafeMfpPlan,
520 /// Extracts the columns used to feed the aggregations.
521 pub val_plan: mz_expr::SafeMfpPlan,
522}
523
524impl KeyValPlan {
525 /// Create a new [KeyValPlan] from aggregation arguments.
526 pub fn new(
527 input_arity: usize,
528 group_key: &[MirScalarExpr],
529 aggregates: &[AggregateExpr],
530 input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
531 ) -> Self {
532 // Form an operator for evaluating key expressions.
533 let mut key_mfp = MapFilterProject::new(input_arity)
534 .map(group_key.iter().cloned())
535 .project(input_arity..(input_arity + group_key.len()));
536 if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
537 key_mfp.permute_fn(|c| input_permutation[c], new_arity);
538 }
539
540 // Form an operator for evaluating value expressions.
541 let mut val_mfp = MapFilterProject::new(input_arity)
542 .map(aggregates.iter().map(|a| a.expr.clone()))
543 .project(input_arity..(input_arity + aggregates.len()));
544 if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
545 val_mfp.permute_fn(|c| input_permutation[c], new_arity);
546 }
547
548 key_mfp.optimize();
549 let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
550 val_mfp.optimize();
551 let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();
552
553 Self { key_plan, val_plan }
554 }
555
556 /// The arity of the key plan
557 pub fn key_arity(&self) -> usize {
558 self.key_plan.projection.len()
559 }
560}
561
562/// Transforms a vector containing indexes of needed columns into one containing
563/// the "skips" an iterator over a Row would need to perform to see those values.
564///
565/// This function requires that all of the elements in `indexes` are strictly
566/// increasing.
567///
568/// # Examples
569///
570/// ```
571/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
572/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
573/// ```
574pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
575 for i in 1..indexes.len() {
576 soft_assert_or_log!(
577 indexes[i - 1] < indexes[i],
578 "convert_indexes_to_skip needs indexes to be strictly increasing. Received: {:?}",
579 indexes,
580 );
581 }
582
583 for i in (1..indexes.len()).rev() {
584 indexes[i] -= indexes[i - 1];
585 indexes[i] -= 1;
586 }
587
588 indexes
589}
590
591/// Determines whether a function can be accumulated in an update's "difference" field,
592/// and whether it can be subjected to recursive (hierarchical) aggregation.
593///
594/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
595/// which can be accumulated in-place using the addition operation on the type. Aggregations
596/// that indicate they are accumulable will still need to provide an action that takes their
597/// data and introduces it as a difference, and the post-processing when the accumulated value
598/// is presented as data.
599///
600/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
601/// increasingly large subsets of each key. This has the intended property that no invocation
602/// is on a significantly large set of values (and so, no incremental update needs to reform
603/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
604/// input stream is append-only as then we only need to retain the "currently winning" value.
605/// Every hierarchical aggregate needs to supply a corresponding ReductionMonoid implementation.
606pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
607 match func {
608 AggregateFunc::SumInt16
609 | AggregateFunc::SumInt32
610 | AggregateFunc::SumInt64
611 | AggregateFunc::SumUInt16
612 | AggregateFunc::SumUInt32
613 | AggregateFunc::SumUInt64
614 | AggregateFunc::SumFloat32
615 | AggregateFunc::SumFloat64
616 | AggregateFunc::SumNumeric
617 | AggregateFunc::Count
618 | AggregateFunc::Any
619 | AggregateFunc::All
620 | AggregateFunc::Dummy => ReductionType::Accumulable,
621 AggregateFunc::MaxNumeric
622 | AggregateFunc::MaxInt16
623 | AggregateFunc::MaxInt32
624 | AggregateFunc::MaxInt64
625 | AggregateFunc::MaxUInt16
626 | AggregateFunc::MaxUInt32
627 | AggregateFunc::MaxUInt64
628 | AggregateFunc::MaxMzTimestamp
629 | AggregateFunc::MaxFloat32
630 | AggregateFunc::MaxFloat64
631 | AggregateFunc::MaxBool
632 | AggregateFunc::MaxString
633 | AggregateFunc::MaxDate
634 | AggregateFunc::MaxTimestamp
635 | AggregateFunc::MaxTimestampTz
636 | AggregateFunc::MaxInterval
637 | AggregateFunc::MaxTime
638 | AggregateFunc::MinNumeric
639 | AggregateFunc::MinInt16
640 | AggregateFunc::MinInt32
641 | AggregateFunc::MinInt64
642 | AggregateFunc::MinUInt16
643 | AggregateFunc::MinUInt32
644 | AggregateFunc::MinUInt64
645 | AggregateFunc::MinMzTimestamp
646 | AggregateFunc::MinInterval
647 | AggregateFunc::MinFloat32
648 | AggregateFunc::MinFloat64
649 | AggregateFunc::MinBool
650 | AggregateFunc::MinString
651 | AggregateFunc::MinDate
652 | AggregateFunc::MinTimestamp
653 | AggregateFunc::MinTimestampTz
654 | AggregateFunc::MinTime => ReductionType::Hierarchical,
655 AggregateFunc::JsonbAgg { .. }
656 | AggregateFunc::JsonbObjectAgg { .. }
657 | AggregateFunc::MapAgg { .. }
658 | AggregateFunc::ArrayConcat { .. }
659 | AggregateFunc::ListConcat { .. }
660 | AggregateFunc::StringAgg { .. }
661 | AggregateFunc::RowNumber { .. }
662 | AggregateFunc::Rank { .. }
663 | AggregateFunc::DenseRank { .. }
664 | AggregateFunc::LagLead { .. }
665 | AggregateFunc::FirstValue { .. }
666 | AggregateFunc::LastValue { .. }
667 | AggregateFunc::WindowAggregate { .. }
668 | AggregateFunc::FusedValueWindowFunc { .. }
669 | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
670 }
671}