mz_compute/render/reduce.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction dataflow construction.
//!
//! Consult [ReducePlan] documentation for details.

use std::collections::BTreeMap;
use std::sync::LazyLock;

use dec::OrderedDecimal;
use differential_dataflow::Data;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, CopyRegion};
use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
use differential_dataflow::hashable::Hashable;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::{Builder, Trace};
use differential_dataflow::{Collection, Diff as _};
use itertools::Itertools;
use mz_compute_types::plan::reduce::{
    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
};
use mz_expr::{
    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
};
use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use serde::{Deserialize, Serialize};
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use tracing::warn;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::{MzReduce, ReduceExt};
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
use crate::render::{ArrangementFlavor, Pairer};
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
};
use crate::typedefs::{
    ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
    RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
};

impl<G, T> Context<G, T>
where
    G: Scope,
    G::Timestamp: MzTimestamp + Refines<T>,
    T: MzTimestamp,
{
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();
            // Remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
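            // For example (illustrative values): a sorted demand of `[0, 2, 5]`
            // becomes skips `[0, 1, 2]`, since each `Iterator::nth(skip)` call
            // below advances past `skip` undemanded columns before yielding the
            // next demanded one.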

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Unpack only the demanded columns.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluate the value expressions.
                    // The prior evaluation may have left additional columns we should delete.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux out the potential errors from key and value selector evaluation.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(&err_input);

            // Render the reduce plan.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }

    /// Render a dataflow based on the provided plan.
    ///
    /// The output will be an arrangement that looks the same as if
    /// we had just a single reduce operator computing everything together, and
    /// this arrangement can also be re-used.
    fn render_reduce_plan<S>(
        &self,
        plan: ReducePlan,
        collection: Collection<S, (Row, Row), Diff>,
        err_input: Collection<S, DataflowError, Diff>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> CollectionBundle<S, T>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut errors = Default::default();
        let arrangement =
            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
        CollectionBundle::from_columns(
            0..key_arity,
            ArrangementFlavor::Local(
                arrangement,
                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
            ),
        )
    }

    fn render_reduce_plan_inner<S>(
        &self,
        plan: ReducePlan,
        collection: Collection<S, (Row, Row), Diff>,
        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
        // not only values (database-issues#6658).
        let arrangement = match plan {
            // If we have no aggregations or just a single type of reduction, we
            // can go ahead and render them directly.
            ReducePlan::Distinct => {
                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Accumulable(expr) => {
                let (arranged_output, errs) =
                    self.build_accumulable(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                index,
                expr,
                fused_unnest_list,
            })) => {
                // Note that we skip validating for negative diffs when we have a fused unnest list,
                // because this is already a CPU-intensive situation due to the non-incrementalness
                // of window functions.
                let validating = !fused_unnest_list;
                let (output, errs) = self.build_basic_aggregate(
                    collection,
                    index,
                    &expr,
                    validating,
                    key_arity,
                    mfp_after,
                    fused_unnest_list,
                );
                if validating {
                    errors.push(errs.expect("validation should have occurred as it was requested"));
                }
                output
            }
            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
                let (output, errs) =
                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            // Otherwise, we need to render something different for each type of
            // reduction, and then stitch them together.
            ReducePlan::Collation(expr) => {
                // First, we need to render our constituent aggregations.
                let mut to_collate = vec![];

                for plan in [
                    expr.hierarchical.map(ReducePlan::Hierarchical),
                    expr.accumulable.map(ReducePlan::Accumulable),
                    expr.basic.map(ReducePlan::Basic),
                ]
                .into_iter()
                .flat_map(std::convert::identity)
                {
                    let r#type = ReductionType::try_from(&plan)
                        .expect("only representable reduction types were used above");

                    let arrangement = self.render_reduce_plan_inner(
                        plan,
                        collection.clone(),
                        errors,
                        key_arity,
                        None,
                    );
                    to_collate.push((r#type, arrangement));
                }

                // Now we need to collate them together.
                let (oks, errs) = self.build_collation(
                    to_collate,
                    expr.aggregate_types,
                    &mut collection.scope(),
                    mfp_after,
                );
                errors.push(errs);
                oks
            }
        };
        arrangement
    }

    /// Build the dataflow to combine arrangements containing results of different
    /// aggregation types into a single arrangement.
    ///
    /// This computes the same thing as a join on the group key followed by shuffling
    /// the values into the correct order. This implementation assumes that all input
    /// arrangements present values in a way that respects the desired output order,
    /// so we can do a linear merge to form the output.
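    ///
    /// For example (illustrative): given `aggregate_types` of
    /// `[Accumulable, Basic, Accumulable]`, the accumulable arrangement carries
    /// one fused row with both accumulable results and the basic arrangement a
    /// one-datum row; the merge walks `aggregate_types` in order, pulling the
    /// next datum from the matching fused row.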
    fn build_collation<S>(
        &self,
        arrangements: Vec<(ReductionType, RowRowArrangement<S>)>,
        aggregate_types: Vec<ReductionType>,
        scope: &mut S,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // We must have more than one arrangement to collate.
        if arrangements.len() <= 1 {
            error_logger.soft_panic_or_log(
                "Incorrect number of arrangements in reduce collation",
                &format!("len={}", arrangements.len()),
            );
        }

        let mut to_concat = vec![];

        // First, let's collect all results into a single collection.
        for (reduction_type, arrangement) in arrangements.into_iter() {
            let collection = arrangement
                .as_collection(move |key, val| (key.to_row(), (reduction_type, val.to_row())));
            to_concat.push(collection);
        }

        // For each key above, we need to have exactly as many rows as there are distinct
        // reduction types required by `aggregate_types`. We thus prepare here a properly
        // deduplicated version of `aggregate_types` for validation during processing below.
        let mut distinct_aggregate_types = aggregate_types.clone();
        distinct_aggregate_types.sort_unstable();
        distinct_aggregate_types.dedup();
        let n_distinct_aggregate_types = distinct_aggregate_types.len();
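        // Illustration (hypothetical values): `aggregate_types` of
        // `[Accumulable, Basic, Accumulable]` deduplicates to
        // `[Accumulable, Basic]`, so each key must contribute exactly two rows
        // (one per reduction type) to the collation below.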

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let aggregate_types_err = aggregate_types.clone();
        let (oks, errs) = differential_dataflow::collection::concatenate(scope, to_concat)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>("Arrange ReduceCollation")
            .reduce_pair::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "ReduceCollation",
                "ReduceCollation Errors",
                {
                    move |key, input, output| {
                        // The inputs are pairs of a reduction type, and a row consisting of densely
                        // packed fused aggregate values.
                        //
                        // We need to reconstitute the final value by:
                        // 1. Extracting out the fused rows by type
                        // 2. For each aggregate, figure out what type it is, and grab the relevant
                        //    value from the corresponding fused row.
                        // 3. Stitch all the values together into one row.
                        let mut accumulable = DatumList::empty().iter();
                        let mut hierarchical = DatumList::empty().iter();
                        let mut basic = DatumList::empty().iter();

                        // Note that hierarchical and basic reductions guard against negative
                        // multiplicities, and if we only had accumulable aggregations, we would not
                        // have produced a collation plan, so we do not repeat the check here.
                        if input.len() != n_distinct_aggregate_types {
                            return;
                        }

                        for (item, _) in input.iter() {
                            let reduction_type = &item.0;
                            let row = &item.1;
                            match reduction_type {
                                ReductionType::Accumulable => accumulable = row.iter(),
                                ReductionType::Hierarchical => hierarchical = row.iter(),
                                ReductionType::Basic => basic = row.iter(),
                            }
                        }

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Merge results into the order they were asked for.
                        for typ in aggregate_types.iter() {
                            let datum = match typ {
                                ReductionType::Accumulable => accumulable.next(),
                                ReductionType::Hierarchical => hierarchical.next(),
                                ReductionType::Basic => basic.next(),
                            };
                            let Some(datum) = datum else { return };
                            datums_local.push(datum);
                        }

                        // If we did not have enough values to stitch together, then we do not
                        // generate an output row. Not outputting here corresponds to the semantics
                        // of an equi-join on the key, similarly to the proposal in PR materialize#17013.
                        //
                        // Note that we also do not want to have anything left over to stitch. If we
                        // do, then we also have an error, reported elsewhere, and would violate
                        // join semantics.
                        if (accumulable.next(), hierarchical.next(), basic.next())
                            == (None, None, None)
                        {
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                output.push((row, Diff::ONE));
                            }
                        }
                    }
                },
                move |key, input, output| {
                    if input.len() != n_distinct_aggregate_types {
                        // We expected to stitch together exactly as many aggregate types as requested
                        // by the collation. If we cannot, we log an error and produce no output for
                        // this key.
                        let message = "Mismatched aggregates for key in ReduceCollation";
                        error_logger.log(
                            message,
                            &format!(
                                "key={key:?}, n_aggregates_requested={requested}, \
                                 n_distinct_aggregate_types={n_distinct_aggregate_types}",
                                requested = input.len(),
                            ),
                        );
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }

                    let mut accumulable = DatumList::empty().iter();
                    let mut hierarchical = DatumList::empty().iter();
                    let mut basic = DatumList::empty().iter();
                    for (item, _) in input.iter() {
                        let reduction_type = &item.0;
                        let row = &item.1;
                        match reduction_type {
                            ReductionType::Accumulable => accumulable = row.iter(),
                            ReductionType::Hierarchical => hierarchical = row.iter(),
                            ReductionType::Basic => basic = row.iter(),
                        }
                    }

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    for typ in aggregate_types_err.iter() {
                        let datum = match typ {
                            ReductionType::Accumulable => accumulable.next(),
                            ReductionType::Hierarchical => hierarchical.next(),
                            ReductionType::Basic => basic.next(),
                        };
                        if let Some(datum) = datum {
                            datums_local.push(datum);
                        } else {
                            // We cannot properly reconstruct a row if aggregates are missing.
                            // This situation is not expected, so we log an error if it occurs.
                            let message = "Missing value for key in ReduceCollation";
                            error_logger.log(message, &format!("typ={typ:?}, key={key:?}"));
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                            return;
                        }
                    }

                    // Note that we also do not want to have anything left over to stitch.
                    // If we do, then we also have an error and would violate join semantics.
                    if (accumulable.next(), hierarchical.next(), basic.next()) != (None, None, None)
                    {
                        let message = "Rows too large for key in ReduceCollation";
                        error_logger.log(message, &format!("key={key:?}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    }

                    // Finally, if `mfp_after` can produce errors, then we should also report
                    // these here.
                    let Some(mfp) = &mfp_after2 else { return };
                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (oks, errs.as_collection(|_, v| v.clone()))
    }

    /// Build the dataflow to compute the set of distinct keys.
    fn build_distinct<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        Collection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>("Arranged DistinctBy")
            .reduce_pair::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        // We're pushing a unit value here because the key is implicitly added by the
                        // arrangement, and the permutation logic takes care of using the key part of the
                        // output.
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }

    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
    fn build_basic_aggregates<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        aggrs: Vec<(usize, AggregateExpr)>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs {
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceFuseBasic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }

    /// Build the dataflow to compute a single basic aggregation.
    ///
    /// This method also applies distinctness if required.
    fn build_basic_aggregate<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Extract the value we were asked to aggregate over.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
        if distinct {
            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
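            // Illustration (hypothetical datums): with `key_arity = 2`, the pair
            // `(Row[a, b], Row[v])` merges into a single `Row[a, b, v]`;
            // `pairer.split` below recovers the original pair by splitting after
            // the first `key_arity` datums.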
            if validating {
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<_, RowValBuilder<Result<(), String>, _, _>, RowValSpine<Result<(), String>, _, _>>(keyed, None)
                    .as_collection(|k, v| (k.to_row(), v.as_ref().map(|&()| ()).map_err(|m| m.as_str().into())))
                    .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>("Demux Errors", move |(key_val, result)| match result {
                        Ok(()) => Ok(pairer.split(&key_val)),
                        Err(m) => Err(EvalError::Internal(m).into()),
                    });
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // We respect the multiplicity here (unlike in hierarchical aggregation)
                    // because we don't know that the aggregation method is not sensitive
                    // to the number of records.
                    let iter = source.iter().flat_map(|(v, w)| {
                        // Note that in the non-positive case, this is wrong, but harmless because
                        // our other reduction will produce an error.
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });
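                    // For example (illustrative): a value `v` with diff 3 expands
                    // to the sequence `v, v, v`, so count-sensitive aggregates
                    // such as `sum` observe the full multiset rather than the
                    // set of distinct values.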

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        // Note that this is not necessarily a window aggregation, in which case
                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        target.push((row, Diff::ONE));
                    }
                }
            })
        } else {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // This part is the same as in the `!fused_unnest_list` if branch above.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    // This is the part that is specific to the `fused_unnest_list` branch.
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
        };

        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
        // rationale around using a second reduction here.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message = "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    target.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter` but we cannot as it needs to
                                // borrow `v` and only presents datums with that lifetime, not any longer.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
                assert!(!must_validate);
                // We couldn't have got into this if branch due to `must_validate`, so it must be
                // because of the `mfp_after2.is_some()`.
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter` but we cannot as it needs to
                                // borrow `v` and only presents datums with that lifetime, not any longer.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                // We know that `mfp` can error (because of the `could_error` call
                                // above), so try to evaluate it here.
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(&errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }

    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
        &self,
        input: Collection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
                Key<'a> = DatumSeq<'a>,
                KeyOwn = Row,
                Time = G::Timestamp,
                Diff = Diff,
                ValOwn: Data + MaybeValidatingRow<(), String>,
            > + 'static,
        Bu: Builder<
                Time = G::Timestamp,
                Input: Container + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
                Output = Tr::Batch,
            >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }

    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on non-monotonic inputs.
    ///
    /// This function renders a single reduction tree that computes aggregations with
    /// a priority queue implemented with a series of reduce operators that partition
    /// the input into buckets, and compute the aggregation over very small buckets
    /// and feed the results up to larger buckets.
    ///
    /// Note that this implementation currently ignores the distinct bit because we
    /// currently only perform min / max hierarchically and the reduction tree
    /// efficiently suppresses non-distinct updates.
    ///
    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
    /// trickery here to limit the memory usage per layer by internally
    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains the `((key, bucket), passing value)` pairs for this
    /// stage.
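    ///
    /// As an illustration (bucket sizes hypothetical): with
    /// `buckets = [4096, 256, 16]`, records are first routed by `hash % 4096`,
    /// and each later stage re-buckets the surviving results by `hash % 256`
    /// and then `hash % 16`, so an update only re-enters the small
    /// sub-aggregations along its path rather than the whole group.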
    fn build_bucketed<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            skips,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<Collection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // The first mod to apply to the hash.
            let first_mod = buckets.get(0).copied().unwrap_or(1);

            // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                let mut row_iter = row.iter();
                for skip in skips.iter() {
                    row_packer.push(row_iter.nth(*skip).unwrap());
                }
                let values = row_builder.clone();

                // Apply the initial mod here.
                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            // Repeatedly apply hierarchical reduction with a progressively coarser key.
            for (index, b) in buckets.into_iter().enumerate() {
                // Apply subsequent bucket mods for all but the first round.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // We only want the first stage to perform validation of whether invalid accumulations
                // were observed in the input. Subsequently, we will either produce an error in the error
                // stream or produce correct data in the output stream.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Discard the hash from the key and return to the format of the input data.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Allocations for the two closures.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            // Build a series of stages for the reduction
            // Arrange the final result into (key, Row)
            let error_logger = self.error_logger();
            // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
            // view mz_introspection.mz_expected_group_size_advice.
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
            // but we then wouldn't be able to do this error check conditionally.  See its documentation
            // for the rationale around using a second reduction here.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
1104                            // see them, we should report when we do. We may want to bake even more info
1105                            // in here in the future.
1106                            if must_validate {
1107                                for (val, count) in source.iter() {
1108                                    if count.is_positive() {
1109                                        continue;
1110                                    }
1111                                    let val = val.to_row();
1112                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
1113                                    error_logger
1114                                        .log(message, &format!("val={val:?}, count={count}"));
1115                                    target.push((
1116                                        EvalError::Internal(message.into()).into(),
1117                                        Diff::ONE,
1118                                    ));
1119                                    return;
1120                                }
1121                            }
1122
1123                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
1124                            let Some(mfp) = &mfp_after2 else { return };
1125                            let temp_storage = RowArena::new();
1126                            let datum_iter = key.to_datum_iter();
1127                            let mut datums_local = datums2.borrow();
1128                            datums_local.extend(datum_iter);
1129
1130                            let mut source_iters = source
1131                                .iter()
1132                                .map(|(values, _cnt)| *values)
1133                                .collect::<Vec<_>>();
1134                            for func in aggr_funcs2.iter() {
1135                                let column_iter = (0..source_iters.len())
1136                                    .map(|i| source_iters[i].next().unwrap());
1137                                datums_local.push(func.eval(column_iter, &temp_storage));
1138                            }
1139                            if let Result::Err(e) =
1140                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
1141                            {
1142                                target.push((e.into(), Diff::ONE));
1143                            }
1144                        },
1145                    )
1146                    .as_collection(|_, v| v.clone())
1147                    .leave_region();
1148                if let Some(e) = &err_output {
1149                    err_output = Some(e.concat(&errs));
1150                } else {
1151                    err_output = Some(errs);
1152                }
1153            }
1154            arranged
1155                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1156                    "ReduceMinsMaxes",
1157                    move |key, source, target| {
1158                        let temp_storage = RowArena::new();
1159                        let datum_iter = key.to_datum_iter();
1160                        let mut datums_local = datums1.borrow();
1161                        datums_local.extend(datum_iter);
1162                        let key_len = datums_local.len();
1163
1164                        let mut source_iters = source
1165                            .iter()
1166                            .map(|(values, _cnt)| *values)
1167                            .collect::<Vec<_>>();
1168                        for func in aggr_funcs.iter() {
1169                            let column_iter =
1170                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1171                            datums_local.push(func.eval(column_iter, &temp_storage));
1172                        }
1173
1174                        if let Some(row) = evaluate_mfp_after(
1175                            &mfp_after1,
1176                            &mut datums_local,
1177                            &temp_storage,
1178                            key_len,
1179                        ) {
1180                            target.push((row, Diff::ONE));
1181                        }
1182                    },
1183                )
1184                .leave_region()
1185        });
1186        (
1187            arranged_output,
1188            err_output.expect("expected to validate in one level of the hierarchy"),
1189        )
1190    }
1191
    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`],
    /// returning the consolidated input concatenated with the negation of what the
    /// reduction produces.
    ///
    /// `validating` indicates whether this stage should perform error detection for
    /// invalid accumulations. Once a stage is clean of such errors, subsequent stages
    /// can skip validation.
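    ///
    /// Concretely (an illustrative sketch of the algebra): if the stage input is `I` and
    /// the intended per-bucket aggregate result is `R`, the inner reduction emits `R - I`,
    /// so the `oks` collection returned here is `I + (R - I) = R`.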
1198    fn build_bucketed_stage<S>(
1199        &self,
1200        aggr_funcs: &Vec<AggregateFunc>,
1201        input: &Collection<S, (Row, Row), Diff>,
1202        validating: bool,
1203    ) -> (
1204        Collection<S, (Row, Row), Diff>,
1205        Option<Collection<S, DataflowError, Diff>>,
1206    )
1207    where
1208        S: Scope<Timestamp = G::Timestamp>,
1209    {
1210        let (input, negated_output, errs) = if validating {
1211            let (input, reduced) = self
                .build_bucketed_negated_output::<_, RowValBuilder<_, _, _>, RowValSpine<Result<Row, Row>, _, _>>(
1213                    input,
1214                    aggr_funcs.clone(),
1215                );
1216            let (oks, errs) = reduced
1217                .as_collection(|k, v| (k.to_row(), v.clone()))
1218                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1219                "Checked Invalid Accumulations",
1220                |(hash_key, result)| match result {
1221                    Err(hash_key) => {
1222                        let mut hash_key_iter = hash_key.iter();
1223                        let _hash = hash_key_iter.next();
1224                        let key = SharedRow::pack(hash_key_iter);
1225                        let message = format!(
1226                            "Invalid data in source, saw non-positive accumulation \
1227                                         for key {key:?} in hierarchical mins-maxes aggregate"
1228                        );
1229                        Err(EvalError::Internal(message.into()).into())
1230                    }
1231                    Ok(values) => Ok((hash_key, values)),
1232                },
1233            );
1234            (input, oks, Some(errs))
1235        } else {
1236            let (input, reduced) = self
1237                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1238                    input,
1239                    aggr_funcs.clone(),
1240                );
            // TODO: This would be a good place to apply the next `mod` calculation. Note
            // that we would need to apply the mod to both the input and the oks.
1243            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1244            (input, oks, None)
1245        };
1246
1247        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1248        let oks = negated_output.concat(&input);
1249        (oks, errs)
1250    }
1251
1252    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1253    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1254    /// with all diffs in the reduction's output negated.
1255    fn build_bucketed_negated_output<S, Bu, Tr>(
1256        &self,
1257        input: &Collection<S, (Row, Row), Diff>,
1258        aggrs: Vec<AggregateFunc>,
1259    ) -> (
1260        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1261        Arranged<S, TraceAgent<Tr>>,
1262    )
1263    where
1264        S: Scope<Timestamp = G::Timestamp>,
1265        Tr: for<'a> Trace<
1266                Key<'a> = DatumSeq<'a>,
1267                KeyOwn = Row,
1268                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1269                Time = G::Timestamp,
1270                Diff = Diff,
1271            > + 'static,
1272        Bu: Builder<
1273                Time = G::Timestamp,
1274                Input: Container + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1275                Output = Tr::Batch,
1276            >,
1277        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1278    {
1279        let error_logger = self.error_logger();
1280        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
        // view mz_internal.mz_expected_group_size_advice.
1282        let arranged_input = input
1283            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1284                "Arranged MinsMaxesHierarchical input",
1285            );
1286
1287        let reduced = arranged_input.mz_reduce_abelian::<_, Bu, Tr>(
1288            "Reduced Fallibly MinsMaxesHierarchical",
1289            move |key, source, target| {
1290                if let Some(err) = Tr::ValOwn::into_error() {
1291                    // Should negative accumulations reach us, we should loudly complain.
1292                    for (value, count) in source.iter() {
1293                        if count.is_positive() {
1294                            continue;
1295                        }
1296                        error_logger.log(
1297                            "Non-positive accumulation in MinsMaxesHierarchical",
1298                            &format!("key={key:?}, value={value:?}, count={count}"),
1299                        );
1300                        // After complaining, output an error here so that we can eventually
1301                        // report it in an error stream.
1302                        target.push((err(Tr::owned_key(key)), Diff::ONE));
1303                        return;
1304                    }
1305                }
1306
1307                let mut row_builder = SharedRow::get();
1308                let mut row_packer = row_builder.packer();
1309
1310                let mut source_iters = source
1311                    .iter()
1312                    .map(|(values, _cnt)| *values)
1313                    .collect::<Vec<_>>();
1314                for func in aggrs.iter() {
1315                    let column_iter =
1316                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1317                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1318                }
                // We only want to arrange the parts of the input that are not part of the output.
                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
                // gives us the intended value of this aggregate function. We also assume that,
                // regardless of the multiplicity of the final result in the input, we only want
                // a single copy in the output.
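                //
                // For example (illustrative numbers): with a single `min` aggregate and
                // `source = [(5, +2), (7, +1)]`, we emit `(5, +1)` for the result and
                // `(5, -2), (7, -1)` for the inputs, so concatenating with the input
                // leaves exactly `(5, +1)` for the next stage.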
1324                target.reserve(source.len().saturating_add(1));
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::ONE));
1326                target.extend(source.iter().map(|(values, cnt)| {
1327                    let mut cnt = *cnt;
1328                    cnt.negate();
1329                    (Tr::ValOwn::ok(values.to_row()), cnt)
1330                }));
1331            },
1332        );
1333        (arranged_input, reduced)
1334    }
1335
1336    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1337    /// on monotonic inputs.
1338    fn build_monotonic<S>(
1339        &self,
1340        collection: Collection<S, (Row, Row), Diff>,
1341        MonotonicPlan {
1342            aggr_funcs,
1343            skips,
1344            must_consolidate,
1345        }: MonotonicPlan,
1346        mfp_after: Option<SafeMfpPlan>,
1347    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1348    where
1349        S: Scope<Timestamp = G::Timestamp>,
1350    {
1351        // Gather the relevant values into a vec of rows ordered by aggregation_index
1352        let collection = collection
1353            .map(move |(key, row)| {
1354                let mut row_builder = SharedRow::get();
1355                let mut values = Vec::with_capacity(skips.len());
1356                let mut row_iter = row.iter();
1357                for skip in skips.iter() {
1358                    values.push(
1359                        row_builder.pack_using(std::iter::once(row_iter.nth(*skip).unwrap())),
1360                    );
1361                }
1362
1363                (key, values)
1364            })
1365            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1366                must_consolidate,
1367                "Consolidated ReduceMonotonic input",
1368            );
1369
        // It should now be possible to ensure that we have a monotonic collection.
1371        let error_logger = self.error_logger();
1372        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1373            error_logger.log(
1374                "Non-monotonic input to ReduceMonotonic",
1375                &format!("data={data:?}, diff={diff}"),
1376            );
1377            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1378            (EvalError::Internal(m).into(), Diff::ONE)
1379        });
        // We can place our rows directly into the diff field, and keep
        // only the relevant values corresponding to evaluating our
        // aggregates, instead of having to do a hierarchical reduction.
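        // For example (illustrative): two updates for the same key with values `5` and `3`
        // under a `min` aggregate become diffs `Min(5)` and `Min(3)`, which consolidate
        // in place to `Min(3)`, so no reduction tree is needed.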
1383        let partial = partial.explode_one(move |(key, values)| {
1384            let mut output = Vec::new();
1385            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1386                output.push(monoids::get_monoid(row, func).expect(
1387                    "hierarchical aggregations are expected to have monoid implementations",
1388                ));
1389            }
1390            (key, output)
1391        });
1392
1393        // Allocations for the two closures.
1394        let mut datums1 = DatumVec::new();
1395        let mut datums2 = DatumVec::new();
1396        let mfp_after1 = mfp_after.clone();
1397        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1398
1399        let partial: KeyCollection<_, _, _> = partial.into();
1400        let arranged = partial
1401            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1402                "ArrangeMonotonic [val: empty]",
1403            );
1404        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1405            "ReduceMonotonic",
1406            {
1407                move |key, input, output| {
1408                    let temp_storage = RowArena::new();
1409                    let datum_iter = key.to_datum_iter();
1410                    let mut datums_local = datums1.borrow();
1411                    datums_local.extend(datum_iter);
1412                    let key_len = datums_local.len();
1413                    let accum = &input[0].1;
1414                    for monoid in accum.iter() {
1415                        datums_local.extend(monoid.finalize().iter());
1416                    }
1417
1418                    if let Some(row) =
1419                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1420                    {
1421                        output.push((row, Diff::ONE));
1422                    }
1423                }
1424            },
1425        );
1426
1427        // If `mfp_after` can error, then we need to render a paired reduction
1428        // to scan for these potential errors. Note that we cannot directly use
1429        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1430        // conditionally render the second component of the reduction pair.
1431        if let Some(mfp) = mfp_after2 {
1432            let mfp_errs = arranged
1433                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1434                    "ReduceMonotonic Error Check",
1435                    move |key, input, output| {
1436                        let temp_storage = RowArena::new();
1437                        let datum_iter = key.to_datum_iter();
1438                        let mut datums_local = datums2.borrow();
1439                        datums_local.extend(datum_iter);
1440                        let accum = &input[0].1;
1441                        for monoid in accum.iter() {
1442                            datums_local.extend(monoid.finalize().iter());
1443                        }
1444                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1445                        {
1446                            output.push((e.into(), Diff::ONE));
1447                        }
1448                    },
1449                )
1450                .as_collection(|_k, v| v.clone());
1451            (output, validation_errs.concat(&mfp_errs))
1452        } else {
1453            (output, validation_errs)
1454        }
1455    }
1456
1457    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1458    ///
1459    /// The incoming values are moved to the update's "difference" field, at which point
1460    /// they can be accumulated in place. The `count` operator promotes the accumulated
1461    /// values to data, at which point a final map applies operator-specific logic to
1462    /// yield the final aggregate.
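    ///
    /// As an illustrative example: for `SUM(x)` over two records with `x = 3` and `x = 4`,
    /// the updates become `((key, ()), (accum: 3, count: 1))` and
    /// `((key, ()), (accum: 4, count: 1))`, which consolidate to `(accum: 7, count: 2)`;
    /// `finalize_accum` then renders the final datum `7`.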
1463    fn build_accumulable<S>(
1464        &self,
1465        collection: Collection<S, (Row, Row), Diff>,
1466        AccumulablePlan {
1467            full_aggrs,
1468            simple_aggrs,
1469            distinct_aggrs,
1470        }: AccumulablePlan,
1471        key_arity: usize,
1472        mfp_after: Option<SafeMfpPlan>,
1473    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1474    where
1475        S: Scope<Timestamp = G::Timestamp>,
1476    {
1477        // we must have called this function with something to reduce
1478        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1479            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accumulable reduction rendering",
1481                &format!(
1482                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1483                    full_aggrs.len(),
1484                    simple_aggrs.len(),
1485                    distinct_aggrs.len(),
1486                ),
1487            );
1488        }
1489
1490        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1491        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
        // Other aggregations can be moved directly into the `diff` field.
1493        //
        // In each case, the resulting collection should have `data` shaped as `(key, ())`
        // and a `diff` that pairs a vector holding one `Accum` per aggregation with an
        // overall record count. The footprint could be reduced if we want to specialize
        // for the aggregations.
1498
1499        // Instantiate a default vector for diffs with the correct types at each
1500        // position.
1501        let zero_diffs: (Vec<_>, Diff) = (
1502            full_aggrs
1503                .iter()
1504                .map(|f| accumulable_zero(&f.func))
1505                .collect(),
1506            Diff::ZERO,
1507        );
1508
1509        let mut to_aggregate = Vec::new();
1510        if simple_aggrs.len() > 0 {
1511            // First, collect all non-distinct aggregations in one pass.
1512            let easy_cases = collection.explode_one({
1513                let zero_diffs = zero_diffs.clone();
1514                move |(key, row)| {
1515                    let mut diffs = zero_diffs.clone();
                    // Try to unpack only the datums we need. Unfortunately, since we
                    // can't randomly access a Row, we have to iterate through it one by one.
1518                    // TODO: Even though we don't have random access, we could still avoid unpacking
1519                    // everything that we don't care about, and it might be worth it to extend the
1520                    // Row API to do that.
1521                    let mut row_iter = row.iter().enumerate();
1522                    for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
1523                        let mut datum = row_iter.next().unwrap();
1524                        while datum_index != &datum.0 {
1525                            datum = row_iter.next().unwrap();
1526                        }
1527                        let datum = datum.1;
1528                        diffs.0[*accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1529                        diffs.1 = Diff::ONE;
1530                    }
1531                    ((key, ()), diffs)
1532                }
1533            });
1534            to_aggregate.push(easy_cases);
1535        }
1536
1537        // Next, collect all aggregations that require distinctness.
1538        for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
1539            let pairer = Pairer::new(key_arity);
1540            let collection = collection
1541                .map(move |(key, row)| {
1542                    let value = row.iter().nth(datum_index).unwrap();
1543                    (pairer.merge(&key, std::iter::once(value)), ())
1544                })
1545                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1546                    "Arranged Accumulable Distinct [val: empty]",
1547                )
1548                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1549                    "Reduced Accumulable Distinct [val: empty]",
1550                    move |_k, _s, t| t.push(((), Diff::ONE)),
1551                )
1552                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1553                .explode_one({
1554                    let zero_diffs = zero_diffs.clone();
1555                    move |(key, row)| {
1556                        let datum = row.iter().next().unwrap();
1557                        let mut diffs = zero_diffs.clone();
1558                        diffs.0[accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1559                        diffs.1 = Diff::ONE;
1560                        ((key, ()), diffs)
1561                    }
1562                });
1563            to_aggregate.push(collection);
1564        }
1565
1566        // now concatenate, if necessary, multiple aggregations
1567        let collection = if to_aggregate.len() == 1 {
1568            to_aggregate.remove(0)
1569        } else {
1570            differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
1571        };
1572
1573        // Allocations for the two closures.
1574        let mut datums1 = DatumVec::new();
1575        let mut datums2 = DatumVec::new();
1576        let mfp_after1 = mfp_after.clone();
1577        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1578        let full_aggrs2 = full_aggrs.clone();
1579
1580        let error_logger = self.error_logger();
1581        let err_full_aggrs = full_aggrs.clone();
1582        let (arranged_output, arranged_errs) = collection
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, (Vec<Accum>, Diff)>>("ArrangeAccumulable [val: empty]")
            .reduce_pair::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1585                "ReduceAccumulable",
1586                "AccumulableErrorCheck",
1587                {
1588                    move |key, input, output| {
1589                        let (ref accums, total) = input[0].1;
1590
1591                        let temp_storage = RowArena::new();
1592                        let datum_iter = key.to_datum_iter();
1593                        let mut datums_local = datums1.borrow();
1594                        datums_local.extend(datum_iter);
1595                        let key_len = datums_local.len();
1596                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1597                            datums_local.push(finalize_accum(&aggr.func, accum, total));
1598                        }
1599
1600                        if let Some(row) = evaluate_mfp_after(
1601                            &mfp_after1,
1602                            &mut datums_local,
1603                            &temp_storage,
1604                            key_len,
1605                        ) {
1606                            output.push((row, Diff::ONE));
1607                        }
1608                    }
1609                },
1610                move |key, input, output| {
1611                    let (ref accums, total) = input[0].1;
1612                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // We first check whether inputs without net-positive records are
                        // present, logging an error and emitting one to the query output if so.
1615                        if total == Diff::ZERO && !accum.is_zero() {
1616                            error_logger.log(
1617                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1618                                &format!("aggr={aggr:?}, accum={accum:?}"),
1619                            );
1620                            let key = key.to_row();
1621                            let message = format!(
1622                                "Invalid data in source, saw net-zero records for key {key} \
1623                                 with non-zero accumulation in accumulable aggregate"
1624                            );
1625                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1626                        }
1627                        match (&aggr.func, &accum) {
1628                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1629                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1630                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1631                                if accum.is_negative() {
1632                                    error_logger.log(
1633                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1634                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1635                                );
1636                                    let key = key.to_row();
1637                                    let message = format!(
1638                                        "Invalid data in source, saw negative accumulation with \
1639                                         unsigned type for key {key}"
1640                                    );
1641                                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1642                                }
1643                            }
1644                            _ => (), // no more errors to check for at this point!
1645                        }
1646                    }
1647
1648                    // If `mfp_after` can error, then evaluate it here.
1649                    let Some(mfp) = &mfp_after2 else { return };
1650                    let temp_storage = RowArena::new();
1651                    let datum_iter = key.to_datum_iter();
1652                    let mut datums_local = datums2.borrow();
1653                    datums_local.extend(datum_iter);
1654                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1655                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1656                    }
1657
1658                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1659                        output.push((e.into(), Diff::ONE));
1660                    }
1661                },
1662            );
1663        (
1664            arranged_output,
1665            arranged_errs.as_collection(|_key, error| error.clone()),
1666        )
1667    }
1668}
1669
1670/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1671/// containing key and aggregate values, then returns a result `Row` or `None`
1672/// if the MFP filters the result out.
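///
/// As an illustrative sketch: with key `[k]`, aggregate values `[a]`, and `key_len = 1`,
/// `datums_local` holds `[k, a]`. Without an MFP we pack `[a]` directly; with an MFP, the
/// plan preserves the key columns, so we skip `key_len` datums before packing, and a
/// filtered-out row yields `None`.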
1673fn evaluate_mfp_after<'a, 'b>(
1674    mfp_after: &'a Option<SafeMfpPlan>,
1675    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1676    temp_storage: &'a RowArena,
1677    key_len: usize,
1678) -> Option<Row> {
1679    let mut row_builder = SharedRow::get();
1680    // Apply MFP if it exists and pack a Row of
1681    // aggregate values from `datums_local`.
1682    if let Some(mfp) = mfp_after {
        // We must ignore errors here; they are scanned
        // for elsewhere if the MFP can error.
1685        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1686            // The `mfp_after` must preserve the key columns,
1687            // so we can skip them to form aggregation results.
1688            Some(row_builder.pack_using(iter.skip(key_len)))
1689        } else {
1690            None
1691        }
1692    } else {
1693        Some(row_builder.pack_using(&datums_local[key_len..]))
1694    }
1695}
1696
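/// Returns the zero-valued [`Accum`] for the given aggregate function, i.e., the identity
/// under [`Semigroup::plus_equals`].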
1697fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1698    match aggr_func {
1699        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1700            trues: Diff::ZERO,
1701            falses: Diff::ZERO,
1702        },
1703        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1704            accum: AccumCount::ZERO,
1705            pos_infs: Diff::ZERO,
1706            neg_infs: Diff::ZERO,
1707            nans: Diff::ZERO,
1708            non_nulls: Diff::ZERO,
1709        },
1710        AggregateFunc::SumNumeric => Accum::Numeric {
1711            accum: OrderedDecimal(NumericAgg::zero()),
1712            pos_infs: Diff::ZERO,
1713            neg_infs: Diff::ZERO,
1714            nans: Diff::ZERO,
1715            non_nulls: Diff::ZERO,
1716        },
1717        _ => Accum::SimpleNumber {
1718            accum: AccumCount::ZERO,
1719            non_nulls: Diff::ZERO,
1720        },
1721    }
1722}
1723
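/// Scale factor for fixed-point float accumulation: multiplying by `2^24` before
/// truncating to an integer preserves 24 fractional bits of the input value.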
1724static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1725
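/// Converts a single input datum into its [`Accum`] contribution for the given aggregate
/// function, so that aggregation can proceed by summing diffs.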
1726fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1727    match aggregate_func {
1728        AggregateFunc::Count => Accum::SimpleNumber {
1729            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
1730            non_nulls: if datum.is_null() {
1731                Diff::ZERO
1732            } else {
1733                Diff::ONE
1734            },
1735        },
1736        AggregateFunc::Any | AggregateFunc::All => match datum {
1737            Datum::True => Accum::Bool {
1738                trues: Diff::ONE,
1739                falses: Diff::ZERO,
1740            },
1741            Datum::Null => Accum::Bool {
1742                trues: Diff::ZERO,
1743                falses: Diff::ZERO,
1744            },
1745            Datum::False => Accum::Bool {
1746                trues: Diff::ZERO,
1747                falses: Diff::ONE,
1748            },
1749            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1750        },
1751        AggregateFunc::Dummy => match datum {
1752            Datum::Dummy => Accum::SimpleNumber {
1753                accum: AccumCount::ZERO,
1754                non_nulls: Diff::ZERO,
1755            },
1756            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1757        },
1758        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1759            let n = match datum {
1760                Datum::Float32(n) => f64::from(*n),
1761                Datum::Float64(n) => *n,
1762                Datum::Null => 0f64,
1763                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1764            };
1765
1766            let nans = Diff::from(n.is_nan());
1767            let pos_infs = Diff::from(n == f64::INFINITY);
1768            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1769            let non_nulls = Diff::from(datum != Datum::Null);
1770
1771            // Map the floating point value onto a fixed precision domain
1772            // All special values should map to zero, since they are tracked separately
1773            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1774                AccumCount::ZERO
1775            } else {
                // This conversion saturates at i128::MIN/i128::MAX if out of range.
1777                // TODO(benesch): rewrite to avoid `as`.
1778                #[allow(clippy::as_conversions)]
1779                { (n * *FLOAT_SCALE) as i128 }.into()
1780            };
1781
1782            Accum::Float {
1783                accum,
1784                pos_infs,
1785                neg_infs,
1786                nans,
1787                non_nulls,
1788            }
1789        }
1790        AggregateFunc::SumNumeric => match datum {
1791            Datum::Numeric(n) => {
1792                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1793                    if n.0.is_negative() {
1794                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1795                    } else {
1796                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1797                    }
1798                } else if n.0.is_nan() {
1799                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1800                } else {
1801                    // Take a narrow decimal (datum) into a wide decimal
1802                    // (aggregator).
1803                    let mut cx_agg = numeric::cx_agg();
1804                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1805                };
1806
1807                Accum::Numeric {
1808                    accum: OrderedDecimal(accum),
1809                    pos_infs,
1810                    neg_infs,
1811                    nans,
1812                    non_nulls: Diff::ONE,
1813                }
1814            }
1815            Datum::Null => Accum::Numeric {
1816                accum: OrderedDecimal(NumericAgg::zero()),
1817                pos_infs: Diff::ZERO,
1818                neg_infs: Diff::ZERO,
1819                nans: Diff::ZERO,
1820                non_nulls: Diff::ZERO,
1821            },
1822            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1823        },
1824        _ => {
1825            // Other accumulations need to disentangle the accumulable
1826            // value from its NULL-ness, which is not quite as easily
1827            // accumulated.
1828            match datum {
1829                Datum::Int16(i) => Accum::SimpleNumber {
1830                    accum: i.into(),
1831                    non_nulls: Diff::ONE,
1832                },
1833                Datum::Int32(i) => Accum::SimpleNumber {
1834                    accum: i.into(),
1835                    non_nulls: Diff::ONE,
1836                },
1837                Datum::Int64(i) => Accum::SimpleNumber {
1838                    accum: i.into(),
1839                    non_nulls: Diff::ONE,
1840                },
1841                Datum::UInt16(u) => Accum::SimpleNumber {
1842                    accum: u.into(),
1843                    non_nulls: Diff::ONE,
1844                },
1845                Datum::UInt32(u) => Accum::SimpleNumber {
1846                    accum: u.into(),
1847                    non_nulls: Diff::ONE,
1848                },
1849                Datum::UInt64(u) => Accum::SimpleNumber {
1850                    accum: u.into(),
1851                    non_nulls: Diff::ONE,
1852                },
1853                Datum::MzTimestamp(t) => Accum::SimpleNumber {
1854                    accum: u64::from(t).into(),
1855                    non_nulls: Diff::ONE,
1856                },
1857                Datum::Null => Accum::SimpleNumber {
1858                    accum: AccumCount::ZERO,
1859                    non_nulls: Diff::ZERO,
1860                },
1861                x => panic!("Accumulating non-integer data: {x:?}"),
1862            }
1863        }
1864    }
1865}
1866
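/// Renders the final output datum for an aggregate from its [`Accum`], where `total` is
/// the net count of input records observed.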
1867fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1868    // The finished value depends on the aggregation function in a variety of ways.
1869    // For all aggregates but count, if only null values were
1870    // accumulated, then the output is null.
1871    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1872        Datum::Null
1873    } else {
1874        match (&aggr_func, &accum) {
1875            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1876                Datum::Int64(non_nulls.into_inner())
1877            }
1878            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1879                // If any false, else if all true, else must be no false and some nulls.
1880                if falses.is_positive() {
1881                    Datum::False
1882                } else if *trues == total {
1883                    Datum::True
1884                } else {
1885                    Datum::Null
1886                }
1887            }
1888            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1889                // If any true, else if all false, else must be no true and some nulls.
1890                if trues.is_positive() {
1891                    Datum::True
1892                } else if *falses == total {
1893                    Datum::False
1894                } else {
1895                    Datum::Null
1896                }
1897            }
1898            (AggregateFunc::Dummy, _) => Datum::Dummy,
1899            // If any non-nulls, just report the aggregate.
1900            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1901            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have fewer than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have fewer than 2^32 summands?
1905                // If so, rewrite to avoid `as`.
1906                #[allow(clippy::as_conversions)]
1907                Datum::Int64(accum.into_inner() as i64)
1908            }
1909            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1910            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1911            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1912                if !accum.is_negative() {
1913                    // Our semantics of overflow are not clearly articulated wrt.
1914                    // unsigned vs. signed types (database-issues#5172). We adopt an
1915                    // unsigned wrapping behavior to match what we do above for
1916                    // signed types.
1917                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
1918                    #[allow(clippy::as_conversions)]
1919                    Datum::UInt64(accum.into_inner() as u64)
1920                } else {
1921                    // Note that we return a value here, but an error in the other
1922                    // operator of the reduce_pair. Therefore, we expect that this
1923                    // value will never be exposed as an output.
1924                    Datum::Null
1925                }
1926            }
1927            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1928                if !accum.is_negative() {
1929                    Datum::from(*accum)
1930                } else {
1931                    // Note that we return a value here, but an error in the other
1932                    // operator of the reduce_pair. Therefore, we expect that this
1933                    // value will never be exposed as an output.
1934                    Datum::Null
1935                }
1936            }
1937            (
1938                AggregateFunc::SumFloat32,
1939                Accum::Float {
1940                    accum,
1941                    pos_infs,
1942                    neg_infs,
1943                    nans,
1944                    non_nulls: _,
1945                },
1946            ) => {
1947                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1948                    // NaNs are NaNs and cases where we've seen a
1949                    // mixture of positive and negative infinities.
1950                    Datum::from(f32::NAN)
1951                } else if pos_infs.is_positive() {
1952                    Datum::from(f32::INFINITY)
1953                } else if neg_infs.is_positive() {
1954                    Datum::from(f32::NEG_INFINITY)
1955                } else {
1956                    // TODO(benesch): remove potentially dangerous usage of `as`.
1957                    #[allow(clippy::as_conversions)]
1958                    {
1959                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
1960                    }
1961                }
1962            }
1963            (
1964                AggregateFunc::SumFloat64,
1965                Accum::Float {
1966                    accum,
1967                    pos_infs,
1968                    neg_infs,
1969                    nans,
1970                    non_nulls: _,
1971                },
1972            ) => {
1973                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1974                    // NaNs are NaNs and cases where we've seen a
1975                    // mixture of positive and negative infinities.
1976                    Datum::from(f64::NAN)
1977                } else if pos_infs.is_positive() {
1978                    Datum::from(f64::INFINITY)
1979                } else if neg_infs.is_positive() {
1980                    Datum::from(f64::NEG_INFINITY)
1981                } else {
1982                    // TODO(benesch): remove potentially dangerous usage of `as`.
1983                    #[allow(clippy::as_conversions)]
1984                    {
1985                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
1986                    }
1987                }
1988            }
1989            (
1990                AggregateFunc::SumNumeric,
1991                Accum::Numeric {
1992                    accum,
1993                    pos_infs,
1994                    neg_infs,
1995                    nans,
1996                    non_nulls: _,
1997                },
1998            ) => {
1999                let mut cx_datum = numeric::cx_datum();
2000                let d = cx_datum.to_width(accum.0);
2001                // Take a wide decimal (aggregator) into a
2002                // narrow decimal (datum). If this operation
2003                // overflows the datum, this new value will be
2004                // +/- infinity. However, the aggregator tracks
2005                // the amount of overflow, making it invertible.
2006                let inf_d = d.is_infinite();
2007                let neg_d = d.is_negative();
2008                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
2009                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
2010                if nans.is_positive() || (pos_inf && neg_inf) {
2011                    // NaNs are NaNs and cases where we've seen a
2012                    // mixture of positive and negative infinities.
2013                    Datum::from(Numeric::nan())
2014                } else if pos_inf {
2015                    Datum::from(Numeric::infinity())
2016                } else if neg_inf {
2017                    let mut cx = numeric::cx_datum();
2018                    let mut d = Numeric::infinity();
2019                    cx.neg(&mut d);
2020                    Datum::from(d)
2021                } else {
2022                    Datum::from(d)
2023                }
2024            }
2025            _ => panic!(
2026                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
2027                aggr_func
2028            ),
2029        }
2030    }
2031}
2032
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
2034type AccumCount = mz_ore::Overflowing<i128>;
2035
2036/// Accumulates values for the various types of accumulable aggregations.
2037///
2038/// We assume that there are not more than 2^32 elements for the aggregation.
2039/// Thus we can perform a summation over i32 in an i64 accumulator
2040/// and not worry about exceeding its bounds.
2041///
2042/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
2043/// point representation has less precision than a double. It is entirely possible
2044/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
2045/// to preserve group guarantees.
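///
/// As an illustrative example of the fixed-point scheme: with `FLOAT_SCALE = 2^24`,
/// inserting `1.5_f64` adds `1.5 * 2^24 = 25_165_824` to the accumulator, and retracting
/// the same value subtracts exactly that amount, so insertions and retractions cancel
/// precisely, a property plain floating-point addition cannot guarantee.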
2046#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
2047enum Accum {
2048    /// Accumulates boolean values.
2049    Bool {
2050        /// The number of `true` values observed.
2051        trues: Diff,
2052        /// The number of `false` values observed.
2053        falses: Diff,
2054    },
2055    /// Accumulates simple numeric values.
2056    SimpleNumber {
2057        /// The accumulation of all non-NULL values observed.
2058        accum: AccumCount,
2059        /// The number of non-NULL values observed.
2060        non_nulls: Diff,
2061    },
2062    /// Accumulates float values.
2063    Float {
2064        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
2065        /// preserve associativity and commutativity
2066        accum: AccumCount,
2067        /// Counts +inf
2068        pos_infs: Diff,
2069        /// Counts -inf
2070        neg_infs: Diff,
2071        /// Counts NaNs
2072        nans: Diff,
2073        /// Counts non-NULL values
2074        non_nulls: Diff,
2075    },
2076    /// Accumulates arbitrary precision decimals.
2077    Numeric {
2078        /// Accumulates non-special values
2079        accum: OrderedDecimal<NumericAgg>,
2080        /// Counts +inf
2081        pos_infs: Diff,
2082        /// Counts -inf
2083        neg_infs: Diff,
2084        /// Counts NaNs
2085        nans: Diff,
2086        /// Counts non-NULL values
2087        non_nulls: Diff,
2088    },
2089}
2090
2091impl IsZero for Accum {
2092    fn is_zero(&self) -> bool {
2093        match self {
2094            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2095            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2096            Accum::Float {
2097                accum,
2098                pos_infs,
2099                neg_infs,
2100                nans,
2101                non_nulls,
2102            } => {
2103                accum.is_zero()
2104                    && pos_infs.is_zero()
2105                    && neg_infs.is_zero()
2106                    && nans.is_zero()
2107                    && non_nulls.is_zero()
2108            }
2109            Accum::Numeric {
2110                accum,
2111                pos_infs,
2112                neg_infs,
2113                nans,
2114                non_nulls,
2115            } => {
2116                accum.0.is_zero()
2117                    && pos_infs.is_zero()
2118                    && neg_infs.is_zero()
2119                    && nans.is_zero()
2120                    && non_nulls.is_zero()
2121            }
2122        }
2123    }
2124}
2125
2126impl Semigroup for Accum {
2127    fn plus_equals(&mut self, other: &Accum) {
2128        match (&mut *self, other) {
2129            (
2130                Accum::Bool { trues, falses },
2131                Accum::Bool {
2132                    trues: other_trues,
2133                    falses: other_falses,
2134                },
2135            ) => {
2136                *trues += other_trues;
2137                *falses += other_falses;
2138            }
2139            (
2140                Accum::SimpleNumber { accum, non_nulls },
2141                Accum::SimpleNumber {
2142                    accum: other_accum,
2143                    non_nulls: other_non_nulls,
2144                },
2145            ) => {
2146                *accum += other_accum;
2147                *non_nulls += other_non_nulls;
2148            }
2149            (
2150                Accum::Float {
2151                    accum,
2152                    pos_infs,
2153                    neg_infs,
2154                    nans,
2155                    non_nulls,
2156                },
2157                Accum::Float {
2158                    accum: other_accum,
2159                    pos_infs: other_pos_infs,
2160                    neg_infs: other_neg_infs,
2161                    nans: other_nans,
2162                    non_nulls: other_non_nulls,
2163                },
2164            ) => {
2165                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2166                    warn!("Float accumulator overflow. Incorrect results possible");
2167                    accum.wrapping_add(*other_accum)
2168                });
2169                *pos_infs += other_pos_infs;
2170                *neg_infs += other_neg_infs;
2171                *nans += other_nans;
2172                *non_nulls += other_non_nulls;
2173            }
2174            (
2175                Accum::Numeric {
2176                    accum,
2177                    pos_infs,
2178                    neg_infs,
2179                    nans,
2180                    non_nulls,
2181                },
2182                Accum::Numeric {
2183                    accum: other_accum,
2184                    pos_infs: other_pos_infs,
2185                    neg_infs: other_neg_infs,
2186                    nans: other_nans,
2187                    non_nulls: other_non_nulls,
2188                },
2189            ) => {
2190                let mut cx_agg = numeric::cx_agg();
2191                cx_agg.add(&mut accum.0, &other_accum.0);
2192                // `rounded` signals we have exceeded the aggregator's max
2193                // precision, which means we've lost commutativity and
2194                // associativity; nothing to be done here, so panic. For more
2195                // context, see the DEC_Rounded definition at
2196                // http://speleotrove.com/decimal/dncont.html
2197                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2198                // Reduce to reclaim unused decimal precision. Note that this
2199                // reduction must happen somewhere to make the following
2200                // invertible:
2201                // ```
2202                // CREATE TABLE a (a numeric);
2203                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
2204                // INSERT INTO a VALUES ('9e39'), ('9e-39');
2205                // ```
2206                // This will now return infinity. However, we can retract the
2207                // value that blew up its precision:
2208                // ```
2209                // INSERT INTO a VALUES ('-9e-39');
2210                // ```
2211                // This leaves `t`'s aggregator with a value of 9e39. However,
2212                // without doing a reduction, `libdecnum` will store the value
2213                // as 9e39+0e-39, which still exceeds the narrower context's
2214                // precision. By doing the reduction, we can "reclaim" the 39
2215                // digits of precision.
2216                cx_agg.reduce(&mut accum.0);
2217                *pos_infs += other_pos_infs;
2218                *neg_infs += other_neg_infs;
2219                *nans += other_nans;
2220                *non_nulls += other_non_nulls;
2221            }
2222            (l, r) => unreachable!(
2223                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2224            ),
2225        }
2226    }
2227}
2228
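// Multiplying an `Accum` by a `Diff` scales every field, so that an update arriving with
// multiplicity `n` accumulates like `n` copies of the single-record accumulator.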
2229impl Multiply<Diff> for Accum {
2230    type Output = Accum;
2231
2232    fn multiply(self, factor: &Diff) -> Accum {
2233        let factor = *factor;
2234        match self {
2235            Accum::Bool { trues, falses } => Accum::Bool {
2236                trues: trues * factor,
2237                falses: falses * factor,
2238            },
2239            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2240                accum: accum * AccumCount::from(factor),
2241                non_nulls: non_nulls * factor,
2242            },
2243            Accum::Float {
2244                accum,
2245                pos_infs,
2246                neg_infs,
2247                nans,
2248                non_nulls,
2249            } => Accum::Float {
2250                accum: accum
2251                    .checked_mul(AccumCount::from(factor))
2252                    .unwrap_or_else(|| {
2253                        warn!("Float accumulator overflow. Incorrect results possible");
2254                        accum.wrapping_mul(AccumCount::from(factor))
2255                    }),
2256                pos_infs: pos_infs * factor,
2257                neg_infs: neg_infs * factor,
2258                nans: nans * factor,
2259                non_nulls: non_nulls * factor,
2260            },
2261            Accum::Numeric {
2262                accum,
2263                pos_infs,
2264                neg_infs,
2265                nans,
2266                non_nulls,
2267            } => {
2268                let mut cx = numeric::cx_agg();
2269                let mut f = NumericAgg::from(factor.into_inner());
                // Unlike `plus_equals`, it is not necessary to reduce after this operation
                // because `f` will always be an integer, i.e., we are never increasing the
                // values' scale.
2273                cx.mul(&mut f, &accum.0);
2274                // `rounded` signals we have exceeded the aggregator's max
2275                // precision, which means we've lost commutativity and
2276                // associativity; nothing to be done here, so panic. For more
2277                // context, see the DEC_Rounded definition at
2278                // http://speleotrove.com/decimal/dncont.html
2279                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2280                Accum::Numeric {
2281                    accum: OrderedDecimal(f),
2282                    pos_infs: pos_infs * factor,
2283                    neg_infs: neg_infs * factor,
2284                    nans: nans * factor,
2285                    non_nulls: non_nulls * factor,
2286                }
2287            }
2288        }
2289    }
2290}
2291
2292impl Columnation for Accum {
2293    type InnerRegion = CopyRegion<Self>;
2294}
2295
2296/// Monoids for in-place compaction of monotonic streams.
2297mod monoids {
2298
2299    // We can improve the performance of some aggregations through the use of algebra.
2300    // In particular, we can move some of the aggregations in to the `diff` field of
2301    // updates, by changing `diff` from integers to a different algebraic structure.
2302    //
    // The one we use is called a "semigroup", and it means that the structure has an
    // associative (and, for DD's purposes, commutative) addition operator. The trait we
    // use also allows the semigroup elements to present as "zero", meaning they act as
    // the identity under +. Here,
2306    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
2307    // known to DD by the `is_zero` method, see comment there. So, from the point of view
2308    // of DD, this Semigroup should _not_ have a zero.
2309    //
2310    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
2311    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
2312    // assumes this.
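    //
    // For example (illustrative): folding the values `3`, `NULL`, `1` into a
    // `ReductionMonoid::Min` via `plus_equals` yields `Min(1)`: `NULL` acts as the
    // identity, and each step keeps the smaller non-null value.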
2313
2314    use differential_dataflow::containers::{Columnation, Region};
2315    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2316    use mz_expr::AggregateFunc;
2317    use mz_ore::soft_panic_or_log;
2318    use mz_repr::{Datum, Diff, Row};
2319    use serde::{Deserialize, Serialize};
2320
2321    /// A monoid containing a single-datum row.
2322    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
2323    pub enum ReductionMonoid {
2324        Min(Row),
2325        Max(Row),
2326    }
2327
2328    impl ReductionMonoid {
2329        pub fn finalize(&self) -> &Row {
2330            use ReductionMonoid::*;
2331            match self {
2332                Min(row) | Max(row) => row,
2333            }
2334        }
2335    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take the row we already hold, so its allocation can be reused
            // instead of cloning a fresh row from `source`.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Adopt `source`'s variant, carrying over the reused row.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in `ReductionMonoid` is idempotent: applying any
            // positive multiplicity leaves the value unchanged. Users must
            // ascertain the monotonicity of their input beforehand (typically
            // with `ensure_monotonic`), since the monoid has no zero value we
            // could use here for non-positive factors.
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // `Datum::Null` is the identity, not a small element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // `Datum::Null` is the identity, not a large element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }
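
    // A minimal sketch (a hypothetical test, not part of the original module)
    // of the `Datum::Null`-as-identity behavior described in the module
    // comment: merging a null into a MIN leaves it unchanged, while a smaller
    // non-null value replaces the current one.
    #[cfg(test)]
    mod semigroup_sketch {
        use differential_dataflow::difference::Semigroup;
        use mz_repr::{Datum, Row};

        use super::ReductionMonoid;

        fn min_of(d: Datum) -> ReductionMonoid {
            let mut row = Row::default();
            row.packer().push(d);
            ReductionMonoid::Min(row)
        }

        #[test]
        fn null_is_identity_for_min() {
            let mut m = min_of(Datum::Int64(5));
            // Null acts as the identity: the held value survives.
            m.plus_equals(&min_of(Datum::Null));
            assert_eq!(m.finalize().unpack_first(), Datum::Int64(5));
            // A smaller non-null value wins.
            m.plus_equals(&min_of(Datum::Int64(3)));
            assert_eq!(m.finalize().unpack_first(), Datum::Int64(3));
        }
    }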

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do
            // this! DD uses true results of this method to make stuff disappear. This makes
            // sense when diffs really are just diffs, but for `ReductionMonoid` diffs hold
            // reduction results. We don't want funny stuff, like disappearing, happening to
            // reduction results even when they are null. (This would confuse, e.g.,
            // `ReduceCollation` for null inputs.)
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum
    /// variants in the same backing region. Alternatively, it could store them in two
    /// separate regions, but we choose the former for simplicity.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            // Both variants spill their row data into the same shared inner
            // region.
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }
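
    // A small sketch (a hypothetical test, not part of the original module)
    // of what this region buys us: a columnar container such as `TimelyStack`
    // can absorb both monoid variants, spilling their row data into the
    // single shared inner region rather than keeping per-item heap
    // allocations. The use of `TimelyStack` here is an assumption for
    // illustration.
    #[cfg(test)]
    mod region_sketch {
        use differential_dataflow::containers::TimelyStack;
        use mz_repr::{Datum, Row};

        use super::ReductionMonoid;

        #[test]
        fn both_variants_share_one_region() {
            let mut row = Row::default();
            row.packer().push(Datum::Int64(1));
            let min = ReductionMonoid::Min(row.clone());
            let max = ReductionMonoid::Max(row);

            // Copying absorbs the rows into the container's backing region.
            let mut stack = TimelyStack::<ReductionMonoid>::default();
            stack.copy(&min);
            stack.copy(&max);
            assert_eq!(stack.len(), 2);
        }
    }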

    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
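
    // Illustrative usage of `get_monoid` (a hypothetical test, not part of
    // the original module): hierarchical aggregations wrap the row in the
    // matching monoid variant, while non-hierarchical ones yield `None`.
    #[cfg(test)]
    mod get_monoid_sketch {
        use mz_expr::AggregateFunc;
        use mz_repr::{Datum, Row};

        use super::{ReductionMonoid, get_monoid};

        #[test]
        fn hierarchical_functions_only() {
            let mut row = Row::default();
            row.packer().push(Datum::Int64(7));
            let monoid = get_monoid(row.clone(), &AggregateFunc::MaxInt64);
            assert!(matches!(monoid, Some(ReductionMonoid::Max(_))));
            // SUM is accumulable rather than hierarchical, so it has no
            // monoid implementation.
            assert!(get_monoid(row, &AggregateFunc::SumInt64).is_none());
        }
    }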
}

mod window_agg_helpers {
    use crate::render::reduce::*;

    /// TODO: It would be better for performance to do the branching that is in the methods of this
    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
    /// directly.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            // Note that the `reverse` parameter is currently forwarded only for Basic
            // aggregations (see `new` above).
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one aggregation for accumulable aggregate functions, backed by the same
    /// `Accum` representation used by the accumulable rendering above.
    pub struct AccumulableOneByOneAggr {
        aggr_func: AggregateFunc,
        accum: Accum,
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one aggregation for hierarchical aggregate functions, backed by a
    /// [`ReductionMonoid`].
    pub struct HierarchicalOneByOneAggr {
        aggr_func: AggregateFunc,
        // Warning: We are assuming that `Datum::Null` acts as the identity for
        // `ReductionMonoid`'s `plus_equals`. (But we are _not_ relying here on
        // `ReductionMonoid::is_zero`.)
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
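
    // A small sketch (a hypothetical test, not part of the original module)
    // of the one-by-one interface: datums are fed in order and the current
    // aggregate can be read out at any point. The expected output datum
    // (MAX over int64 yielding `Datum::Int64`) is an assumption for
    // illustration.
    #[cfg(test)]
    mod one_by_one_sketch {
        use mz_expr::{AggregateFunc, OneByOneAggr};
        use mz_repr::{Datum, RowArena};

        use super::OneByOneAggrImpls;

        #[test]
        fn max_one_by_one() {
            // MAX is hierarchical, so this routes to `HierarchicalOneByOneAggr`.
            let mut agg = OneByOneAggrImpls::new(&AggregateFunc::MaxInt64, false);
            agg.give(&Datum::Int64(2));
            agg.give(&Datum::Int64(5));
            agg.give(&Datum::Int64(3));
            let arena = RowArena::new();
            assert_eq!(agg.get_current_aggregate(&arena), Datum::Int64(5));
        }
    }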
}