mz_compute_types/plan/reduce.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reduction execution planning and dataflow construction.
11
12//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
13//! given reduce expression. The intent here is that each creating a `ReducePlan` should capture
14//! all of the decision making about what kind of dataflow do we need to render and what each
15//! operator needs to do, and then actually rendering the plan can be a relatively simple application
16//! of (as much as possible) straight line code.
17//!
18//! Materialize needs to be able to maintain reductions incrementally (roughly, using
19//! time proportional to the number of changes in the input) and ideally, with a
20//! memory footprint proportional to the number of reductions being computed. We have to employ
21//! several tricks to achieve that, and these tricks constitute most of the complexity involved
22//! with planning and rendering reduce expressions. There's some additional complexity involved
23//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
24//! duplicate updates.
25//!
26//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
27//! into three distinct types. Each type gets rendered separately, with its own specialized plan
28//! and dataflow. The three types are as follows:
29//!
30//! 1. Accumulable:
31//! Accumulable reductions can be computed inline in a Differential update's `difference`
32//! field because they basically boil down to tracking counts of things. `sum()` is an
33//! example of an accumulable reduction, and when some element `x` is removed from the set
34//! of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
35//! formally, accumulable reductions correspond to instances of commutative Abelian groups.
36//! 2. Hierarchical:
37//! Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
38//! they are still commutative and associative, which lets us compute the reduction over subsets
39//! of the input, and then compute the reduction again on those results. For example:
40//! `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
41//! reductions this way, we can maintain the computation in sublinear time with respect to
42//! the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
43//! hierarchical reductions correspond to instances of semigroups, in that they are associative,
44//! but in order to benefit from being computed hierarchically, they need to have some reduction
45//! in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
46//! hierarchical evaluation.
47//!
48//! When the input is append-only, or monotonic, reductions that would otherwise have to be computed
49//! hierarchically can instead be computed in-place, because we only need to keep the value that's
50//! better than the "best" (minimal or maximal for min and max) seen so far.
51//! 3. Basic:
52//! Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
53//! hierarchical (most likely they are associative but don't involve any data reduction) and so for these
54//! we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
55//!
56//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
57//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
58//! requested aggregations by type, render each type separately and then take those results and collate them
59//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
60//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
61//! return the output arrangement directly and avoid the extra collation arrangement.
62
63use std::collections::BTreeMap;
64
65use mz_expr::{
66 AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
67};
68use mz_ore::{assert_none, soft_assert_or_log};
69use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
70use proptest::prelude::{Arbitrary, BoxedStrategy, any};
71use proptest::strategy::Strategy;
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};
76
77include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.reduce.rs"));
78
79/// This enum represents the three potential types of aggregations.
80#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
81pub enum ReductionType {
82 /// Accumulable functions can be subtracted from (are invertible), and associative.
83 /// We can compute these results by moving some data to the diff field under arbitrary
84 /// changes to inputs. Examples include sum or count.
85 Accumulable,
86 /// Hierarchical functions are associative, which means we can split up the work of
87 /// computing them across subsets. Note that hierarchical reductions should also
88 /// reduce the data in some way, as otherwise rendering them hierarchically is not
89 /// worth it. Examples include min or max.
90 Hierarchical,
91 /// Basic, for lack of a better word, are functions that are neither accumulable
92 /// nor hierarchical. Examples include jsonb_agg.
93 Basic,
94}
95
96impl columnation::Columnation for ReductionType {
97 type InnerRegion = columnation::CopyRegion<ReductionType>;
98}
99
100impl RustType<ProtoReductionType> for ReductionType {
101 fn into_proto(&self) -> ProtoReductionType {
102 use proto_reduction_type::Kind;
103 ProtoReductionType {
104 kind: Some(match self {
105 ReductionType::Accumulable => Kind::Accumulable(()),
106 ReductionType::Hierarchical => Kind::Hierarchical(()),
107 ReductionType::Basic => Kind::Basic(()),
108 }),
109 }
110 }
111
112 fn from_proto(proto: ProtoReductionType) -> Result<Self, TryFromProtoError> {
113 use proto_reduction_type::Kind;
114 let kind = proto
115 .kind
116 .ok_or_else(|| TryFromProtoError::missing_field("kind"))?;
117 Ok(match kind {
118 Kind::Accumulable(()) => ReductionType::Accumulable,
119 Kind::Hierarchical(()) => ReductionType::Hierarchical,
120 Kind::Basic(()) => ReductionType::Basic,
121 })
122 }
123}
124
125impl TryFrom<&ReducePlan> for ReductionType {
126 type Error = ();
127
128 fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
129 match plan {
130 ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
131 ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
132 ReducePlan::Basic(_) => Ok(ReductionType::Basic),
133 _ => Err(()),
134 }
135 }
136}
137
138/// A `ReducePlan` provides a concise description for how we will
139/// execute a given reduce expression.
140///
141/// The provided reduce expression can have no
142/// aggregations, in which case its just a `Distinct` and otherwise
143/// it's composed of a combination of accumulable, hierarchical and
144/// basic aggregations.
145///
146/// We want to try to centralize as much decision making about the
147/// shape / general computation of the rendered dataflow graph
148/// in this plan, and then make actually rendering the graph
149/// be as simple (and compiler verifiable) as possible.
150#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
151pub enum ReducePlan {
152 /// Plan for not computing any aggregations, just determining the set of
153 /// distinct keys.
154 Distinct,
155 /// Plan for computing only accumulable aggregations.
156 Accumulable(AccumulablePlan),
157 /// Plan for computing only hierarchical aggregations.
158 Hierarchical(HierarchicalPlan),
159 /// Plan for computing only basic aggregations.
160 Basic(BasicPlan),
161 /// Plan for computing a mix of different kinds of aggregations.
162 /// We need to do extra work here to reassemble results back in the
163 /// requested order.
164 Collation(CollationPlan),
165}
166
167proptest::prop_compose! {
168 /// `expected_group_size` is a u64, but instead of a uniform distribution,
169 /// we want a logarithmic distribution so that we have an even distribution
170 /// in the number of layers of buckets that a hierarchical plan would have.
171 fn any_group_size()
172 (bits in 0..u64::BITS)
173 (integer in (((1_u64) << bits) - 1)
174 ..(if bits == (u64::BITS - 1){ u64::MAX }
175 else { (1_u64) << (bits + 1) - 1 }))
176 -> u64 {
177 integer
178 }
179}
180
181/// To avoid stack overflow, this limits the arbitrarily-generated test
182/// `ReducePlan`s to involve at most 8 aggregations.
183///
184/// To have better coverage of realistic expected group sizes, the
185/// `expected group size` has a logarithmic distribution.
186impl Arbitrary for ReducePlan {
187 type Parameters = ();
188
189 type Strategy = BoxedStrategy<Self>;
190
191 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
192 (
193 proptest::collection::vec(any::<AggregateExpr>(), 0..8),
194 any::<bool>(),
195 any::<bool>(),
196 any_group_size(),
197 any::<bool>(),
198 )
199 .prop_map(
200 |(
201 exprs,
202 monotonic,
203 any_expected_size,
204 expected_group_size,
205 mut fused_unnest_list,
206 )| {
207 let expected_group_size = if any_expected_size {
208 Some(expected_group_size)
209 } else {
210 None
211 };
212 if !(exprs.len() == 1
213 && matches!(reduction_type(&exprs[0].func), ReductionType::Basic))
214 {
215 fused_unnest_list = false;
216 }
217 ReducePlan::create_from(
218 exprs,
219 monotonic,
220 expected_group_size,
221 fused_unnest_list,
222 )
223 },
224 )
225 .boxed()
226 }
227}
228
229impl RustType<ProtoReducePlan> for ReducePlan {
230 fn into_proto(&self) -> ProtoReducePlan {
231 use proto_reduce_plan::Kind::*;
232 ProtoReducePlan {
233 kind: Some(match self {
234 ReducePlan::Distinct => Distinct(()),
235 ReducePlan::Accumulable(plan) => Accumulable(plan.into_proto()),
236 ReducePlan::Hierarchical(plan) => Hierarchical(plan.into_proto()),
237 ReducePlan::Basic(plan) => Basic(plan.into_proto()),
238 ReducePlan::Collation(plan) => Collation(plan.into_proto()),
239 }),
240 }
241 }
242
243 fn from_proto(proto: ProtoReducePlan) -> Result<Self, TryFromProtoError> {
244 use proto_reduce_plan::Kind::*;
245 let kind = proto
246 .kind
247 .ok_or_else(|| TryFromProtoError::missing_field("ProtoReducePlan::kind"))?;
248 Ok(match kind {
249 Distinct(()) => ReducePlan::Distinct,
250 Accumulable(plan) => ReducePlan::Accumulable(plan.into_rust()?),
251 Hierarchical(plan) => ReducePlan::Hierarchical(plan.into_rust()?),
252 Basic(plan) => ReducePlan::Basic(plan.into_rust()?),
253 Collation(plan) => ReducePlan::Collation(plan.into_rust()?),
254 })
255 }
256}
257
258/// Plan for computing a set of accumulable aggregations.
259///
260/// We fuse all of the accumulable aggregations together
261/// and compute them with one dataflow fragment. We need to
262/// be careful to separate out the aggregations that
263/// apply only to the distinct set of values. We need
264/// to apply a distinct operator to those before we
265/// combine them with everything else.
266#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
267pub struct AccumulablePlan {
268 /// All of the aggregations we were asked to compute, stored
269 /// in order.
270 pub full_aggrs: Vec<AggregateExpr>,
271 /// All of the non-distinct accumulable aggregates.
272 /// Each element represents:
273 /// (index of the aggregation among accumulable aggregations,
274 /// index of the datum among inputs, aggregation expr)
275 /// These will all be rendered together in one dataflow fragment.
276 pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
277 /// Same as above but for all of the `DISTINCT` accumulable aggregations.
278 pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
279}
280
281impl RustType<proto_accumulable_plan::ProtoAggr> for (usize, usize, AggregateExpr) {
282 fn into_proto(&self) -> proto_accumulable_plan::ProtoAggr {
283 proto_accumulable_plan::ProtoAggr {
284 index_agg: self.0.into_proto(),
285 index_inp: self.1.into_proto(),
286 expr: Some(self.2.into_proto()),
287 }
288 }
289
290 fn from_proto(proto: proto_accumulable_plan::ProtoAggr) -> Result<Self, TryFromProtoError> {
291 Ok((
292 proto.index_agg.into_rust()?,
293 proto.index_inp.into_rust()?,
294 proto.expr.into_rust_if_some("ProtoAggr::expr")?,
295 ))
296 }
297}
298
299impl RustType<ProtoAccumulablePlan> for AccumulablePlan {
300 fn into_proto(&self) -> ProtoAccumulablePlan {
301 ProtoAccumulablePlan {
302 full_aggrs: self.full_aggrs.into_proto(),
303 simple_aggrs: self.simple_aggrs.into_proto(),
304 distinct_aggrs: self.distinct_aggrs.into_proto(),
305 }
306 }
307
308 fn from_proto(proto: ProtoAccumulablePlan) -> Result<Self, TryFromProtoError> {
309 Ok(Self {
310 full_aggrs: proto.full_aggrs.into_rust()?,
311 simple_aggrs: proto.simple_aggrs.into_rust()?,
312 distinct_aggrs: proto.distinct_aggrs.into_rust()?,
313 })
314 }
315}
316
317/// Plan for computing a set of hierarchical aggregations.
318///
319/// In the append-only setting we can render them in-place
320/// with monotonic plans, but otherwise, we need to render
321/// them with a reduction tree that splits the inputs into
322/// small, and then progressively larger, buckets
323#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
324pub enum HierarchicalPlan {
325 /// Plan hierarchical aggregations under monotonic inputs.
326 Monotonic(MonotonicPlan),
327 /// Plan for hierarchical aggregations under non-monotonic inputs.
328 Bucketed(BucketedPlan),
329}
330
331impl HierarchicalPlan {
332 /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
333 /// and sets consolidation requirements.
334 pub fn as_monotonic(&mut self, must_consolidate: bool) {
335 match self {
336 HierarchicalPlan::Bucketed(bucketed) => {
337 // TODO: ideally we would not have the `clone()` but ownership
338 // seems fraught here as we are behind a `&mut self` reference.
339 *self =
340 HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
341 }
342 HierarchicalPlan::Monotonic(monotonic) => {
343 monotonic.must_consolidate = must_consolidate;
344 }
345 }
346 }
347}
348
349impl RustType<ProtoHierarchicalPlan> for HierarchicalPlan {
350 fn into_proto(&self) -> ProtoHierarchicalPlan {
351 use proto_hierarchical_plan::Kind;
352 ProtoHierarchicalPlan {
353 kind: Some(match self {
354 HierarchicalPlan::Monotonic(plan) => Kind::Monotonic(plan.into_proto()),
355 HierarchicalPlan::Bucketed(plan) => Kind::Bucketed(plan.into_proto()),
356 }),
357 }
358 }
359
360 fn from_proto(proto: ProtoHierarchicalPlan) -> Result<Self, TryFromProtoError> {
361 use proto_hierarchical_plan::Kind;
362 let kind = proto
363 .kind
364 .ok_or_else(|| TryFromProtoError::missing_field("ProtoHierarchicalPlan::Kind"))?;
365 Ok(match kind {
366 Kind::Monotonic(plan) => HierarchicalPlan::Monotonic(plan.into_rust()?),
367 Kind::Bucketed(plan) => HierarchicalPlan::Bucketed(plan.into_rust()?),
368 })
369 }
370}
371
372/// Plan for computing a set of hierarchical aggregations with a
373/// monotonic input.
374///
375/// Here, the aggregations will be rendered in place. We don't
376/// need to worry about retractions because the inputs are
377/// append only, so we can change our computation to
378/// only retain the "best" value in the diff field, instead
379/// of holding onto all values.
380#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
381pub struct MonotonicPlan {
382 /// All of the aggregations we were asked to compute.
383 pub aggr_funcs: Vec<AggregateFunc>,
384 /// Set of "skips" or calls to `nth()` an iterator needs to do over
385 /// the input to extract the relevant datums.
386 pub skips: Vec<usize>,
387 /// True if the input is not physically monotonic, and the operator must perform
388 /// consolidation to remove potential negations. The operator implementation is
389 /// free to consolidate as late as possible while ensuring correctness, so it is
390 /// not a requirement that the input be directly subjected to consolidation.
391 /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
392 ///
393 /// [^1] <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
394 pub must_consolidate: bool,
395}
396
397impl RustType<ProtoMonotonicPlan> for MonotonicPlan {
398 fn into_proto(&self) -> ProtoMonotonicPlan {
399 ProtoMonotonicPlan {
400 aggr_funcs: self.aggr_funcs.into_proto(),
401 skips: self.skips.into_proto(),
402 must_consolidate: self.must_consolidate.into_proto(),
403 }
404 }
405
406 fn from_proto(proto: ProtoMonotonicPlan) -> Result<Self, TryFromProtoError> {
407 Ok(Self {
408 aggr_funcs: proto.aggr_funcs.into_rust()?,
409 skips: proto.skips.into_rust()?,
410 must_consolidate: proto.must_consolidate.into_rust()?,
411 })
412 }
413}
414
415/// Plan for computing a set of hierarchical aggregations
416/// with non-monotonic inputs.
417///
418/// To perform hierarchical aggregations with stable runtimes
419/// under updates we'll subdivide the group key into buckets, compute
420/// the reduction in each of those subdivided buckets and then combine
421/// the results into a coarser bucket (one that represents a larger
422/// fraction of the original input) and redo the reduction in another
423/// layer. Effectively, we'll construct a min / max heap out of a series
424/// of reduce operators (each one is a separate layer).
425#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
426pub struct BucketedPlan {
427 /// All of the aggregations we were asked to compute.
428 pub aggr_funcs: Vec<AggregateFunc>,
429 /// Set of "skips" or calls to `nth()` an iterator needs to do over
430 /// the input to extract the relevant datums.
431 pub skips: Vec<usize>,
432 /// The number of buckets in each layer of the reduction tree. Should
433 /// be decreasing, and ideally, a power of two so that we can easily
434 /// distribute values to buckets with `value.hashed() % buckets[layer]`.
435 pub buckets: Vec<u64>,
436}
437
438impl BucketedPlan {
439 /// Convert to a monotonic plan, indicate whether the operator must apply
440 /// consolidation to its input.
441 fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
442 MonotonicPlan {
443 aggr_funcs: self.aggr_funcs,
444 skips: self.skips,
445 must_consolidate,
446 }
447 }
448}
449
450impl RustType<ProtoBucketedPlan> for BucketedPlan {
451 fn into_proto(&self) -> ProtoBucketedPlan {
452 ProtoBucketedPlan {
453 aggr_funcs: self.aggr_funcs.into_proto(),
454 skips: self.skips.into_proto(),
455 buckets: self.buckets.clone(),
456 }
457 }
458
459 fn from_proto(proto: ProtoBucketedPlan) -> Result<Self, TryFromProtoError> {
460 Ok(Self {
461 aggr_funcs: proto.aggr_funcs.into_rust()?,
462 skips: proto.skips.into_rust()?,
463 buckets: proto.buckets,
464 })
465 }
466}
467
468/// Plan for computing a set of basic aggregations.
469///
470/// There's much less complexity when rendering basic aggregations.
471/// Each aggregation corresponds to one Differential reduce operator.
472/// That's it. However, we still want to present one final arrangement
473/// so basic aggregations present results with the same interface
474/// (one arrangement containing a row with all results) that accumulable
475/// and hierarchical aggregations do. To provide that, we render an
476/// additional reduce operator whenever we have multiple reduce aggregates
477/// to combine and present results in the appropriate order. If we
478/// were only asked to compute a single aggregation, we can skip
479/// that step and return the arrangement provided by computing the aggregation
480/// directly.
481#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
482pub enum BasicPlan {
483 /// Plan for rendering a single basic aggregation.
484 Single(SingleBasicPlan),
485 /// Plan for rendering multiple basic aggregations.
486 /// These need to then be collated together in an additional
487 /// reduction. Each element represents the:
488 /// `(index of the set of the input we are aggregating over,
489 /// the aggregation function)`
490 Multiple(Vec<(usize, AggregateExpr)>),
491}
492
493/// Plan for rendering a single basic aggregation, with possibly fusing a `FlatMap UnnestList` with
494/// this aggregation.
495#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
496pub struct SingleBasicPlan {
497 /// The index in the set of inputs that we are aggregating over.
498 pub index: usize,
499 /// The aggregation that we should perform.
500 pub expr: AggregateExpr,
501 /// Whether we fused a `FlatMap UnnestList` with this aggregation.
502 pub fused_unnest_list: bool,
503}
504
505impl RustType<proto_basic_plan::ProtoSimpleSingleBasicPlan> for (usize, AggregateExpr) {
506 fn into_proto(&self) -> proto_basic_plan::ProtoSimpleSingleBasicPlan {
507 proto_basic_plan::ProtoSimpleSingleBasicPlan {
508 index: self.0.into_proto(),
509 expr: Some(self.1.into_proto()),
510 }
511 }
512
513 fn from_proto(
514 proto: proto_basic_plan::ProtoSimpleSingleBasicPlan,
515 ) -> Result<Self, TryFromProtoError> {
516 Ok((
517 proto.index.into_rust()?,
518 proto
519 .expr
520 .into_rust_if_some("ProtoSimpleSingleBasicPlan::expr")?,
521 ))
522 }
523}
524
525impl RustType<proto_basic_plan::ProtoSingleBasicPlan> for SingleBasicPlan {
526 fn into_proto(&self) -> proto_basic_plan::ProtoSingleBasicPlan {
527 proto_basic_plan::ProtoSingleBasicPlan {
528 index: self.index.into_proto(),
529 expr: Some(self.expr.into_proto()),
530 fused_unnest_list: self.fused_unnest_list.into_proto(),
531 }
532 }
533
534 fn from_proto(
535 proto: proto_basic_plan::ProtoSingleBasicPlan,
536 ) -> Result<Self, TryFromProtoError> {
537 Ok(SingleBasicPlan {
538 index: proto.index.into_rust()?,
539 expr: proto.expr.into_rust_if_some("ProtoSingleBasicPlan::expr")?,
540 fused_unnest_list: proto.fused_unnest_list.into_rust()?,
541 })
542 }
543}
544
545impl RustType<ProtoBasicPlan> for BasicPlan {
546 fn into_proto(&self) -> ProtoBasicPlan {
547 use proto_basic_plan::*;
548
549 ProtoBasicPlan {
550 kind: Some(match self {
551 BasicPlan::Single(plan) => Kind::Single(plan.into_proto()),
552 BasicPlan::Multiple(aggrs) => Kind::Multiple(ProtoMultipleBasicPlan {
553 aggrs: aggrs.into_proto(),
554 }),
555 }),
556 }
557 }
558
559 fn from_proto(proto: ProtoBasicPlan) -> Result<Self, TryFromProtoError> {
560 use proto_basic_plan::Kind;
561 let kind = proto
562 .kind
563 .ok_or_else(|| TryFromProtoError::missing_field("ProtoBasicPlan::kind"))?;
564
565 Ok(match kind {
566 Kind::Single(plan) => BasicPlan::Single(plan.into_rust()?),
567 Kind::Multiple(x) => BasicPlan::Multiple(x.aggrs.into_rust()?),
568 })
569 }
570}
571
572/// Plan for collating the results of computing multiple aggregation
573/// types.
574///
575/// TODO: could we express this as a delta join
576#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
577pub struct CollationPlan {
578 /// Accumulable aggregation results to collate, if any.
579 pub accumulable: Option<AccumulablePlan>,
580 /// Hierarchical aggregation results to collate, if any.
581 pub hierarchical: Option<HierarchicalPlan>,
582 /// Basic aggregation results to collate, if any.
583 pub basic: Option<BasicPlan>,
584 /// When we get results back from each of the different
585 /// aggregation types, they will be subsequences of
586 /// the sequence aggregations in the original reduce expression.
587 /// We keep a map from output position -> reduction type
588 /// to easily merge results back into the requested order.
589 pub aggregate_types: Vec<ReductionType>,
590}
591
592impl CollationPlan {
593 /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
594 /// and sets consolidation requirements.
595 pub fn as_monotonic(&mut self, must_consolidate: bool) {
596 self.hierarchical
597 .as_mut()
598 .map(|plan| plan.as_monotonic(must_consolidate));
599 }
600}
601
602impl RustType<ProtoCollationPlan> for CollationPlan {
603 fn into_proto(&self) -> ProtoCollationPlan {
604 ProtoCollationPlan {
605 accumulable: self.accumulable.into_proto(),
606 hierarchical: self.hierarchical.into_proto(),
607 basic: self.basic.into_proto(),
608 aggregate_types: self.aggregate_types.into_proto(),
609 }
610 }
611
612 fn from_proto(proto: ProtoCollationPlan) -> Result<Self, TryFromProtoError> {
613 Ok(Self {
614 accumulable: proto.accumulable.into_rust()?,
615 hierarchical: proto.hierarchical.into_rust()?,
616 basic: proto.basic.into_rust()?,
617 aggregate_types: proto.aggregate_types.into_rust()?,
618 })
619 }
620}
621
622impl ReducePlan {
623 /// Generate a plan for computing the supplied aggregations.
624 ///
625 /// The resulting plan summarizes what the dataflow to be created
626 /// and how the aggregations will be executed.
627 pub fn create_from(
628 aggregates: Vec<AggregateExpr>,
629 monotonic: bool,
630 expected_group_size: Option<u64>,
631 fused_unnest_list: bool,
632 ) -> Self {
633 // If we don't have any aggregations we are just computing a distinct.
634 if aggregates.is_empty() {
635 return ReducePlan::Distinct;
636 }
637
638 // Otherwise, we need to group aggregations according to their
639 // reduction type (accumulable, hierarchical, or basic)
640 let mut reduction_types = BTreeMap::new();
641 // We need to make sure that each list of aggregates by type forms
642 // a subsequence of the overall sequence of aggregates.
643 for index in 0..aggregates.len() {
644 let typ = reduction_type(&aggregates[index].func);
645 let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
646 aggregates_list.push((index, aggregates[index].clone()));
647 }
648
649 // Convert each grouped list of reductions into a plan.
650 let plan: Vec<_> = reduction_types
651 .into_iter()
652 .map(|(typ, aggregates_list)| {
653 ReducePlan::create_inner(
654 typ,
655 aggregates_list,
656 monotonic,
657 expected_group_size,
658 fused_unnest_list,
659 )
660 })
661 .collect();
662
663 // If we only have a single type of aggregation present we can
664 // render that directly
665 if plan.len() == 1 {
666 return plan[0].clone();
667 }
668
669 // Otherwise, we have to stitch reductions together.
670
671 // First, lets sanity check that we don't have an impossible number
672 // of reduction types.
673 assert!(plan.len() <= 3);
674
675 let mut collation: CollationPlan = Default::default();
676
677 // Construct a mapping from output_position -> reduction that we can
678 // use to reconstruct the output in the correct order.
679 let aggregate_types = aggregates
680 .iter()
681 .map(|a| reduction_type(&a.func))
682 .collect::<Vec<_>>();
683
684 collation.aggregate_types = aggregate_types;
685
686 for expr in plan.into_iter() {
687 match expr {
688 ReducePlan::Accumulable(e) => {
689 assert_none!(collation.accumulable);
690 collation.accumulable = Some(e);
691 }
692 ReducePlan::Hierarchical(e) => {
693 assert_none!(collation.hierarchical);
694 collation.hierarchical = Some(e);
695 }
696 ReducePlan::Basic(e) => {
697 assert_none!(collation.basic);
698 collation.basic = Some(e);
699 }
700 ReducePlan::Distinct | ReducePlan::Collation(_) => {
701 panic!("Inner reduce plan was unsupported type!")
702 }
703 }
704 }
705
706 ReducePlan::Collation(collation)
707 }
708
709 /// Generate a plan for computing the specified type of aggregations.
710 ///
711 /// This function assumes that all of the supplied aggregates are
712 /// actually of the correct reduction type, and are a subsequence
713 /// of the total list of requested aggregations.
714 fn create_inner(
715 typ: ReductionType,
716 aggregates_list: Vec<(usize, AggregateExpr)>,
717 monotonic: bool,
718 expected_group_size: Option<u64>,
719 fused_unnest_list: bool,
720 ) -> Self {
721 if fused_unnest_list {
722 assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
723 }
724 assert!(
725 aggregates_list.len() > 0,
726 "error: tried to render a reduce dataflow with no aggregates"
727 );
728 match typ {
729 ReductionType::Accumulable => {
730 let mut simple_aggrs = vec![];
731 let mut distinct_aggrs = vec![];
732 let full_aggrs: Vec<_> = aggregates_list
733 .iter()
734 .cloned()
735 .map(|(_, aggr)| aggr)
736 .collect();
737 for (accumulable_index, (datum_index, aggr)) in
738 aggregates_list.into_iter().enumerate()
739 {
740 // Accumulable aggregations need to do extra per-aggregate work
741 // for aggregations with the distinct bit set, so we'll separate
742 // those out now.
743 if aggr.distinct {
744 distinct_aggrs.push((accumulable_index, datum_index, aggr));
745 } else {
746 simple_aggrs.push((accumulable_index, datum_index, aggr));
747 };
748 }
749 ReducePlan::Accumulable(AccumulablePlan {
750 full_aggrs,
751 simple_aggrs,
752 distinct_aggrs,
753 })
754 }
755 ReductionType::Hierarchical => {
756 let aggr_funcs: Vec<_> = aggregates_list
757 .iter()
758 .cloned()
759 .map(|(_, aggr)| aggr.func)
760 .collect();
761 let indexes: Vec<_> = aggregates_list
762 .into_iter()
763 .map(|(index, _)| index)
764 .collect();
765
766 // We don't have random access over Rows so we can simplify the
767 // task of grabbing the inputs we are aggregating over by
768 // generating a list of "skips" an iterator over the Row needs
769 // to do to get the desired indexes.
770 let skips = convert_indexes_to_skips(indexes);
771 if monotonic {
772 let monotonic = MonotonicPlan {
773 aggr_funcs,
774 skips,
775 must_consolidate: false,
776 };
777 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
778 } else {
779 let buckets = bucketing_of_expected_group_size(expected_group_size);
780 let bucketed = BucketedPlan {
781 aggr_funcs,
782 skips,
783 buckets,
784 };
785
786 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
787 }
788 }
789 ReductionType::Basic => {
790 if aggregates_list.len() == 1 {
791 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
792 index: aggregates_list[0].0,
793 expr: aggregates_list[0].1.clone(),
794 fused_unnest_list,
795 }))
796 } else {
797 ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
798 }
799 }
800 }
801 }
802
803 /// Reports all keys of produced arrangements.
804 ///
805 /// This is likely either an empty vector, for no arrangement,
806 /// or a singleton vector containing the list of expressions
807 /// that key a single arrangement.
808 pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
809 let key = (0..key_arity)
810 .map(MirScalarExpr::Column)
811 .collect::<Vec<_>>();
812 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
813 AvailableCollections::new_arranged(vec![(key, permutation, thinning)], None)
814 }
815
816 /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
817 /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
818 /// one sitting on top of the reduction.
819 ///
820 /// Non-fusable parts include temporal predicates or any other parts that cannot be
821 /// conservatively asserted to not increase the memory requirements of the output
822 /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
823 /// being the identity MFP.
824 pub fn extract_mfp_after(
825 &self,
826 mut mfp: MapFilterProject,
827 key_arity: usize,
828 ) -> (MapFilterProject, MapFilterProject, usize) {
829 // Extract temporal predicates, as we cannot push them into `Reduce`.
830 let temporal_mfp = mfp.extract_temporal();
831 let non_temporal = mfp;
832 mfp = temporal_mfp;
833
834 // We ensure we do not attempt to project away the key, as we cannot accomplish
835 // this. This is done by a simple analysis of the non-temporal part of `mfp` to
836 // check if can be directly absorbed; if it can't, we then default to a general
837 // strategy that unpacks the MFP to absorb only the filter and supporting map
838 // parts, followed by a post-MFP step.
839 let input_arity = non_temporal.input_arity;
840 let key = Vec::from_iter(0..key_arity);
841 let mut mfp_push;
842 let output_arity;
843
844 if non_temporal.projection.len() <= input_arity
845 && non_temporal.projection.iter().all(|c| *c < input_arity)
846 && non_temporal.projection.starts_with(&key)
847 {
848 // Special case: The key is preserved as a prefix and the projection is only
849 // of output fields from the reduction. So we know that: (a) We can process the
850 // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
851 // and (c) The output projection is at most as wide as the output that would be
852 // produced by the reduction, so we are sure to never regress the memory
853 // requirements of the output arrangement.
854 // Note that this strategy may change the arity of the output arrangement.
855 output_arity = non_temporal.projection.len();
856 mfp_push = non_temporal;
857 } else {
858 // General strategy: Unpack MFP as MF followed by P' that removes all M
859 // columns, then MP afterwards.
860 // Note that this strategy does not result in any changes to the arity of
861 // the output arrangement.
862 let (m, f, p) = non_temporal.into_map_filter_project();
863 mfp_push = MapFilterProject::new(input_arity)
864 .map(m.clone())
865 .filter(f)
866 .project(0..input_arity);
867 output_arity = input_arity;
868
869 // We still need to perform the map and projection for the actual output.
870 let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);
871
872 // Compose the non-pushed MFP components.
873 mfp = MapFilterProject::compose(mfp_left, mfp);
874 }
875 mfp_push.optimize();
876 mfp.optimize();
877 (mfp_push, mfp, output_arity)
878 }
879}
880
881/// Plan for extracting keys and values in preparation for a reduction.
882#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
883pub struct KeyValPlan {
884 /// Extracts the columns used as the key.
885 pub key_plan: mz_expr::SafeMfpPlan,
886 /// Extracts the columns used to feed the aggregations.
887 pub val_plan: mz_expr::SafeMfpPlan,
888}
889
890impl RustType<ProtoKeyValPlan> for KeyValPlan {
891 fn into_proto(&self) -> ProtoKeyValPlan {
892 ProtoKeyValPlan {
893 key_plan: Some(self.key_plan.into_proto()),
894 val_plan: Some(self.val_plan.into_proto()),
895 }
896 }
897
898 fn from_proto(proto: ProtoKeyValPlan) -> Result<Self, TryFromProtoError> {
899 Ok(Self {
900 key_plan: proto
901 .key_plan
902 .into_rust_if_some("ProtoKeyValPlan::key_plan")?,
903 val_plan: proto
904 .val_plan
905 .into_rust_if_some("ProtoKeyValPlan::val_plan")?,
906 })
907 }
908}
909
910impl KeyValPlan {
911 /// Create a new [KeyValPlan] from aggregation arguments.
912 pub fn new(
913 input_arity: usize,
914 group_key: &[MirScalarExpr],
915 aggregates: &[AggregateExpr],
916 input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
917 ) -> Self {
918 // Form an operator for evaluating key expressions.
919 let mut key_mfp = MapFilterProject::new(input_arity)
920 .map(group_key.iter().cloned())
921 .project(input_arity..(input_arity + group_key.len()));
922 if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
923 key_mfp.permute_fn(|c| input_permutation[c], new_arity);
924 }
925
926 // Form an operator for evaluating value expressions.
927 let mut val_mfp = MapFilterProject::new(input_arity)
928 .map(aggregates.iter().map(|a| a.expr.clone()))
929 .project(input_arity..(input_arity + aggregates.len()));
930 if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
931 val_mfp.permute_fn(|c| input_permutation[c], new_arity);
932 }
933
934 key_mfp.optimize();
935 let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
936 val_mfp.optimize();
937 let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();
938
939 Self { key_plan, val_plan }
940 }
941
942 /// The arity of the key plan
943 pub fn key_arity(&self) -> usize {
944 self.key_plan.projection.len()
945 }
946}
947
948/// Transforms a vector containing indexes of needed columns into one containing
949/// the "skips" an iterator over a Row would need to perform to see those values.
950///
951/// This function requires that all of the elements in `indexes` are strictly
952/// increasing.
953///
954/// # Examples
955///
956/// ```
957/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
958/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
959/// ```
960pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
961 for i in 1..indexes.len() {
962 soft_assert_or_log!(
963 indexes[i - 1] < indexes[i],
964 "convert_indexes_to_skip needs indexes to be strictly increasing. Received: {:?}",
965 indexes,
966 );
967 }
968
969 for i in (1..indexes.len()).rev() {
970 indexes[i] -= indexes[i - 1];
971 indexes[i] -= 1;
972 }
973
974 indexes
975}
976
977/// Determines whether a function can be accumulated in an update's "difference" field,
978/// and whether it can be subjected to recursive (hierarchical) aggregation.
979///
980/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
981/// which can be accumulated in-place using the addition operation on the type. Aggregations
982/// that indicate they are accumulable will still need to provide an action that takes their
983/// data and introduces it as a difference, and the post-processing when the accumulated value
984/// is presented as data.
985///
986/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
987/// increasingly large subsets of each key. This has the intended property that no invocation
988/// is on a significantly large set of values (and so, no incremental update needs to reform
989/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
990/// input stream is append-only as then we only need to retain the "currently winning" value.
991/// Every hierarchical aggregate needs to supply a corresponding ReductionMonoid implementation.
992pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
993 match func {
994 AggregateFunc::SumInt16
995 | AggregateFunc::SumInt32
996 | AggregateFunc::SumInt64
997 | AggregateFunc::SumUInt16
998 | AggregateFunc::SumUInt32
999 | AggregateFunc::SumUInt64
1000 | AggregateFunc::SumFloat32
1001 | AggregateFunc::SumFloat64
1002 | AggregateFunc::SumNumeric
1003 | AggregateFunc::Count
1004 | AggregateFunc::Any
1005 | AggregateFunc::All
1006 | AggregateFunc::Dummy => ReductionType::Accumulable,
1007 AggregateFunc::MaxNumeric
1008 | AggregateFunc::MaxInt16
1009 | AggregateFunc::MaxInt32
1010 | AggregateFunc::MaxInt64
1011 | AggregateFunc::MaxUInt16
1012 | AggregateFunc::MaxUInt32
1013 | AggregateFunc::MaxUInt64
1014 | AggregateFunc::MaxMzTimestamp
1015 | AggregateFunc::MaxFloat32
1016 | AggregateFunc::MaxFloat64
1017 | AggregateFunc::MaxBool
1018 | AggregateFunc::MaxString
1019 | AggregateFunc::MaxDate
1020 | AggregateFunc::MaxTimestamp
1021 | AggregateFunc::MaxTimestampTz
1022 | AggregateFunc::MaxInterval
1023 | AggregateFunc::MaxTime
1024 | AggregateFunc::MinNumeric
1025 | AggregateFunc::MinInt16
1026 | AggregateFunc::MinInt32
1027 | AggregateFunc::MinInt64
1028 | AggregateFunc::MinUInt16
1029 | AggregateFunc::MinUInt32
1030 | AggregateFunc::MinUInt64
1031 | AggregateFunc::MinMzTimestamp
1032 | AggregateFunc::MinInterval
1033 | AggregateFunc::MinFloat32
1034 | AggregateFunc::MinFloat64
1035 | AggregateFunc::MinBool
1036 | AggregateFunc::MinString
1037 | AggregateFunc::MinDate
1038 | AggregateFunc::MinTimestamp
1039 | AggregateFunc::MinTimestampTz
1040 | AggregateFunc::MinTime => ReductionType::Hierarchical,
1041 AggregateFunc::JsonbAgg { .. }
1042 | AggregateFunc::JsonbObjectAgg { .. }
1043 | AggregateFunc::MapAgg { .. }
1044 | AggregateFunc::ArrayConcat { .. }
1045 | AggregateFunc::ListConcat { .. }
1046 | AggregateFunc::StringAgg { .. }
1047 | AggregateFunc::RowNumber { .. }
1048 | AggregateFunc::Rank { .. }
1049 | AggregateFunc::DenseRank { .. }
1050 | AggregateFunc::LagLead { .. }
1051 | AggregateFunc::FirstValue { .. }
1052 | AggregateFunc::LastValue { .. }
1053 | AggregateFunc::WindowAggregate { .. }
1054 | AggregateFunc::FusedValueWindowFunc { .. }
1055 | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
1056 }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use mz_ore::assert_ok;
1062 use mz_proto::protobuf_roundtrip;
1063 use proptest::prelude::*;
1064
1065 use super::*;
1066
1067 // This test causes stack overflows if not run with --release,
1068 // ignore by default.
1069 proptest! {
1070 #[mz_ore::test]
1071 #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
1072 fn reduce_plan_protobuf_roundtrip(expect in any::<ReducePlan>() ) {
1073 let actual = protobuf_roundtrip::<_, ProtoReducePlan>(&expect);
1074 assert_ok!(actual);
1075 assert_eq!(actual.unwrap(), expect);
1076 }
1077 }
1078}