mz_compute_types/plan/reduce.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.

//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent is that creating a `ReducePlan` should capture all of
//! the decision making about what kind of dataflow we need to render and what each operator
//! needs to do, so that actually rendering the plan can be a relatively simple application of
//! (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction, and when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of commutative Abelian groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//!    they are still commutative and associative, which lets us compute the reduction over subsets
//!    of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[min[2, 5], min[1, 10]]`. When we compute hierarchical
//!    reductions this way, we can maintain the computation in sublinear time with respect to
//!    the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//!    hierarchical reductions correspond to instances of semigroups, in that they are associative,
//!    but in order to benefit from being computed hierarchically, they need to have some reduction
//!    in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//!    hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//!    hierarchically can instead be computed in-place, because we only need to keep the value that's
//!    better than the "best" (minimal or maximal for min and max) seen so far.
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//!    hierarchical (most likely they are associative but don't involve any data reduction) and so for these
//!    we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly and avoid the extra collation arrangement.
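//!
//! For intuition, here is a sketch (illustrative only, not tied to the actual bucket sizes we
//! choose) of how a bucketed hierarchical `min` proceeds: each layer reduces small groups, and
//! the next layer reduces the winners of the previous one, so an update only reworks the
//! buckets it touches.
//!
//! ```text
//! input:   [2, 5, 1, 10, 7, 3]
//! layer 1: min[2, 5] = 2    min[1, 10] = 1    min[7, 3] = 3
//! layer 2: min[2, 1, 3] = 1
//! output:  1
//! ```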

use std::collections::BTreeMap;

use mz_expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
};
use mz_ore::{assert_none, soft_assert_or_log};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use proptest::prelude::{Arbitrary, BoxedStrategy, any};
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};

include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.reduce.rs"));

/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
    /// Accumulable functions can be subtracted from (are invertible), and are associative.
    /// We can compute these results by moving some data to the diff field under arbitrary
    /// changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic, for lack of a better word, are functions that are neither accumulable
    /// nor hierarchical. Examples include jsonb_agg.
    Basic,
}

impl columnation::Columnation for ReductionType {
    type InnerRegion = columnation::CopyRegion<ReductionType>;
}

impl RustType<ProtoReductionType> for ReductionType {
    fn into_proto(&self) -> ProtoReductionType {
        use proto_reduction_type::Kind;
        ProtoReductionType {
            kind: Some(match self {
                ReductionType::Accumulable => Kind::Accumulable(()),
                ReductionType::Hierarchical => Kind::Hierarchical(()),
                ReductionType::Basic => Kind::Basic(()),
            }),
        }
    }

    fn from_proto(proto: ProtoReductionType) -> Result<Self, TryFromProtoError> {
        use proto_reduction_type::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("kind"))?;
        Ok(match kind {
            Kind::Accumulable(()) => ReductionType::Accumulable,
            Kind::Hierarchical(()) => ReductionType::Hierarchical,
            Kind::Basic(()) => ReductionType::Basic,
        })
    }
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct`, and otherwise
/// it's composed of a combination of accumulable, hierarchical, and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
    /// Plan for computing a mix of different kinds of aggregations.
    /// We need to do extra work here to reassemble results back in the
    /// requested order.
    Collation(CollationPlan),
}

proptest::prop_compose! {
    /// `expected_group_size` is a u64, but instead of a uniform distribution,
    /// we want a logarithmic distribution so that we have an even distribution
    /// in the number of layers of buckets that a hierarchical plan would have.
    fn any_group_size()
        (bits in 0..u64::BITS)
        (integer in (((1_u64) << bits) - 1)
            ..(if bits == (u64::BITS - 1) { u64::MAX }
                else { ((1_u64) << (bits + 1)) - 1 }))
        -> u64 {
        integer
    }
}

/// To avoid stack overflow, this limits the arbitrarily-generated test
/// `ReducePlan`s to involve at most 8 aggregations.
///
/// To have better coverage of realistic expected group sizes, the
/// `expected_group_size` has a logarithmic distribution.
impl Arbitrary for ReducePlan {
    type Parameters = ();

    type Strategy = BoxedStrategy<Self>;

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            proptest::collection::vec(any::<AggregateExpr>(), 0..8),
            any::<bool>(),
            any::<bool>(),
            any_group_size(),
            any::<bool>(),
        )
            .prop_map(
                |(
                    exprs,
                    monotonic,
                    any_expected_size,
                    expected_group_size,
                    mut fused_unnest_list,
                )| {
                    let expected_group_size = if any_expected_size {
                        Some(expected_group_size)
                    } else {
                        None
                    };
                    if !(exprs.len() == 1
                        && matches!(reduction_type(&exprs[0].func), ReductionType::Basic))
                    {
                        fused_unnest_list = false;
                    }
                    ReducePlan::create_from(
                        exprs,
                        monotonic,
                        expected_group_size,
                        fused_unnest_list,
                    )
                },
            )
            .boxed()
    }
}

impl RustType<ProtoReducePlan> for ReducePlan {
    fn into_proto(&self) -> ProtoReducePlan {
        use proto_reduce_plan::Kind::*;
        ProtoReducePlan {
            kind: Some(match self {
                ReducePlan::Distinct => Distinct(()),
                ReducePlan::Accumulable(plan) => Accumulable(plan.into_proto()),
                ReducePlan::Hierarchical(plan) => Hierarchical(plan.into_proto()),
                ReducePlan::Basic(plan) => Basic(plan.into_proto()),
                ReducePlan::Collation(plan) => Collation(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoReducePlan) -> Result<Self, TryFromProtoError> {
        use proto_reduce_plan::Kind::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoReducePlan::kind"))?;
        Ok(match kind {
            Distinct(()) => ReducePlan::Distinct,
            Accumulable(plan) => ReducePlan::Accumulable(plan.into_rust()?),
            Hierarchical(plan) => ReducePlan::Hierarchical(plan.into_rust()?),
            Basic(plan) => ReducePlan::Basic(plan.into_rust()?),
            Collation(plan) => ReducePlan::Collation(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
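///
/// A sketch of the split (illustrative; the `count` helper here is hypothetical, and we
/// assume `AggregateExpr`'s public fields from `mz_expr`):
///
/// ```
/// use mz_compute_types::plan::reduce::ReducePlan;
/// use mz_expr::{AggregateExpr, AggregateFunc, MirScalarExpr};
///
/// let count = |distinct| AggregateExpr {
///     func: AggregateFunc::Count,
///     expr: MirScalarExpr::column(0),
///     distinct,
/// };
/// // One plain and one DISTINCT count: both accumulable, but rendered separately.
/// match ReducePlan::create_from(vec![count(false), count(true)], false, None, false) {
///     ReducePlan::Accumulable(plan) => {
///         assert_eq!(plan.simple_aggrs.len(), 1);
///         assert_eq!(plan.distinct_aggrs.len(), 1);
///     }
///     other => panic!("expected an accumulable plan, got {:?}", other),
/// }
/// ```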
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the aggregation among accumulable aggregations,
    /// index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}

impl RustType<proto_accumulable_plan::ProtoAggr> for (usize, usize, AggregateExpr) {
    fn into_proto(&self) -> proto_accumulable_plan::ProtoAggr {
        proto_accumulable_plan::ProtoAggr {
            index_agg: self.0.into_proto(),
            index_inp: self.1.into_proto(),
            expr: Some(self.2.into_proto()),
        }
    }

    fn from_proto(proto: proto_accumulable_plan::ProtoAggr) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index_agg.into_rust()?,
            proto.index_inp.into_rust()?,
            proto.expr.into_rust_if_some("ProtoAggr::expr")?,
        ))
    }
}

impl RustType<ProtoAccumulablePlan> for AccumulablePlan {
    fn into_proto(&self) -> ProtoAccumulablePlan {
        ProtoAccumulablePlan {
            full_aggrs: self.full_aggrs.into_proto(),
            simple_aggrs: self.simple_aggrs.into_proto(),
            distinct_aggrs: self.distinct_aggrs.into_proto(),
        }
    }

    fn from_proto(proto: ProtoAccumulablePlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            full_aggrs: proto.full_aggrs.into_rust()?,
            simple_aggrs: proto.simple_aggrs.into_rust()?,
            distinct_aggrs: proto.distinct_aggrs.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan for hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Returns the aggregate functions computed by this plan.
    pub fn aggr_funcs(&self) -> &[AggregateFunc] {
        match self {
            HierarchicalPlan::Monotonic(plan) => &plan.aggr_funcs,
            HierarchicalPlan::Bucketed(plan) => &plan.aggr_funcs,
        }
    }

    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
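    ///
    /// A sketch (illustrative; the plan contents are arbitrary):
    ///
    /// ```
    /// use mz_compute_types::plan::reduce::{BucketedPlan, HierarchicalPlan};
    /// use mz_expr::AggregateFunc;
    ///
    /// let mut plan = HierarchicalPlan::Bucketed(BucketedPlan {
    ///     aggr_funcs: vec![AggregateFunc::MaxInt64],
    ///     skips: vec![0],
    ///     buckets: vec![16],
    /// });
    /// plan.as_monotonic(true);
    /// assert!(matches!(plan, HierarchicalPlan::Monotonic(_)));
    /// ```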
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()` but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

impl RustType<ProtoHierarchicalPlan> for HierarchicalPlan {
    fn into_proto(&self) -> ProtoHierarchicalPlan {
        use proto_hierarchical_plan::Kind;
        ProtoHierarchicalPlan {
            kind: Some(match self {
                HierarchicalPlan::Monotonic(plan) => Kind::Monotonic(plan.into_proto()),
                HierarchicalPlan::Bucketed(plan) => Kind::Bucketed(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoHierarchicalPlan) -> Result<Self, TryFromProtoError> {
        use proto_hierarchical_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoHierarchicalPlan::Kind"))?;
        Ok(match kind {
            Kind::Monotonic(plan) => HierarchicalPlan::Monotonic(plan.into_rust()?),
            Kind::Bucketed(plan) => HierarchicalPlan::Bucketed(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append-only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}

impl RustType<ProtoMonotonicPlan> for MonotonicPlan {
    fn into_proto(&self) -> ProtoMonotonicPlan {
        ProtoMonotonicPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            must_consolidate: self.must_consolidate.into_proto(),
        }
    }

    fn from_proto(proto: ProtoMonotonicPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            must_consolidate: proto.must_consolidate.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally, a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}

impl BucketedPlan {
    /// Convert to a monotonic plan, indicating whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            skips: self.skips,
            must_consolidate,
        }
    }
}

impl RustType<ProtoBucketedPlan> for BucketedPlan {
    fn into_proto(&self) -> ProtoBucketedPlan {
        ProtoBucketedPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            buckets: self.buckets.clone(),
        }
    }

    fn from_proto(proto: ProtoBucketedPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            buckets: proto.buckets,
        })
    }
}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple reduce aggregates
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These need to then be collated together in an additional
    /// reduction. Each element represents:
    /// `(index in the set of inputs that we are aggregating over,
    /// the aggregation expression itself)`
    Multiple(Vec<(usize, AggregateExpr)>),
}

/// Plan for rendering a single basic aggregation, possibly with a `FlatMap UnnestList`
/// fused into the aggregation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The index in the set of inputs that we are aggregating over.
    pub index: usize,
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

impl RustType<proto_basic_plan::ProtoSimpleSingleBasicPlan> for (usize, AggregateExpr) {
    fn into_proto(&self) -> proto_basic_plan::ProtoSimpleSingleBasicPlan {
        proto_basic_plan::ProtoSimpleSingleBasicPlan {
            index: self.0.into_proto(),
            expr: Some(self.1.into_proto()),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSimpleSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index.into_rust()?,
            proto
                .expr
                .into_rust_if_some("ProtoSimpleSingleBasicPlan::expr")?,
        ))
    }
}

impl RustType<proto_basic_plan::ProtoSingleBasicPlan> for SingleBasicPlan {
    fn into_proto(&self) -> proto_basic_plan::ProtoSingleBasicPlan {
        proto_basic_plan::ProtoSingleBasicPlan {
            index: self.index.into_proto(),
            expr: Some(self.expr.into_proto()),
            fused_unnest_list: self.fused_unnest_list.into_proto(),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok(SingleBasicPlan {
            index: proto.index.into_rust()?,
            expr: proto.expr.into_rust_if_some("ProtoSingleBasicPlan::expr")?,
            fused_unnest_list: proto.fused_unnest_list.into_rust()?,
        })
    }
}

impl RustType<ProtoBasicPlan> for BasicPlan {
    fn into_proto(&self) -> ProtoBasicPlan {
        use proto_basic_plan::*;

        ProtoBasicPlan {
            kind: Some(match self {
                BasicPlan::Single(plan) => Kind::Single(plan.into_proto()),
                BasicPlan::Multiple(aggrs) => Kind::Multiple(ProtoMultipleBasicPlan {
                    aggrs: aggrs.into_proto(),
                }),
            }),
        }
    }

    fn from_proto(proto: ProtoBasicPlan) -> Result<Self, TryFromProtoError> {
        use proto_basic_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoBasicPlan::kind"))?;

        Ok(match kind {
            Kind::Single(plan) => BasicPlan::Single(plan.into_rust()?),
            Kind::Multiple(x) => BasicPlan::Multiple(x.aggrs.into_rust()?),
        })
    }
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join?
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence of aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
    pub aggregate_types: Vec<ReductionType>,
}

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        if let Some(plan) = self.hierarchical.as_mut() {
            plan.as_monotonic(must_consolidate);
        }
    }
}

impl RustType<ProtoCollationPlan> for CollationPlan {
    fn into_proto(&self) -> ProtoCollationPlan {
        ProtoCollationPlan {
            accumulable: self.accumulable.into_proto(),
            hierarchical: self.hierarchical.into_proto(),
            basic: self.basic.into_proto(),
            aggregate_types: self.aggregate_types.into_proto(),
        }
    }

    fn from_proto(proto: ProtoCollationPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            accumulable: proto.accumulable.into_rust()?,
            hierarchical: proto.hierarchical.into_rust()?,
            basic: proto.basic.into_rust()?,
            aggregate_types: proto.aggregate_types.into_rust()?,
        })
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes the dataflow to be created
    /// and how the aggregations will be executed.
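    ///
    /// For example (an illustrative sketch), with no aggregations the result is a
    /// plain `Distinct` plan:
    ///
    /// ```
    /// use mz_compute_types::plan::reduce::ReducePlan;
    ///
    /// let plan = ReducePlan::create_from(vec![], false, None, false);
    /// assert!(matches!(plan, ReducePlan::Distinct));
    /// ```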
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // If we don't have any aggregations we are just computing a distinct.
        if aggregates.is_empty() {
            return ReducePlan::Distinct;
        }

        // Otherwise, we need to group aggregations according to their
        // reduction type (accumulable, hierarchical, or basic).
        let mut reduction_types = BTreeMap::new();
        // We need to make sure that each list of aggregates by type forms
        // a subsequence of the overall sequence of aggregates.
        for index in 0..aggregates.len() {
            let typ = reduction_type(&aggregates[index].func);
            let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
            aggregates_list.push((index, aggregates[index].clone()));
        }

        // Convert each grouped list of reductions into a plan.
        let plan: Vec<_> = reduction_types
            .into_iter()
            .map(|(typ, aggregates_list)| {
                ReducePlan::create_inner(
                    typ,
                    aggregates_list,
                    monotonic,
                    expected_group_size,
                    fused_unnest_list,
                )
            })
            .collect();

        // If we only have a single type of aggregation present we can
        // render that directly.
        if plan.len() == 1 {
            return plan[0].clone();
        }

        // Otherwise, we have to stitch reductions together.

        // First, let's sanity check that we don't have an impossible number
        // of reduction types.
        assert!(plan.len() <= 3);

        let mut collation: CollationPlan = Default::default();

        // Construct a mapping from output_position -> reduction that we can
        // use to reconstruct the output in the correct order.
        let aggregate_types = aggregates
            .iter()
            .map(|a| reduction_type(&a.func))
            .collect::<Vec<_>>();

        collation.aggregate_types = aggregate_types;

        for expr in plan.into_iter() {
            match expr {
                ReducePlan::Accumulable(e) => {
                    assert_none!(collation.accumulable);
                    collation.accumulable = Some(e);
                }
                ReducePlan::Hierarchical(e) => {
                    assert_none!(collation.hierarchical);
                    collation.hierarchical = Some(e);
                }
                ReducePlan::Basic(e) => {
                    assert_none!(collation.basic);
                    collation.basic = Some(e);
                }
                ReducePlan::Distinct | ReducePlan::Collation(_) => {
                    panic!("Inner reduce plan was an unsupported type!")
                }
            }
        }

        ReducePlan::Collation(collation)
    }

    /// Generate a plan for computing the specified type of aggregations.
    ///
    /// This function assumes that all of the supplied aggregates are
    /// actually of the correct reduction type, and are a subsequence
    /// of the total list of requested aggregations.
    fn create_inner(
        typ: ReductionType,
        aggregates_list: Vec<(usize, AggregateExpr)>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        if fused_unnest_list {
            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
        }
        assert!(
            !aggregates_list.is_empty(),
            "error: tried to render a reduce dataflow with no aggregates"
        );
        match typ {
            ReductionType::Accumulable => {
                let mut simple_aggrs = vec![];
                let mut distinct_aggrs = vec![];
                let full_aggrs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr)
                    .collect();
                for (accumulable_index, (datum_index, aggr)) in
                    aggregates_list.into_iter().enumerate()
                {
                    // Accumulable aggregations need to do extra per-aggregate work
                    // for aggregations with the distinct bit set, so we'll separate
                    // those out now.
                    if aggr.distinct {
                        distinct_aggrs.push((accumulable_index, datum_index, aggr));
                    } else {
                        simple_aggrs.push((accumulable_index, datum_index, aggr));
                    };
                }
                ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs,
                    simple_aggrs,
                    distinct_aggrs,
                })
            }
            ReductionType::Hierarchical => {
                let aggr_funcs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr.func)
                    .collect();
                let indexes: Vec<_> = aggregates_list
                    .into_iter()
                    .map(|(index, _)| index)
                    .collect();

                // We don't have random access over Rows so we can simplify the
                // task of grabbing the inputs we are aggregating over by
                // generating a list of "skips" an iterator over the Row needs
                // to do to get the desired indexes.
                let skips = convert_indexes_to_skips(indexes);
                if monotonic {
                    let monotonic = MonotonicPlan {
                        aggr_funcs,
                        skips,
                        must_consolidate: false,
                    };
                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
                } else {
                    let buckets = bucketing_of_expected_group_size(expected_group_size);
                    let bucketed = BucketedPlan {
                        aggr_funcs,
                        skips,
                        buckets,
                    };

                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
                }
            }
            ReductionType::Basic => {
                if aggregates_list.len() == 1 {
                    ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                        index: aggregates_list[0].0,
                        expr: aggregates_list[0].1.clone(),
                        fused_unnest_list,
                    }))
                } else {
                    ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
                }
            }
        }
    }

    /// Reports all keys of produced arrangements.
    ///
    /// This is likely either an empty vector, for no arrangement,
    /// or a singleton vector containing the list of expressions
    /// that key a single arrangement.
    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
        let key = (0..key_arity)
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        AvailableCollections::new_arranged(vec![(key, permutation, thinning)])
    }

    /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
    /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
    /// one sitting on top of the reduction.
    ///
    /// Non-fusable parts include temporal predicates or any other parts that cannot be
    /// conservatively asserted to not increase the memory requirements of the output
    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
    /// being the identity MFP.
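    ///
    /// A sketch (illustrative; assumes an identity MFP, which is fully fusable and leaves an
    /// identity residual):
    ///
    /// ```
    /// use mz_compute_types::plan::reduce::ReducePlan;
    /// use mz_expr::MapFilterProject;
    ///
    /// // A three-column identity MFP over a one-column key is absorbed whole.
    /// let (_push, residual, output_arity) =
    ///     ReducePlan::Distinct.extract_mfp_after(MapFilterProject::new(3), 1);
    /// assert_eq!(output_arity, 3);
    /// assert!(residual.is_identity());
    /// ```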
    pub fn extract_mfp_after(
        &self,
        mut mfp: MapFilterProject,
        key_arity: usize,
    ) -> (MapFilterProject, MapFilterProject, usize) {
        // Extract temporal predicates, as we cannot push them into `Reduce`.
        let temporal_mfp = mfp.extract_temporal();
        let non_temporal = mfp;
        mfp = temporal_mfp;

        // We ensure we do not attempt to project away the key, as we cannot accomplish
        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
        // check if it can be directly absorbed; if it can't, we then default to a general
        // strategy that unpacks the MFP to absorb only the filter and supporting map
        // parts, followed by a post-MFP step.
        let input_arity = non_temporal.input_arity;
        let key = Vec::from_iter(0..key_arity);
        let mut mfp_push;
        let output_arity;

        if non_temporal.projection.len() <= input_arity
            && non_temporal.projection.iter().all(|c| *c < input_arity)
            && non_temporal.projection.starts_with(&key)
        {
            // Special case: The key is preserved as a prefix and the projection is only
            // of output fields from the reduction. So we know that: (a) We can process the
            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
            // and (c) The output projection is at most as wide as the output that would be
            // produced by the reduction, so we are sure to never regress the memory
            // requirements of the output arrangement.
            // Note that this strategy may change the arity of the output arrangement.
            output_arity = non_temporal.projection.len();
            mfp_push = non_temporal;
        } else {
            // General strategy: Unpack MFP as MF followed by P' that removes all M
            // columns, then MP afterwards.
            // Note that this strategy does not result in any changes to the arity of
            // the output arrangement.
            let (m, f, p) = non_temporal.into_map_filter_project();
            mfp_push = MapFilterProject::new(input_arity)
                .map(m.clone())
                .filter(f)
                .project(0..input_arity);
            output_arity = input_arity;

            // We still need to perform the map and projection for the actual output.
            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);

            // Compose the non-pushed MFP components.
            mfp = MapFilterProject::compose(mfp_left, mfp);
        }
        mfp_push.optimize();
        mfp.optimize();
        (mfp_push, mfp, output_arity)
    }
}

/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
    /// Extracts the columns used as the key.
    pub key_plan: mz_expr::SafeMfpPlan,
    /// Extracts the columns used to feed the aggregations.
    pub val_plan: mz_expr::SafeMfpPlan,
}

impl RustType<ProtoKeyValPlan> for KeyValPlan {
    fn into_proto(&self) -> ProtoKeyValPlan {
        ProtoKeyValPlan {
            key_plan: Some(self.key_plan.into_proto()),
            val_plan: Some(self.val_plan.into_proto()),
        }
    }

    fn from_proto(proto: ProtoKeyValPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            key_plan: proto
                .key_plan
                .into_rust_if_some("ProtoKeyValPlan::key_plan")?,
            val_plan: proto
                .val_plan
                .into_rust_if_some("ProtoKeyValPlan::val_plan")?,
        })
    }
}

impl KeyValPlan {
    /// Create a new [KeyValPlan] from aggregation arguments.
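    ///
    /// A sketch (illustrative), keying a 3-column input on its second column with
    /// no aggregations:
    ///
    /// ```
    /// use mz_compute_types::plan::reduce::KeyValPlan;
    /// use mz_expr::MirScalarExpr;
    ///
    /// let plan = KeyValPlan::new(3, &[MirScalarExpr::column(1)], &[], None);
    /// assert_eq!(plan.key_arity(), 1);
    /// ```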
    pub fn new(
        input_arity: usize,
        group_key: &[MirScalarExpr],
        aggregates: &[AggregateExpr],
        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
    ) -> Self {
        // Form an operator for evaluating key expressions.
        let mut key_mfp = MapFilterProject::new(input_arity)
            .map(group_key.iter().cloned())
            .project(input_arity..(input_arity + group_key.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        // Form an operator for evaluating value expressions.
        let mut val_mfp = MapFilterProject::new(input_arity)
            .map(aggregates.iter().map(|a| a.expr.clone()))
            .project(input_arity..(input_arity + aggregates.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        key_mfp.optimize();
        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
        val_mfp.optimize();
        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();

        Self { key_plan, val_plan }
    }

    /// The arity of the key plan.
    pub fn key_arity(&self) -> usize {
        self.key_plan.projection.len()
    }
}

/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
    for i in 1..indexes.len() {
        soft_assert_or_log!(
            indexes[i - 1] < indexes[i],
            "convert_indexes_to_skips needs indexes to be strictly increasing. Received: {:?}",
            indexes,
        );
    }

    for i in (1..indexes.len()).rev() {
        indexes[i] -= indexes[i - 1];
        indexes[i] -= 1;
    }

    indexes
}

/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only, as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
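///
/// # Examples
///
/// A couple of representative cases:
///
/// ```
/// use mz_compute_types::plan::reduce::{reduction_type, ReductionType};
/// use mz_expr::AggregateFunc;
///
/// assert_eq!(reduction_type(&AggregateFunc::Count), ReductionType::Accumulable);
/// assert_eq!(reduction_type(&AggregateFunc::MaxInt64), ReductionType::Hierarchical);
/// ```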
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
    match func {
        AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::Any
        | AggregateFunc::All
        | AggregateFunc::Dummy => ReductionType::Accumulable,
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinInterval
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinTime => ReductionType::Hierarchical,
        AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::ArrayConcat { .. }
        | AggregateFunc::ListConcat { .. }
        | AggregateFunc::StringAgg { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. }
        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    // This test causes stack overflows if not run with --release,
    // ignore by default.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn reduce_plan_protobuf_roundtrip(expect in any::<ReducePlan>()) {
            let actual = protobuf_roundtrip::<_, ProtoReducePlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}
1086}