// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.

//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent is that creating a `ReducePlan` should capture
//! all of the decision making about what kind of dataflow we need to render and what each
//! operator needs to do, so that actually rendering the plan can be a relatively simple
//! application of (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction, and when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of Abelian (commutative) groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do,
//!    but they are still commutative and associative, which lets us compute the reduction over
//!    subsets of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[min[2, 5], min[1, 10]]`. When we compute
//!    hierarchical reductions this way, we can maintain the computation in sublinear time with
//!    respect to the overall input. `min` and `max` are two examples of hierarchical reductions.
//!    More formally, hierarchical reductions correspond to instances of commutative semigroups:
//!    they are associative and commutative but have no inverse. In order to benefit from being
//!    computed hierarchically, they also need to reduce the data size. A function like
//!    "concat-everything-to-a-string" wouldn't benefit from hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be
//!    computed hierarchically can instead be computed in-place, because we only need to keep
//!    the "best" value (minimal or maximal, for min and max) seen so far.
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither
//!    accumulable nor hierarchical (most likely they are associative but don't involve any data
//!    reduction), and so for these we can't do much more than just defer to Differential's
//!    reduce operator and eat a large maintenance cost.
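//!
//! The hierarchical property can be checked in isolation. The sketch below uses a hypothetical
//! helper over plain integers rather than Differential collections. It reduces each subset to
//! its minimum and then reduces those partial results again. This yields the same answer as a
//! flat `min`, while each step only sees a small amount of data.
//!
//! ```
//! // Hypothetical helper: computes `min` over the minima of the given subsets.
//! fn min_of_mins(subsets: &[&[i64]]) -> Option<i64> {
//!     subsets
//!         .iter()
//!         // First layer: reduce each subset to its own minimum (empty subsets are skipped).
//!         .filter_map(|subset| subset.iter().min().copied())
//!         // Second layer: reduce the partial results again.
//!         .min()
//! }
//! ```
//!
//! For example, `min_of_mins(&[&[2, 5], &[1, 10]])` is `Some(1)`, matching `min[2, 5, 1, 10]`.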
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When
//! we build a dataflow for a reduction containing multiple types of reductions, we have no choice
//! but to divide up the requested aggregations by type, render each type separately, and then
//! take those results and collate them back in the requested output order. However, if we only
//! need to perform aggregations of a single reduction type, we can specialize and render the
//! dataflow to compute those aggregations in the correct order, returning the output arrangement
//! directly and avoiding the extra collation arrangement.

use std::collections::BTreeMap;

use mz_expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
};
use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log};
use serde::{Deserialize, Serialize};

use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};

/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
    /// Accumulable functions are invertible (values can be subtracted back out) and
    /// associative. We can compute these results by moving some data to the diff field under
    /// arbitrary changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic functions, for lack of a better word, are those that are neither accumulable
    /// nor hierarchical. Examples include `jsonb_agg`.
    Basic,
}

impl columnation::Columnation for ReductionType {
    type InnerRegion = columnation::CopyRegion<ReductionType>;
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct`; otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
    /// Plan for computing a mix of different kinds of aggregations.
    /// We need to do extra work here to reassemble results back in the
    /// requested order.
    Collation(CollationPlan),
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
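///
/// The sketch below illustrates why the `DISTINCT` aggregations need separate handling. It is a
/// hypothetical example over plain integers, not Differential collections, and it computes
/// `sum(x)` and `sum(DISTINCT x)` over the same input. The distinct variant must deduplicate its
/// input before accumulating, so it cannot simply share the input of the non-distinct fragment.
///
/// ```
/// use std::collections::BTreeSet;
///
/// // Hypothetical helper for `sum(x)`: every value contributes, including duplicates.
/// fn sum_all(values: &[i64]) -> i64 {
///     values.iter().sum()
/// }
///
/// // Hypothetical helper for `sum(DISTINCT x)`: apply a distinct step first, then accumulate.
/// fn sum_distinct(values: &[i64]) -> i64 {
///     values.iter().copied().collect::<BTreeSet<i64>>().into_iter().sum()
/// }
/// ```
///
/// Over the input `[1, 1, 2]`, `sum_all` yields `4` while `sum_distinct` yields `3`.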
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the aggregation among accumulable aggregations,
    /// index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan for hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Returns the set of aggregations computed by this plan.
    pub fn aggr_funcs(&self) -> &[AggregateFunc] {
        match self {
            HierarchicalPlan::Monotonic(plan) => &plan.aggr_funcs,
            HierarchicalPlan::Bucketed(plan) => &plan.aggr_funcs,
        }
    }

    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()` but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append-only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
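///
/// The sketch below shows the in-place idea. It assumes an append-only stream of values for a
/// single key and a `min` aggregation, and the helper is hypothetical, not this crate's API.
/// Each new value is compared against the current best, and only the winner is retained.
///
/// ```
/// // Hypothetical helper: folds an append-only stream into its running minimum,
/// // retaining a single value instead of the whole input.
/// fn running_min(stream: impl IntoIterator<Item = i64>) -> Option<i64> {
///     stream.into_iter().fold(None, |best, value| match best {
///         // Keep the current winner if the new value does not beat it.
///         Some(current) if current <= value => Some(current),
///         // Otherwise the new value becomes the best seen so far.
///         _ => Some(value),
///     })
/// }
/// ```
///
/// For instance, `running_min([7, 3, 9, 3])` is `Some(3)`. A retraction would break this
/// scheme, because the discarded values would be needed to find the next-best value. This is
/// why `must_consolidate` exists for inputs that are not physically monotonic.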
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
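///
/// The layering can be sketched under some simplifying assumptions:
/// - there is a single group key and a `min` aggregation;
/// - each value carries a caller-supplied hash that stands in for `value.hashed()`;
/// - every bucket count is non-zero and divides the previous one.
///
/// Because each bucket count divides the previous one, every coarser bucket is a union of finer
/// buckets. The helper is hypothetical and only models the arithmetic, not the dataflow.
///
/// ```
/// use std::collections::BTreeMap;
///
/// // Hypothetical helper: computes `min` by reducing within progressively coarser buckets.
/// // Each entry is `(hash, value)`; `buckets` should be decreasing, e.g. `[256, 16, 1]`.
/// fn bucketed_min(mut entries: Vec<(u64, i64)>, buckets: &[u64]) -> Option<i64> {
///     for &count in buckets {
///         // One reduce layer: keep the minimum value per bucket, retaining the
///         // winner's hash so that the next, coarser layer can re-bucket it.
///         let mut layer: BTreeMap<u64, (u64, i64)> = BTreeMap::new();
///         for (hash, value) in entries {
///             layer
///                 .entry(hash % count)
///                 .and_modify(|best| {
///                     if value < best.1 {
///                         *best = (hash, value);
///                     }
///                 })
///                 .or_insert((hash, value));
///         }
///         entries = layer.into_values().collect();
///     }
///     // Whatever survives the last layer is reduced one final time.
///     entries.into_iter().map(|(_, value)| value).min()
/// }
/// ```
///
/// With entries `[(3, 10), (17, 4), (256, 7), (42, 9)]` and buckets `[256, 16, 1]`, the result
/// is `Some(4)`, the same as a flat `min` over the values.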
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally, a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}

impl BucketedPlan {
    /// Converts to a monotonic plan, indicating whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            skips: self.skips,
            must_consolidate,
        }
    }
}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple basic aggregations
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These need to then be collated together in an additional
    /// reduction. Each element is a pair of
    /// `(index in the set of inputs we are aggregating over, aggregation expression)`.
    Multiple(Vec<(usize, AggregateExpr)>),
}

/// Plan for rendering a single basic aggregation, possibly fused with a `FlatMap UnnestList`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The index in the set of inputs that we are aggregating over.
    pub index: usize,
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join?
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence of aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
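    ///
    /// The sketch below is a simplified, hypothetical model of that merge, not the rendering
    /// code. It assumes the results for each reduction type arrive in their original relative
    /// order, one queue per type, and that `order[i]` names the queue the `i`-th output comes
    /// from. `Kind` stands in for [`ReductionType`].
    ///
    /// ```
    /// use std::collections::{BTreeMap, VecDeque};
    ///
    /// // Hypothetical stand-in for `ReductionType`.
    /// #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
    /// enum Kind {
    ///     Accumulable,
    ///     Hierarchical,
    ///     Basic,
    /// }
    ///
    /// // Hypothetical helper: reassembles per-type results into the requested output order.
    /// fn collate(order: &[Kind], mut per_type: BTreeMap<Kind, VecDeque<String>>) -> Vec<String> {
    ///     order
    ///         .iter()
    ///         .map(|kind| {
    ///             // Pull the next result from the queue of the required type.
    ///             per_type
    ///                 .get_mut(kind)
    ///                 .and_then(|queue| queue.pop_front())
    ///                 .expect("one result per requested aggregation")
    ///         })
    ///         .collect()
    /// }
    /// ```
    ///
    /// For example, suppose `order` is `[Basic, Accumulable, Basic]`, the basic queue holds
    /// `["agg0", "agg2"]` and the accumulable queue holds `["sum"]`. Then `collate` returns
    /// `["agg0", "sum", "agg2"]`.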
    pub aggregate_types: Vec<ReductionType>,
}

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        self.hierarchical
            .as_mut()
            .map(|plan| plan.as_monotonic(must_consolidate));
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes the dataflow to be created
    /// and how the aggregations will be executed.
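    ///
    /// The grouping step below can be modeled in isolation. The sketch is illustrative only. It
    /// uses a hypothetical `Kind` tag in place of [`ReductionType`] and bare positions in place
    /// of aggregate expressions. Visiting the aggregates in order makes each per-type list a
    /// subsequence (strictly increasing positions) of the original list.
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// // Hypothetical stand-in for `ReductionType`.
    /// #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
    /// enum Kind {
    ///     Accumulable,
    ///     Hierarchical,
    ///     Basic,
    /// }
    ///
    /// // Hypothetical helper: groups aggregate positions by kind, preserving relative order.
    /// fn group_by_kind(kinds: &[Kind]) -> BTreeMap<Kind, Vec<usize>> {
    ///     let mut groups: BTreeMap<Kind, Vec<usize>> = BTreeMap::new();
    ///     for (index, kind) in kinds.iter().enumerate() {
    ///         // Positions are pushed in increasing order, so each list is a subsequence.
    ///         groups.entry(*kind).or_default().push(index);
    ///     }
    ///     groups
    /// }
    /// ```
    ///
    /// For instance, the kinds `[Hierarchical, Accumulable, Hierarchical]` give a hierarchical
    /// group of `[0, 2]` and an accumulable group of `[1]`.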
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // If we don't have any aggregations we are just computing a distinct.
        if aggregates.is_empty() {
            return ReducePlan::Distinct;
        }

        // Otherwise, we need to group aggregations according to their
        // reduction type (accumulable, hierarchical, or basic).
        let mut reduction_types = BTreeMap::new();
        // We need to make sure that each list of aggregates by type forms
        // a subsequence of the overall sequence of aggregates.
        for index in 0..aggregates.len() {
            let typ = reduction_type(&aggregates[index].func);
            let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
            aggregates_list.push((index, aggregates[index].clone()));
        }

        // Convert each grouped list of reductions into a plan.
        let plan: Vec<_> = reduction_types
            .into_iter()
            .map(|(typ, aggregates_list)| {
                ReducePlan::create_inner(
                    typ,
                    aggregates_list,
                    monotonic,
                    expected_group_size,
                    fused_unnest_list,
                )
            })
            .collect();

        // If we only have a single type of aggregation present we can
        // render that directly.
        if plan.len() == 1 {
            return plan[0].clone();
        }

        // Warn if we encounter a collation plan. This can trigger if the `enable_reduce_reduction`
        // flag is disabled.
        soft_assert_eq_or_log!(
            plan.len(),
            1,
            "Expected reduce reduction to remove collation plans"
        );

        // Otherwise, we have to stitch reductions together.

        // First, let's sanity check that we don't have an impossible number
        // of reduction types.
        assert!(plan.len() <= 3);

        let mut collation: CollationPlan = Default::default();

        // Construct a mapping from output_position -> reduction that we can
        // use to reconstruct the output in the correct order.
        let aggregate_types = aggregates
            .iter()
            .map(|a| reduction_type(&a.func))
            .collect::<Vec<_>>();

        collation.aggregate_types = aggregate_types;

        for expr in plan.into_iter() {
            match expr {
                ReducePlan::Accumulable(e) => {
                    assert_none!(collation.accumulable);
                    collation.accumulable = Some(e);
                }
                ReducePlan::Hierarchical(e) => {
                    assert_none!(collation.hierarchical);
                    collation.hierarchical = Some(e);
                }
                ReducePlan::Basic(e) => {
                    assert_none!(collation.basic);
                    collation.basic = Some(e);
                }
                ReducePlan::Distinct | ReducePlan::Collation(_) => {
                    panic!("Inner reduce plan was unsupported type!")
                }
            }
        }

        ReducePlan::Collation(collation)
    }

    /// Generate a plan for computing the specified type of aggregations.
    ///
    /// This function assumes that all of the supplied aggregates are
    /// actually of the correct reduction type, and are a subsequence
    /// of the total list of requested aggregations.
    fn create_inner(
        typ: ReductionType,
        aggregates_list: Vec<(usize, AggregateExpr)>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        if fused_unnest_list {
            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
        }
        assert!(
            aggregates_list.len() > 0,
            "error: tried to render a reduce dataflow with no aggregates"
        );
        match typ {
            ReductionType::Accumulable => {
                let mut simple_aggrs = vec![];
                let mut distinct_aggrs = vec![];
                let full_aggrs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr)
                    .collect();
                for (accumulable_index, (datum_index, aggr)) in
                    aggregates_list.into_iter().enumerate()
                {
                    // Accumulable aggregations need to do extra per-aggregate work
                    // for aggregations with the distinct bit set, so we'll separate
                    // those out now.
                    if aggr.distinct {
                        distinct_aggrs.push((accumulable_index, datum_index, aggr));
                    } else {
                        simple_aggrs.push((accumulable_index, datum_index, aggr));
                    };
                }
                ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs,
                    simple_aggrs,
                    distinct_aggrs,
                })
            }
            ReductionType::Hierarchical => {
                let aggr_funcs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr.func)
                    .collect();
                let indexes: Vec<_> = aggregates_list
                    .into_iter()
                    .map(|(index, _)| index)
                    .collect();

                // Since we don't have random access over Rows, we simplify the
                // task of grabbing the inputs we are aggregating over by
                // generating a list of "skips" an iterator over the Row needs
                // to do to get the desired indexes.
                let skips = convert_indexes_to_skips(indexes);
                if monotonic {
                    let monotonic = MonotonicPlan {
                        aggr_funcs,
                        skips,
                        must_consolidate: false,
                    };
                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
                } else {
                    let buckets = bucketing_of_expected_group_size(expected_group_size);
                    let bucketed = BucketedPlan {
                        aggr_funcs,
                        skips,
                        buckets,
                    };

                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
                }
            }
            ReductionType::Basic => {
                if aggregates_list.len() == 1 {
                    ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                        index: aggregates_list[0].0,
                        expr: aggregates_list[0].1.clone(),
                        fused_unnest_list,
                    }))
                } else {
                    ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
                }
            }
        }
    }

    /// Reports all keys of produced arrangements.
    ///
    /// A reduction produces a single arrangement keyed by its first `key_arity`
    /// columns, so this reports exactly one list of key expressions.
    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
        let key = (0..key_arity)
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        AvailableCollections::new_arranged(vec![(key, permutation, thinning)])
    }

    /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
    /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
    /// one sitting on top of the reduction.
    ///
    /// Non-fusable parts include temporal predicates or any other parts that cannot be
    /// conservatively asserted to not increase the memory requirements of the output
    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
    /// being the identity MFP.
    pub fn extract_mfp_after(
        &self,
        mut mfp: MapFilterProject,
        key_arity: usize,
    ) -> (MapFilterProject, MapFilterProject, usize) {
        // Extract temporal predicates, as we cannot push them into `Reduce`.
        let temporal_mfp = mfp.extract_temporal();
        let non_temporal = mfp;
        mfp = temporal_mfp;

        // We ensure we do not attempt to project away the key, as we cannot accomplish
        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
        // check if it can be directly absorbed; if it can't, we then default to a general
        // strategy that unpacks the MFP to absorb only the filter and supporting map
        // parts, followed by a post-MFP step.
        let input_arity = non_temporal.input_arity;
        let key = Vec::from_iter(0..key_arity);
        let mut mfp_push;
        let output_arity;

        if non_temporal.projection.len() <= input_arity
            && non_temporal.projection.iter().all(|c| *c < input_arity)
            && non_temporal.projection.starts_with(&key)
        {
            // Special case: The key is preserved as a prefix and the projection is only
            // of output fields from the reduction. So we know that: (a) We can process the
            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
            // and (c) The output projection is at most as wide as the output that would be
            // produced by the reduction, so we are sure to never regress the memory
            // requirements of the output arrangement.
            // Note that this strategy may change the arity of the output arrangement.
            output_arity = non_temporal.projection.len();
            mfp_push = non_temporal;
        } else {
            // General strategy: Unpack MFP as MF followed by P' that removes all M
            // columns, then MP afterwards.
            // Note that this strategy does not result in any changes to the arity of
            // the output arrangement.
            let (m, f, p) = non_temporal.into_map_filter_project();
            mfp_push = MapFilterProject::new(input_arity)
                .map(m.clone())
                .filter(f)
                .project(0..input_arity);
            output_arity = input_arity;

            // We still need to perform the map and projection for the actual output.
            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);

            // Compose the non-pushed MFP components.
            mfp = MapFilterProject::compose(mfp_left, mfp);
        }
        mfp_push.optimize();
        mfp.optimize();
        (mfp_push, mfp, output_arity)
    }
}

/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
    /// Extracts the columns used as the key.
    pub key_plan: mz_expr::SafeMfpPlan,
    /// Extracts the columns used to feed the aggregations.
    pub val_plan: mz_expr::SafeMfpPlan,
}

impl KeyValPlan {
    /// Creates a new [KeyValPlan] from aggregation arguments.
    pub fn new(
        input_arity: usize,
        group_key: &[MirScalarExpr],
        aggregates: &[AggregateExpr],
        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
    ) -> Self {
        // Form an operator for evaluating key expressions.
        let mut key_mfp = MapFilterProject::new(input_arity)
            .map(group_key.iter().cloned())
            .project(input_arity..(input_arity + group_key.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        // Form an operator for evaluating value expressions.
        let mut val_mfp = MapFilterProject::new(input_arity)
            .map(aggregates.iter().map(|a| a.expr.clone()))
            .project(input_arity..(input_arity + aggregates.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        key_mfp.optimize();
        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
        val_mfp.optimize();
        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();

        Self { key_plan, val_plan }
    }

    /// The arity of the key plan.
    pub fn key_arity(&self) -> usize {
        self.key_plan.projection.len()
    }
}

/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
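///
/// The skips are meant to be consumed with [`Iterator::nth`], which advances past `n` elements
/// and returns the next one. The sketch below is a minimal illustration that uses a plain slice
/// in place of a `Row`. The helper is hypothetical.
///
/// ```
/// // Hypothetical helper: extracts the elements at the positions encoded by `skips`,
/// // walking the input only once.
/// fn take_with_skips<T: Copy>(values: &[T], skips: &[usize]) -> Vec<T> {
///     let mut iter = values.iter();
///     skips
///         .iter()
///         // Each `nth(skip)` discards `skip` elements and yields the one after them.
///         .map(|&skip| *iter.nth(skip).expect("skip past the end of the input"))
///         .collect()
/// }
/// ```
///
/// With `skips = [3, 2, 3, 4]`, this recovers the elements at indexes `3, 6, 10, 15`.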
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
    for i in 1..indexes.len() {
        soft_assert_or_log!(
            indexes[i - 1] < indexes[i],
            "convert_indexes_to_skips needs indexes to be strictly increasing. Received: {:?}",
            indexes,
        );
    }

    for i in (1..indexes.len()).rev() {
        indexes[i] -= indexes[i - 1];
        indexes[i] -= 1;
    }

    indexes
}

/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
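///
/// The hypothetical sketch below illustrates the "difference field" idea by packing `count(*)`
/// and `sum(x)` into one pair-valued difference. Each insertion of `x` contributes `(1, x)` and
/// each retraction contributes `(-1, -x)`. Accumulation is componentwise addition. This only
/// models the arithmetic. It does not reflect how this crate actually represents accumulators.
///
/// ```
/// // Hypothetical helper: accumulates `(value, diff)` updates into `(count, sum)`,
/// // where `diff` is `+1` for an insertion and `-1` for a retraction.
/// fn accumulate(updates: &[(i64, i64)]) -> (i64, i64) {
///     updates
///         .iter()
///         .fold((0, 0), |(count, sum), &(value, diff)| {
///             // Componentwise addition of the packed difference `(diff, value * diff)`.
///             (count + diff, sum + value * diff)
///         })
/// }
/// ```
///
/// Consider the updates `[(4, 1), (6, 1), (4, -1)]`, meaning insert 4, insert 6, retract 4. For
/// these, the result is `(1, 6)`: one remaining row, whose values sum to 6.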
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
    match func {
        AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::Any
        | AggregateFunc::All
        | AggregateFunc::Dummy => ReductionType::Accumulable,
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinInterval
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinTime => ReductionType::Hierarchical,
        AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::ArrayConcat { .. }
        | AggregateFunc::ListConcat { .. }
        | AggregateFunc::StringAgg { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. }
        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
    }
}