// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.

//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent is that creating a `ReducePlan` should capture
//! all of the decision making about what kind of dataflow we need to render and what each
//! operator needs to do, so that actually rendering the plan can be a relatively simple application
//! of (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction: when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of Abelian (commutative) groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//!    they are still commutative and associative, which lets us compute the reduction over subsets
//!    of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
//!    reductions this way, we can maintain the computation in sublinear time with respect to
//!    the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//!    hierarchical reductions correspond to instances of semigroups, in that they are associative,
//!    but in order to benefit from being computed hierarchically, they need to have some reduction
//!    in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//!    hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//!    hierarchically can instead be computed in-place, because we only need to keep the "best" value
//!    seen so far (the minimum for `min`, the maximum for `max`).
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//!    hierarchical (most likely they are associative but don't involve any data reduction), and so for these
//!    we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately, and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly and avoid the extra collation arrangement.
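
// This is a minimal, self-contained sketch of the "Accumulable" idea described above.
// It is not Materialize's rendering code. Treat each update as a pair `(value, diff)`.
// Then `sum` and `count` can be maintained purely by addition. Removing an element `x`
// is just another update with `diff = -1`, which contributes `-x` to the sum.
// The module `accumulable_sketch`, the `SumCount` type, and the use of `i64` values are
// hypothetical stand-ins chosen for illustration.
// Usage: inserting 2, 5 and 10 yields `{ sum: 17, count: 3 }`. A later `(5, -1)`
// retraction yields `{ sum: 12, count: 2 }` without revisiting the other values.
#[allow(dead_code)]
mod accumulable_sketch {
    /// A `(sum, count)` pair that accumulates by plain addition, in the spirit of
    /// a Differential `difference` field.
    #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
    pub struct SumCount {
        pub sum: i64,
        pub count: i64,
    }

    /// Folds `(value, diff)` updates into `acc`. A positive `diff` inserts
    /// copies of `value`; a negative `diff` retracts them.
    pub fn accumulate(mut acc: SumCount, updates: &[(i64, i64)]) -> SumCount {
        for &(value, diff) in updates {
            acc.sum += value * diff;
            acc.count += diff;
        }
        acc
    }
}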

use std::collections::BTreeMap;

use mz_expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
};
use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log};
use serde::{Deserialize, Serialize};

use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};

/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
    /// Accumulable functions are invertible (can be subtracted from) and associative.
    /// We can compute these results by moving some data to the diff field under arbitrary
    /// changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic functions, for lack of a better word, are those that are neither accumulable
    /// nor hierarchical. Examples include jsonb_agg.
    Basic,
}

impl columnation::Columnation for ReductionType {
    type InnerRegion = columnation::CopyRegion<ReductionType>;
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct`; otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
    /// Plan for computing a mix of different kinds of aggregations.
    /// We need to do extra work here to reassemble results back in the
    /// requested order.
    Collation(CollationPlan),
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the aggregation among accumulable aggregations,
    ///  index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}
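
// This is a minimal sketch of how the accumulable aggregates end up in `simple_aggrs`
// versus `distinct_aggrs`, mirroring the loop in `ReducePlan::create_inner`. Each aggregate
// keeps two indexes: its position among the accumulable aggregates, and its datum index in
// the full input. It then lands in one list or the other depending on its `DISTINCT` bit.
// The module name and the `(datum_index, name, distinct)` input tuples are hypothetical
// stand-ins for `(usize, AggregateExpr)` pairs.
#[allow(dead_code)]
mod accumulable_split_sketch {
    /// `(accumulable_index, datum_index, name)`, shaped like the `simple_aggrs` entries.
    pub type Triple = (usize, usize, &'static str);

    /// Partitions `(datum_index, name, distinct)` inputs into (simple, distinct) lists.
    pub fn split(aggregates: &[(usize, &'static str, bool)]) -> (Vec<Triple>, Vec<Triple>) {
        let mut simple = Vec::new();
        let mut distinct = Vec::new();
        for (accumulable_index, &(datum_index, name, is_distinct)) in
            aggregates.iter().enumerate()
        {
            let triple = (accumulable_index, datum_index, name);
            if is_distinct {
                distinct.push(triple);
            } else {
                simple.push(triple);
            }
        }
        (simple, distinct)
    }
}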

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan for hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Returns the set of aggregations computed by this plan.
    pub fn aggr_funcs(&self) -> &[AggregateFunc] {
        match self {
            HierarchicalPlan::Monotonic(plan) => &plan.aggr_funcs,
            HierarchicalPlan::Bucketed(plan) => &plan.aggr_funcs,
        }
    }

    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()` but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append-only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}
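
// This is a minimal sketch of the in-place evaluation that `MonotonicPlan` enables, shown
// for `min`. It assumes a physically monotonic input, i.e. an append-only stream of `i64`
// values with no retractions. Because nothing is ever removed, keeping only the best value
// seen so far is enough. When `must_consolidate` is true, this shortcut is only valid after
// consolidating away any cancelling negations, which the sketch does not model.
// `monotonic_min_sketch` and `RunningMin` are hypothetical names.
#[allow(dead_code)]
mod monotonic_min_sketch {
    /// Incrementally maintained minimum over an append-only input.
    #[derive(Debug, Default)]
    pub struct RunningMin {
        best: Option<i64>,
    }

    impl RunningMin {
        /// Absorbs newly appended values, retaining only the smallest one seen.
        pub fn extend(&mut self, values: &[i64]) {
            for &v in values {
                self.best = Some(match self.best {
                    Some(b) => b.min(v),
                    None => v,
                });
            }
        }

        /// The current minimum, or `None` if no values have arrived yet.
        pub fn get(&self) -> Option<i64> {
            self.best
        }
    }
}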

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally, a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}
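
// This is a minimal sketch of the reduction tree that `BucketedPlan` describes, computing
// `min` for a single group.
// Assumptions, none of which come from the source:
// - values are `i64`;
// - std's `DefaultHasher` stands in for the real row hash;
// - bucket counts such as `[16, 4, 1]` are illustrative, not what
//   `bucketing_of_expected_group_size` returns.
// Each layer reduces every bucket to its minimum, using `hash % buckets[layer]` to pick
// the bucket. It then hands those partial minima to the next, coarser layer. The real
// plan renders one Differential reduce operator per layer and maintains it incrementally;
// this sketch simply recomputes everything. `bucketed_min_sketch` is a hypothetical name.
#[allow(dead_code)]
mod bucketed_min_sketch {
    use std::collections::hash_map::DefaultHasher;
    use std::collections::BTreeMap;
    use std::hash::{Hash, Hasher};

    fn hash_of(value: i64) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    /// Computes `min(values)` through `buckets.len()` layers of per-bucket minima.
    /// Every entry of `buckets` must be non-zero.
    pub fn bucketed_min(values: &[i64], buckets: &[u64]) -> Option<i64> {
        let mut partials: Vec<i64> = values.to_vec();
        for &count in buckets {
            // Keep only the smallest value that lands in each bucket of this layer.
            let mut layer: BTreeMap<u64, i64> = BTreeMap::new();
            for v in partials {
                layer
                    .entry(hash_of(v) % count)
                    .and_modify(|m| *m = (*m).min(v))
                    .or_insert(v);
            }
            partials = layer.into_values().collect();
        }
        // With a final layer of one bucket at most one value remains; `min`
        // also covers bucket lists that do not end in 1.
        partials.into_iter().min()
    }
}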

impl BucketedPlan {
    /// Converts to a monotonic plan, indicating whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            skips: self.skips,
            must_consolidate,
        }
    }
}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple basic aggregations
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These then need to be collated together in an additional
    /// reduction. Each element represents:
    /// `(index of the input datum we are aggregating over,
    ///   the aggregation expression)`
    Multiple(Vec<(usize, AggregateExpr)>),
}

/// Plan for rendering a single basic aggregation, possibly with a `FlatMap UnnestList` fused
/// into this aggregation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The index in the set of inputs that we are aggregating over.
    pub index: usize,
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join?
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence of aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
    pub aggregate_types: Vec<ReductionType>,
}
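
// This is a minimal sketch of how per-type results can be merged back into the requested
// order using a per-position list of reduction types, in the spirit of
// `CollationPlan::aggregate_types`. Each type's results form a subsequence of the full
// aggregate list. So it suffices to keep one cursor per type and advance that cursor
// whenever its type appears. The local `Kind` enum and the generic `T` payloads are
// hypothetical stand-ins for `ReductionType` and the computed datums.
#[allow(dead_code)]
mod collation_sketch {
    /// Hypothetical mirror of `ReductionType`.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum Kind {
        Accumulable,
        Hierarchical,
        Basic,
    }

    /// Interleaves per-type results according to `types`, or returns `None`
    /// if some type has too few or too many results.
    pub fn collate<T: Clone>(
        types: &[Kind],
        accumulable: &[T],
        hierarchical: &[T],
        basic: &[T],
    ) -> Option<Vec<T>> {
        let (mut a, mut h, mut b) = (accumulable.iter(), hierarchical.iter(), basic.iter());
        let mut out = Vec::with_capacity(types.len());
        for kind in types {
            let next = match kind {
                Kind::Accumulable => a.next(),
                Kind::Hierarchical => h.next(),
                Kind::Basic => b.next(),
            };
            out.push(next?.clone());
        }
        // Every per-type result must be consumed exactly once.
        if a.next().is_some() || h.next().is_some() || b.next().is_some() {
            return None;
        }
        Some(out)
    }
}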

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        if let Some(plan) = self.hierarchical.as_mut() {
            plan.as_monotonic(must_consolidate);
        }
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes the dataflow to be created
    /// and how the aggregations will be executed.
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // If we don't have any aggregations we are just computing a distinct.
        if aggregates.is_empty() {
            return ReducePlan::Distinct;
        }

        // Otherwise, we need to group aggregations according to their
        // reduction type (accumulable, hierarchical, or basic).
        let mut reduction_types = BTreeMap::new();
        // We need to make sure that each list of aggregates by type forms
        // a subsequence of the overall sequence of aggregates.
        for index in 0..aggregates.len() {
            let typ = reduction_type(&aggregates[index].func);
            let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
            aggregates_list.push((index, aggregates[index].clone()));
        }

        // Convert each grouped list of reductions into a plan.
        let plan: Vec<_> = reduction_types
            .into_iter()
            .map(|(typ, aggregates_list)| {
                ReducePlan::create_inner(
                    typ,
                    aggregates_list,
                    monotonic,
                    expected_group_size,
                    fused_unnest_list,
                )
            })
            .collect();

        // If we only have a single type of aggregation present we can
        // render that directly.
        if plan.len() == 1 {
            return plan[0].clone();
        }

        // Warn if we encounter a collation plan. This can trigger if the `enable_reduce_reduction`
        // flag is disabled.
        soft_assert_eq_or_log!(
            plan.len(),
            1,
            "Expected reduce reduction to remove collation plans"
        );

        // Otherwise, we have to stitch reductions together.

        // First, let's sanity check that we don't have an impossible number
        // of reduction types.
        assert!(plan.len() <= 3);

        let mut collation: CollationPlan = Default::default();

        // Construct a mapping from output_position -> reduction that we can
        // use to reconstruct the output in the correct order.
        let aggregate_types = aggregates
            .iter()
            .map(|a| reduction_type(&a.func))
            .collect::<Vec<_>>();

        collation.aggregate_types = aggregate_types;

        for expr in plan.into_iter() {
            match expr {
                ReducePlan::Accumulable(e) => {
                    assert_none!(collation.accumulable);
                    collation.accumulable = Some(e);
                }
                ReducePlan::Hierarchical(e) => {
                    assert_none!(collation.hierarchical);
                    collation.hierarchical = Some(e);
                }
                ReducePlan::Basic(e) => {
                    assert_none!(collation.basic);
                    collation.basic = Some(e);
                }
                ReducePlan::Distinct | ReducePlan::Collation(_) => {
                    panic!("Inner reduce plan was unsupported type!")
                }
            }
        }

        ReducePlan::Collation(collation)
    }

    /// Generate a plan for computing the specified type of aggregations.
    ///
    /// This function assumes that all of the supplied aggregates are
    /// actually of the correct reduction type, and are a subsequence
    /// of the total list of requested aggregations.
    fn create_inner(
        typ: ReductionType,
        aggregates_list: Vec<(usize, AggregateExpr)>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        if fused_unnest_list {
            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
        }
        assert!(
            aggregates_list.len() > 0,
            "error: tried to render a reduce dataflow with no aggregates"
        );
        match typ {
            ReductionType::Accumulable => {
                let mut simple_aggrs = vec![];
                let mut distinct_aggrs = vec![];
                let full_aggrs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr)
                    .collect();
                for (accumulable_index, (datum_index, aggr)) in
                    aggregates_list.into_iter().enumerate()
                {
                    // Accumulable aggregations need to do extra per-aggregate work
                    // for aggregations with the distinct bit set, so we'll separate
                    // those out now.
                    if aggr.distinct {
                        distinct_aggrs.push((accumulable_index, datum_index, aggr));
                    } else {
                        simple_aggrs.push((accumulable_index, datum_index, aggr));
                    };
                }
                ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs,
                    simple_aggrs,
                    distinct_aggrs,
                })
            }
            ReductionType::Hierarchical => {
                let aggr_funcs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr.func)
                    .collect();
                let indexes: Vec<_> = aggregates_list
                    .into_iter()
                    .map(|(index, _)| index)
                    .collect();

                // We don't have random access over Rows, so we simplify the
                // task of grabbing the inputs we are aggregating over by
                // generating a list of "skips" an iterator over the Row needs
                // to do to get the desired indexes.
                let skips = convert_indexes_to_skips(indexes);
                if monotonic {
                    let monotonic = MonotonicPlan {
                        aggr_funcs,
                        skips,
                        must_consolidate: false,
                    };
                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
                } else {
                    let buckets = bucketing_of_expected_group_size(expected_group_size);
                    let bucketed = BucketedPlan {
                        aggr_funcs,
                        skips,
                        buckets,
                    };

                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
                }
            }
            ReductionType::Basic => {
                if aggregates_list.len() == 1 {
                    ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                        index: aggregates_list[0].0,
                        expr: aggregates_list[0].1.clone(),
                        fused_unnest_list,
                    }))
                } else {
                    ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
                }
            }
        }
    }

    /// Reports all keys of produced arrangements.
    ///
    /// A reduction produces a single arrangement keyed by its first `key_arity`
    /// columns, so this reports a singleton list containing that key, together
    /// with the permutation and thinning for rows of width `arity`.
    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
        let key = (0..key_arity)
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        AvailableCollections::new_arranged(vec![(key, permutation, thinning)])
    }

    /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
    /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
    /// one sitting on top of the reduction.
    ///
    /// Non-fusable parts include temporal predicates or any other parts that cannot be
    /// conservatively asserted to not increase the memory requirements of the output
    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
    /// being the identity MFP.
    pub fn extract_mfp_after(
        &self,
        mut mfp: MapFilterProject,
        key_arity: usize,
    ) -> (MapFilterProject, MapFilterProject, usize) {
        // Extract temporal predicates, as we cannot push them into `Reduce`.
        let temporal_mfp = mfp.extract_temporal();
        let non_temporal = mfp;
        mfp = temporal_mfp;

        // We ensure we do not attempt to project away the key, as we cannot accomplish
        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
        // check if it can be directly absorbed; if it can't, we then default to a general
        // strategy that unpacks the MFP to absorb only the filter and supporting map
        // parts, followed by a post-MFP step.
        let input_arity = non_temporal.input_arity;
        let key = Vec::from_iter(0..key_arity);
        let mut mfp_push;
        let output_arity;

        if non_temporal.projection.len() <= input_arity
            && non_temporal.projection.iter().all(|c| *c < input_arity)
            && non_temporal.projection.starts_with(&key)
        {
            // Special case: The key is preserved as a prefix and the projection is only
            // of output fields from the reduction. So we know that: (a) We can process the
            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
            // and (c) The output projection is at most as wide as the output that would be
            // produced by the reduction, so we are sure to never regress the memory
            // requirements of the output arrangement.
            // Note that this strategy may change the arity of the output arrangement.
            output_arity = non_temporal.projection.len();
            mfp_push = non_temporal;
        } else {
            // General strategy: Unpack MFP as MF followed by P' that removes all M
            // columns, then MP afterwards.
            // Note that this strategy does not result in any changes to the arity of
            // the output arrangement.
            let (m, f, p) = non_temporal.into_map_filter_project();
            mfp_push = MapFilterProject::new(input_arity)
                .map(m.clone())
                .filter(f)
                .project(0..input_arity);
            output_arity = input_arity;

            // We still need to perform the map and projection for the actual output.
            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);

            // Compose the non-pushed MFP components.
            mfp = MapFilterProject::compose(mfp_left, mfp);
        }
        mfp_push.optimize();
        mfp.optimize();
        (mfp_push, mfp, output_arity)
    }
}
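
// This is a minimal sketch of the grouping step in `ReducePlan::create_from` and the plan
// shape it leads to:
// - no aggregates yields `Distinct`;
// - a single reduction type is planned directly;
// - several types need a collation.
// Positions are visited in increasing order, so each type's list of positions is a
// subsequence of the full aggregate list. The `Shape` enum, the generic `classify`
// callback, and the module name are hypothetical; they stand in for
// `ReducePlan`, `reduction_type`, and `AggregateExpr`.
#[allow(dead_code)]
mod grouping_sketch {
    use std::collections::BTreeMap;

    /// Hypothetical summary of the plan shape `create_from` would choose.
    #[derive(Debug, PartialEq, Eq)]
    pub enum Shape<K> {
        /// No aggregates: only the distinct keys are needed.
        Distinct,
        /// A single reduction type, with the positions of its aggregates.
        Single(K, Vec<usize>),
        /// Several reduction types, each with its subsequence of positions.
        Collation(BTreeMap<K, Vec<usize>>),
    }

    /// Groups aggregate positions by the type that `classify` assigns them.
    pub fn plan_shape<A, K, F>(aggregates: &[A], classify: F) -> Shape<K>
    where
        K: Ord,
        F: Fn(&A) -> K,
    {
        if aggregates.is_empty() {
            return Shape::Distinct;
        }
        let mut by_type: BTreeMap<K, Vec<usize>> = BTreeMap::new();
        // Pushing positions in increasing order keeps every list sorted.
        for (index, aggr) in aggregates.iter().enumerate() {
            by_type.entry(classify(aggr)).or_default().push(index);
        }
        if by_type.len() == 1 {
            let (kind, indexes) = by_type.into_iter().next().expect("exactly one entry");
            return Shape::Single(kind, indexes);
        }
        Shape::Collation(by_type)
    }
}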

/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
    /// Extracts the columns used as the key.
    pub key_plan: mz_expr::SafeMfpPlan,
    /// Extracts the columns used to feed the aggregations.
    pub val_plan: mz_expr::SafeMfpPlan,
}

impl KeyValPlan {
    /// Create a new [KeyValPlan] from aggregation arguments.
    pub fn new(
        input_arity: usize,
        group_key: &[MirScalarExpr],
        aggregates: &[AggregateExpr],
        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
    ) -> Self {
        // Form an operator for evaluating key expressions.
        let mut key_mfp = MapFilterProject::new(input_arity)
            .map(group_key.iter().cloned())
            .project(input_arity..(input_arity + group_key.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        // Form an operator for evaluating value expressions.
        let mut val_mfp = MapFilterProject::new(input_arity)
            .map(aggregates.iter().map(|a| a.expr.clone()))
            .project(input_arity..(input_arity + aggregates.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        key_mfp.optimize();
        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
        val_mfp.optimize();
        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();

        Self { key_plan, val_plan }
    }

    /// The arity of the key plan.
    pub fn key_arity(&self) -> usize {
        self.key_plan.projection.len()
    }
}
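
// This is a minimal sketch of what a `KeyValPlan` produces for each input row. It relies on
// a simplifying assumption that is ours, not the source's: every group-key expression and
// every aggregate argument is a plain column reference. The real plan evaluates arbitrary
// `MirScalarExpr`s through two `SafeMfpPlan`s. Here the key and value halves are simply
// projected out of a slice. `key_val_sketch` and `split_row` are hypothetical names.
#[allow(dead_code)]
mod key_val_sketch {
    /// Splits `row` into the key columns and the aggregate-input columns.
    pub fn split_row<T: Clone>(
        row: &[T],
        key_cols: &[usize],
        val_cols: &[usize],
    ) -> (Vec<T>, Vec<T>) {
        // The key plays the role of `key_plan`'s output; its length is the key arity.
        let key = key_cols.iter().map(|&c| row[c].clone()).collect();
        // One value datum per aggregate, like `val_plan`'s output.
        let val = val_cols.iter().map(|&c| row[c].clone()).collect();
        (key, val)
    }
}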

/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
    for i in 1..indexes.len() {
        soft_assert_or_log!(
            indexes[i - 1] < indexes[i],
            "convert_indexes_to_skips needs indexes to be strictly increasing. Received: {:?}",
            indexes,
        );
    }

    for i in (1..indexes.len()).rev() {
        indexes[i] -= indexes[i - 1];
        indexes[i] -= 1;
    }

    indexes
}
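
// This is a minimal sketch of how skips like those from `convert_indexes_to_skips` can be
// consumed. The standard library's `Iterator::nth(n)` discards `n` elements and then
// returns the next one. So calling `nth(skip)` repeatedly on a single forward iterator
// yields exactly the datums at the original indexes, without random access. A slice
// stands in for a `Row` here, and `skips_sketch::read_with_skips` is a hypothetical helper.
#[allow(dead_code)]
mod skips_sketch {
    /// Returns the elements of `row` at the positions encoded by `skips`.
    pub fn read_with_skips<T: Copy>(row: &[T], skips: &[usize]) -> Vec<T> {
        let mut iter = row.iter().copied();
        skips
            .iter()
            .map(|&skip| iter.nth(skip).expect("skip runs past the end of the row"))
            .collect()
    }
}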

/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
    match func {
        AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::Any
        | AggregateFunc::All
        | AggregateFunc::Dummy => ReductionType::Accumulable,
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinInterval
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinTime => ReductionType::Hierarchical,
        AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::ArrayConcat { .. }
        | AggregateFunc::ListConcat { .. }
        | AggregateFunc::StringAgg { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. }
        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
    }
}