// mz_compute/render/reduce.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction dataflow construction.
//!
//! Consult [ReducePlan] documentation for details.

use std::collections::BTreeMap;
use std::sync::LazyLock;

use dec::OrderedDecimal;
use differential_dataflow::Diff as _;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, CopyRegion};
use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
use differential_dataflow::hashable::Hashable;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
use differential_dataflow::trace::{Builder, Trace};
use differential_dataflow::{Data, VecCollection};
use itertools::Itertools;
use mz_compute_types::plan::reduce::{
    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
};
use mz_expr::{
    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
};
use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use serde::{Deserialize, Serialize};
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use tracing::warn;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::{MzReduce, ReduceExt};
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
use crate::render::{ArrangementFlavor, Pairer};
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
};
use crate::typedefs::{
    ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
    RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
};

// Reduction rendering is implemented on the dataflow `Context`, generic over the
// inner scope `G` and the outer timestamp `T` that `G::Timestamp` refines.
impl<G, T> Context<G, T>
where
    G: Scope,
    G::Timestamp: MzTimestamp + Refines<T>,
    T: MzTimestamp,
{
68    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
69    /// minimize worst-case incremental update times and memory footprint.
70    pub fn render_reduce(
71        &self,
72        input_key: Option<Vec<MirScalarExpr>>,
73        input: CollectionBundle<G, T>,
74        key_val_plan: KeyValPlan,
75        reduce_plan: ReducePlan,
76        mfp_after: Option<MapFilterProject>,
77    ) -> CollectionBundle<G, T> {
78        // Convert `mfp_after` to an actionable plan.
79        let mfp_after = mfp_after.map(|m| {
80            m.into_plan()
81                .expect("MFP planning must succeed")
82                .into_nontemporal()
83                .expect("Fused Reduce MFPs do not have temporal predicates")
84        });
85
86        input.scope().region_named("Reduce", |inner| {
87            let KeyValPlan {
88                mut key_plan,
89                mut val_plan,
90            } = key_val_plan;
91            let key_arity = key_plan.projection.len();
92            let mut datums = DatumVec::new();
93
94            // Determine the columns we'll need from the row.
95            let mut demand = Vec::new();
96            demand.extend(key_plan.demand());
97            demand.extend(val_plan.demand());
98            demand.sort();
99            demand.dedup();
100
101            // remap column references to the subset we use.
102            let mut demand_map = BTreeMap::new();
103            for column in demand.iter() {
104                demand_map.insert(*column, demand_map.len());
105            }
106            let demand_map_len = demand_map.len();
107            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
108            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
109            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
110            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
111
112            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
113                input_key.map(|k| (k, None)),
114                max_demand,
115                move |row_datums, time, diff| {
116                    let mut row_builder = SharedRow::get();
117                    let temp_storage = RowArena::new();
118
119                    let mut row_iter = row_datums.drain(..);
120                    let mut datums_local = datums.borrow();
121                    // Unpack only the demanded columns.
122                    for skip in skips.iter() {
123                        datums_local.push(row_iter.nth(*skip).unwrap());
124                    }
125
126                    // Evaluate the key expressions.
127                    let key =
128                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
129                    let key = match key {
130                        Err(e) => {
131                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
132                        }
133                        Ok(Some(key)) => key.clone(),
134                        Ok(None) => panic!("Row expected as no predicate was used"),
135                    };
136
137                    // Evaluate the value expressions.
138                    // The prior evaluation may have left additional columns we should delete.
139                    datums_local.truncate(skips.len());
140                    let val =
141                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
142                    let val = match val {
143                        Err(e) => {
144                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
145                        }
146                        Ok(Some(val)) => val.clone(),
147                        Ok(None) => panic!("Row expected as no predicate was used"),
148                    };
149
150                    Some((Ok((key, val)), time.clone(), diff.clone()))
151                },
152            );
153
154            // Demux out the potential errors from key and value selector evaluation.
155            type CB<T> = ConsolidatingContainerBuilder<T>;
156            let (ok, mut err) = key_val_input
157                .as_collection()
158                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);
159
160            err = err.concat(err_input);
161
162            // Render the reduce plan
163            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
164                .leave_region()
165        })
166    }
167
168    /// Render a dataflow based on the provided plan.
169    ///
170    /// The output will be an arrangements that looks the same as if
171    /// we just had a single reduce operator computing everything together, and
172    /// this arrangement can also be re-used.
173    fn render_reduce_plan<S>(
174        &self,
175        plan: ReducePlan,
176        collection: VecCollection<S, (Row, Row), Diff>,
177        err_input: VecCollection<S, DataflowError, Diff>,
178        key_arity: usize,
179        mfp_after: Option<SafeMfpPlan>,
180    ) -> CollectionBundle<S, T>
181    where
182        S: Scope<Timestamp = G::Timestamp>,
183    {
184        let mut errors = Default::default();
185        let arrangement =
186            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188        CollectionBundle::from_columns(
189            0..key_arity,
190            ArrangementFlavor::Local(
191                arrangement,
192                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193            ),
194        )
195    }
196
197    fn render_reduce_plan_inner<S>(
198        &self,
199        plan: ReducePlan,
200        collection: VecCollection<S, (Row, Row), Diff>,
201        errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
202        key_arity: usize,
203        mfp_after: Option<SafeMfpPlan>,
204    ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
205    where
206        S: Scope<Timestamp = G::Timestamp>,
207    {
208        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
209        // not only values (database-issues#6658).
210        let arrangement = match plan {
211            // If we have no aggregations or just a single type of reduction, we
212            // can go ahead and render them directly.
213            ReducePlan::Distinct => {
214                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
215                errors.push(errs);
216                arranged_output
217            }
218            ReducePlan::Accumulable(expr) => {
219                let (arranged_output, errs) =
220                    self.build_accumulable(collection, expr, key_arity, mfp_after);
221                errors.push(errs);
222                arranged_output
223            }
224            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
225                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
226                errors.push(errs);
227                output
228            }
229            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
230                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
231                errors.push(errs);
232                output
233            }
234            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
235                expr,
236                fused_unnest_list,
237            })) => {
238                // Note that we skip validating for negative diffs when we have a fused unnest list,
239                // because this is already a CPU-intensive situation due to the non-incrementalness
240                // of window functions.
241                let validating = !fused_unnest_list;
242                let (output, errs) = self.build_basic_aggregate(
243                    collection,
244                    0,
245                    &expr,
246                    validating,
247                    key_arity,
248                    mfp_after,
249                    fused_unnest_list,
250                );
251                if validating {
252                    errors.push(errs.expect("validation should have occurred as it was requested"));
253                }
254                output
255            }
256            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
257                let (output, errs) =
258                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
259                errors.push(errs);
260                output
261            }
262        };
263        arrangement
264    }
265
266    /// Build the dataflow to compute the set of distinct keys.
267    fn build_distinct<S>(
268        &self,
269        collection: VecCollection<S, (Row, Row), Diff>,
270        mfp_after: Option<SafeMfpPlan>,
271    ) -> (
272        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
273        VecCollection<S, DataflowError, Diff>,
274    )
275    where
276        S: Scope<Timestamp = G::Timestamp>,
277    {
278        let error_logger = self.error_logger();
279
280        // Allocations for the two closures.
281        let mut datums1 = DatumVec::new();
282        let mut datums2 = DatumVec::new();
283        let mfp_after1 = mfp_after.clone();
284        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
285
286        let (output, errors) = collection
287            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
288                "Arranged DistinctBy",
289            )
290            .reduce_pair::<
291                _,
292                RowRowBuilder<_, _>,
293                RowRowSpine<_, _>,
294                _,
295                RowErrBuilder<_, _>,
296                RowErrSpine<_, _>,
297            >(
298                "DistinctBy",
299                "DistinctByErrorCheck",
300                move |key, _input, output| {
301                    let temp_storage = RowArena::new();
302                    let mut datums_local = datums1.borrow();
303                    datums_local.extend(key.to_datum_iter());
304
305                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
306                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
307                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
308                    if mfp_after1
309                        .as_ref()
310                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
311                        .unwrap_or(Ok(true))
312                        == Ok(true)
313                    {
314                        // We're pushing a unit value here because the key is implicitly added by the
315                        // arrangement, and the permutation logic takes care of using the key part of the
316                        // output.
317                        output.push((Row::default(), Diff::ONE));
318                    }
319                },
320                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
321                    for (_, count) in input.iter() {
322                        if count.is_positive() {
323                            continue;
324                        }
325                        let message = "Non-positive multiplicity in DistinctBy";
326                        error_logger.log(message, &format!("row={key:?}, count={count}"));
327                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
328                        return;
329                    }
330                    // If `mfp_after` can error, then evaluate it here.
331                    let Some(mfp) = &mfp_after2 else { return };
332                    let temp_storage = RowArena::new();
333                    let datum_iter = key.to_datum_iter();
334                    let mut datums_local = datums2.borrow();
335                    datums_local.extend(datum_iter);
336
337                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
338                        output.push((e.into(), Diff::ONE));
339                    }
340                },
341            );
342        (output, errors.as_collection(|_k, v| v.clone()))
343    }
344
345    /// Build the dataflow to compute and arrange multiple non-accumulable,
346    /// non-hierarchical aggregations on `input`.
347    ///
348    /// This function assumes that we are explicitly rendering multiple basic aggregations.
349    /// For each aggregate, we render a different reduce operator, and then fuse
350    /// results together into a final arrangement that presents all the results
351    /// in the order specified by `aggrs`.
352    fn build_basic_aggregates<S>(
353        &self,
354        input: VecCollection<S, (Row, Row), Diff>,
355        aggrs: Vec<AggregateExpr>,
356        key_arity: usize,
357        mfp_after: Option<SafeMfpPlan>,
358    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
359    where
360        S: Scope<Timestamp = G::Timestamp>,
361    {
362        // We are only using this function to render multiple basic aggregates and
363        // stitch them together. If that's not true we should complain.
364        if aggrs.len() <= 1 {
365            self.error_logger().soft_panic_or_log(
366                "Too few aggregations when building basic aggregates",
367                &format!("len={}", aggrs.len()),
368            )
369        }
370        let mut err_output = None;
371        let mut to_collect = Vec::new();
372        for (index, aggr) in aggrs.into_iter().enumerate() {
373            let (result, errs) = self.build_basic_aggregate(
374                input.clone(),
375                index,
376                &aggr,
377                err_output.is_none(),
378                key_arity,
379                None,
380                false,
381            );
382            if errs.is_some() {
383                err_output = errs
384            }
385            to_collect
386                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
387        }
388
389        // Allocations for the two closures.
390        let mut datums1 = DatumVec::new();
391        let mut datums2 = DatumVec::new();
392        let mfp_after1 = mfp_after.clone();
393        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
394
395        let arranged =
396            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
397                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
398                "Arranged ReduceFuseBasic input",
399            );
400
401        let output = arranged
402            .clone()
403            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
404                move |key, input, output| {
405                    let temp_storage = RowArena::new();
406                    let datum_iter = key.to_datum_iter();
407                    let mut datums_local = datums1.borrow();
408                    datums_local.extend(datum_iter);
409                    let key_len = datums_local.len();
410
411                    for ((_, row), _) in input.iter() {
412                        datums_local.push(row.unpack_first());
413                    }
414
415                    if let Some(row) =
416                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
417                    {
418                        output.push((row, Diff::ONE));
419                    }
420                }
421            });
422        // If `mfp_after` can error, then we need to render a paired reduction
423        // to scan for these potential errors. Note that we cannot directly use
424        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
425        // conditionally render the second component of the reduction pair.
426        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
427        if let Some(mfp) = mfp_after2 {
428            let mfp_errs = arranged
429                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
430                    "ReduceFuseBasic Error Check",
431                    move |key, input, output| {
432                        // Since negative accumulations are checked in at least one component
433                        // aggregate, we only need to look for MFP errors here.
434                        let temp_storage = RowArena::new();
435                        let datum_iter = key.to_datum_iter();
436                        let mut datums_local = datums2.borrow();
437                        datums_local.extend(datum_iter);
438
439                        for ((_, row), _) in input.iter() {
440                            datums_local.push(row.unpack_first());
441                        }
442
443                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
444                            output.push((e.into(), Diff::ONE));
445                        }
446                    },
447                )
448                .as_collection(|_, v| v.clone());
449            (output, validation_errs.concat(mfp_errs))
450        } else {
451            (output, validation_errs)
452        }
453    }
454
455    /// Build the dataflow to compute a single basic aggregation.
456    ///
457    /// This method also applies distinctness if required.
458    fn build_basic_aggregate<S>(
459        &self,
460        input: VecCollection<S, (Row, Row), Diff>,
461        index: usize,
462        aggr: &AggregateExpr,
463        validating: bool,
464        key_arity: usize,
465        mfp_after: Option<SafeMfpPlan>,
466        fused_unnest_list: bool,
467    ) -> (
468        RowRowArrangement<S>,
469        Option<VecCollection<S, DataflowError, Diff>>,
470    )
471    where
472        S: Scope<Timestamp = G::Timestamp>,
473    {
474        let AggregateExpr {
475            func,
476            expr: _,
477            distinct,
478        } = aggr.clone();
479
480        // Extract the value we were asked to aggregate over.
481        let mut partial = input.map(move |(key, row)| {
482            let mut row_builder = SharedRow::get();
483            let value = row.iter().nth(index).unwrap();
484            row_builder.packer().push(value);
485            (key, row_builder.clone())
486        });
487
488        let mut err_output = None;
489
490        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
491        if distinct {
492            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
493            let pairer = Pairer::new(key_arity);
494            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
495            if validating {
496                let (oks, errs) = self
497                    .build_reduce_inaccumulable_distinct::<
498                        _,
499                        RowValBuilder<Result<(), String>, _, _>,
500                        RowValSpine<Result<(), String>, _, _>,
501                    >(keyed, None)
502                    .as_collection(|k, v| {
503                        (
504                            k.to_row(),
505                            v.as_ref()
506                                .map(|&()| ())
507                                .map_err(|m| m.as_str().into()),
508                        )
509                    })
510                    .map_fallible::<
511                        CapacityContainerBuilder<_>,
512                        CapacityContainerBuilder<_>,
513                        _,
514                        _,
515                        _,
516                    >(
517                        "Demux Errors",
518                        move |(key_val, result)| match result {
519                            Ok(()) => Ok(pairer.split(&key_val)),
520                            Err(m) => {
521                                Err(EvalError::Internal(m).into())
522                            }
523                        },
524                    );
525                err_output = Some(errs);
526                partial = oks;
527            } else {
528                partial = self
529                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
530                        keyed,
531                        Some(" [val: empty]"),
532                    )
533                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
534            }
535        }
536
537        // Allocations for the two closures.
538        let mut datums1 = DatumVec::new();
539        let mut datums2 = DatumVec::new();
540        let mut datums_key_1 = DatumVec::new();
541        let mut datums_key_2 = DatumVec::new();
542        let mfp_after1 = mfp_after.clone();
543        let func2 = func.clone();
544
545        let name = if !fused_unnest_list {
546            "ReduceInaccumulable"
547        } else {
548            "FusedReduceUnnestList"
549        };
550        let arranged = partial
551            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
552                "Arranged {name}"
553            ));
554        let oks = if !fused_unnest_list {
555            arranged
556                .clone()
557                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
558                    move |key, source, target| {
559                        // We respect the multiplicity here (unlike in hierarchical aggregation)
560                        // because we don't know that the aggregation method is not sensitive
561                        // to the number of records.
562                        let iter = source.iter().flat_map(|(v, w)| {
563                            // Note that in the non-positive case, this is wrong, but harmless because
564                            // our other reduction will produce an error.
565                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
566                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
567                        });
568
569                        let temp_storage = RowArena::new();
570                        let datum_iter = key.to_datum_iter();
571                        let mut datums_local = datums1.borrow();
572                        datums_local.extend(datum_iter);
573                        let key_len = datums_local.len();
574                        datums_local.push(
575                        // Note that this is not necessarily a window aggregation, in which case
576                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
577                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
578                            iter,
579                            &temp_storage,
580                        ),
581                    );
582
583                        if let Some(row) = evaluate_mfp_after(
584                            &mfp_after1,
585                            &mut datums_local,
586                            &temp_storage,
587                            key_len,
588                        ) {
589                            target.push((row, Diff::ONE));
590                        }
591                    }
592                })
593        } else {
594            arranged
595                .clone()
596                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
597                    move |key, source, target| {
598                        // This part is the same as in the `!fused_unnest_list` if branch above.
599                        let iter = source.iter().flat_map(|(v, w)| {
600                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
601                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
602                        });
603
604                        // This is the part that is specific to the `fused_unnest_list` branch.
605                        let temp_storage = RowArena::new();
606                        let mut datums_local = datums_key_1.borrow();
607                        datums_local.extend(key.to_datum_iter());
608                        let key_len = datums_local.len();
609                        for datum in func
610                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
611                                iter,
612                                &temp_storage,
613                            )
614                        {
615                            datums_local.truncate(key_len);
616                            datums_local.push(datum);
617                            if let Some(row) = evaluate_mfp_after(
618                                &mfp_after1,
619                                &mut datums_local,
620                                &temp_storage,
621                                key_len,
622                            ) {
623                                target.push((row, Diff::ONE));
624                            }
625                        }
626                    }
627                })
628        };
629
630        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
631        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
632        // rationale around using a second reduction here.
633        let must_validate = validating && err_output.is_none();
634        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
635        if must_validate || mfp_after2.is_some() {
636            let error_logger = self.error_logger();
637
638            let errs = if !fused_unnest_list {
639                arranged
640                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
641                        &format!("{name} Error Check"),
642                        move |key, source, target| {
643                            // Negative counts would be surprising, but until we are 100% certain we won't
644                            // see them, we should report when we do. We may want to bake even more info
645                            // in here in the future.
646                            if must_validate {
647                                for (value, count) in source.iter() {
648                                    if count.is_positive() {
649                                        continue;
650                                    }
651                                    let value = value.to_row();
652                                    let message =
653                                        "Non-positive accumulation in ReduceInaccumulable";
654                                    error_logger
655                                        .log(message, &format!("value={value:?}, count={count}"));
656                                    let err = EvalError::Internal(message.into());
657                                    target.push((err.into(), Diff::ONE));
658                                    return;
659                                }
660                            }
661
662                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
663                            let Some(mfp) = &mfp_after2 else { return };
664                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
665                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
666                                // This would ideally use `to_datum_iter` but we cannot as it needs to
667                                // borrow `v` and only presents datums with that lifetime, not any longer.
668                                std::iter::repeat(v.next().unwrap()).take(count)
669                            });
670
671                            let temp_storage = RowArena::new();
672                            let datum_iter = key.to_datum_iter();
673                            let mut datums_local = datums2.borrow();
674                            datums_local.extend(datum_iter);
675                            datums_local.push(
676                                func2.eval_with_fast_window_agg::<
677                                    _,
678                                    window_agg_helpers::OneByOneAggrImpls,
679                                >(
680                                    iter, &temp_storage
681                                ),
682                            );
683                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
684                                target.push((e.into(), Diff::ONE));
685                            }
686                        },
687                    )
688                    .as_collection(|_, v| v.clone())
689            } else {
690                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
691                assert!(!must_validate);
692                // We couldn't have got into this if branch due to `must_validate`, so it must be
693                // because of the `mfp_after2.is_some()`.
694                let Some(mfp) = mfp_after2 else {
695                    unreachable!()
696                };
697                arranged
698                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
699                        &format!("{name} Error Check"),
700                        move |key, source, target| {
701                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
702                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
703                                // This would ideally use `to_datum_iter` but we cannot as it needs to
704                                // borrow `v` and only presents datums with that lifetime, not any longer.
705                                std::iter::repeat(v.next().unwrap()).take(count)
706                            });
707
708                            let temp_storage = RowArena::new();
709                            let mut datums_local = datums_key_2.borrow();
710                            datums_local.extend(key.to_datum_iter());
711                            let key_len = datums_local.len();
712                            for datum in func2
713                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
714                                    iter,
715                                    &temp_storage,
716                                )
717                            {
718                                datums_local.truncate(key_len);
719                                datums_local.push(datum);
720                                // We know that `mfp` can error (because of the `could_error` call
721                                // above), so try to evaluate it here.
722                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
723                                {
724                                    target.push((e.into(), Diff::ONE));
725                                }
726                            }
727                        },
728                    )
729                    .as_collection(|_, v| v.clone())
730            };
731
732            if let Some(e) = err_output {
733                err_output = Some(e.concat(errs));
734            } else {
735                err_output = Some(errs);
736            }
737        }
738        (oks, err_output)
739    }
740
741    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
742        &self,
743        input: VecCollection<S, Row, Diff>,
744        name_tag: Option<&str>,
745    ) -> Arranged<S, TraceAgent<Tr>>
746    where
747        S: Scope<Timestamp = G::Timestamp>,
748        Tr: for<'a> Trace<
749                Key<'a> = DatumSeq<'a>,
750                KeyOwn = Row,
751                Time = G::Timestamp,
752                Diff = Diff,
753                ValOwn: Data + MaybeValidatingRow<(), String>,
754            > + 'static,
755        Bu: Builder<
756                Time = G::Timestamp,
757                Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
758                Output = Tr::Batch,
759            >,
760        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
761    {
762        let error_logger = self.error_logger();
763
764        let output_name = format!(
765            "ReduceInaccumulable Distinct{}",
766            name_tag.unwrap_or_default()
767        );
768
769        let input: KeyCollection<_, _, _> = input.into();
770        input
771            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
772                "Arranged ReduceInaccumulable Distinct [val: empty]",
773            )
774            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
775                if let Some(err) = Tr::ValOwn::into_error() {
776                    for (value, count) in source.iter() {
777                        if count.is_positive() {
778                            continue;
779                        }
780
781                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
782                        error_logger.log(message, &format!("value={value:?}, count={count}"));
783                        t.push((err(message.to_string()), Diff::ONE));
784                        return;
785                    }
786                }
787                t.push((Tr::ValOwn::ok(()), Diff::ONE))
788            })
789    }
790
791    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
792    /// on non-monotonic inputs.
793    ///
794    /// This function renders a single reduction tree that computes aggregations with
795    /// a priority queue implemented with a series of reduce operators that partition
796    /// the input into buckets, and compute the aggregation over very small buckets
797    /// and feed the results up to larger buckets.
798    ///
799    /// Note that this implementation currently ignores the distinct bit because we
800    /// currently only perform min / max hierarchically and the reduction tree
801    /// efficiently suppresses non-distinct updates.
802    ///
803    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
804    /// trickery here to limit the memory usage per layer by internally
805    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains the `((key, bucket), passing value)` for this
807    /// stage.
808    fn build_bucketed<S>(
809        &self,
810        input: VecCollection<S, (Row, Row), Diff>,
811        BucketedPlan {
812            aggr_funcs,
813            buckets,
814        }: BucketedPlan,
815        key_arity: usize,
816        mfp_after: Option<SafeMfpPlan>,
817    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
818    where
819        S: Scope<Timestamp = G::Timestamp>,
820    {
821        let mut err_output: Option<VecCollection<S, _, _>> = None;
822        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
823            let input = input.enter(inner);
824
825            // The first mod to apply to the hash.
826            let first_mod = buckets.get(0).copied().unwrap_or(1);
827            let aggregations = aggr_funcs.len();
828
829            // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
830            let mut stage = input.map(move |(key, row)| {
831                let mut row_builder = SharedRow::get();
832                let mut row_packer = row_builder.packer();
833                row_packer.extend(row.iter().take(aggregations));
834                let values = row_builder.clone();
835
836                // Apply the initial mod here.
837                let hash = values.hashed() % first_mod;
838                let hash_key =
839                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
840                (hash_key, values)
841            });
842
843            // Repeatedly apply hierarchical reduction with a progressively coarser key.
844            for (index, b) in buckets.into_iter().enumerate() {
845                // Apply subsequent bucket mods for all but the first round.
846                let input = if index == 0 {
847                    stage
848                } else {
849                    stage.map(move |(hash_key, values)| {
850                        let mut hash_key_iter = hash_key.iter();
851                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
852                        // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
853                        let hash_key = SharedRow::pack(
854                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
855                        );
856                        (hash_key, values)
857                    })
858                };
859
860                // We only want the first stage to perform validation of whether invalid accumulations
861                // were observed in the input. Subsequently, we will either produce an error in the error
862                // stream or produce correct data in the output stream.
863                let validating = err_output.is_none();
864
865                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
866                if let Some(errs) = errs {
867                    err_output = Some(errs.leave_region());
868                }
869                stage = oks
870            }
871
872            // Discard the hash from the key and return to the format of the input data.
873            let partial = stage.map(move |(hash_key, values)| {
874                let mut hash_key_iter = hash_key.iter();
875                let _hash = hash_key_iter.next();
876                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
877            });
878
879            // Allocations for the two closures.
880            let mut datums1 = DatumVec::new();
881            let mut datums2 = DatumVec::new();
882            let mfp_after1 = mfp_after.clone();
883            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
884            let aggr_funcs2 = aggr_funcs.clone();
885
886            // Build a series of stages for the reduction
887            // Arrange the final result into (key, Row)
888            let error_logger = self.error_logger();
889            // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
890            // view mz_introspection.mz_expected_group_size_advice.
891            let arranged = partial
892                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
893                    "Arrange ReduceMinsMaxes",
894                );
895            // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
896            // but we then wouldn't be able to do this error check conditionally.  See its documentation
897            // for the rationale around using a second reduction here.
898            let must_validate = err_output.is_none();
899            if must_validate || mfp_after2.is_some() {
900                let errs = arranged
901                    .clone()
902                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
903                        "ReduceMinsMaxes Error Check",
904                        move |key, source, target| {
905                            // Negative counts would be surprising, but until we are 100% certain we wont
906                            // see them, we should report when we do. We may want to bake even more info
907                            // in here in the future.
908                            if must_validate {
909                                for (val, count) in source.iter() {
910                                    if count.is_positive() {
911                                        continue;
912                                    }
913                                    let val = val.to_row();
914                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
915                                    error_logger
916                                        .log(message, &format!("val={val:?}, count={count}"));
917                                    target.push((
918                                        EvalError::Internal(message.into()).into(),
919                                        Diff::ONE,
920                                    ));
921                                    return;
922                                }
923                            }
924
925                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
926                            let Some(mfp) = &mfp_after2 else { return };
927                            let temp_storage = RowArena::new();
928                            let datum_iter = key.to_datum_iter();
929                            let mut datums_local = datums2.borrow();
930                            datums_local.extend(datum_iter);
931
932                            let mut source_iters = source
933                                .iter()
934                                .map(|(values, _cnt)| *values)
935                                .collect::<Vec<_>>();
936                            for func in aggr_funcs2.iter() {
937                                let column_iter = (0..source_iters.len())
938                                    .map(|i| source_iters[i].next().unwrap());
939                                datums_local.push(func.eval(column_iter, &temp_storage));
940                            }
941                            if let Result::Err(e) =
942                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
943                            {
944                                target.push((e.into(), Diff::ONE));
945                            }
946                        },
947                    )
948                    .as_collection(|_, v| v.clone())
949                    .leave_region();
950                if let Some(e) = err_output.take() {
951                    err_output = Some(e.concat(errs));
952                } else {
953                    err_output = Some(errs);
954                }
955            }
956            arranged
957                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
958                    "ReduceMinsMaxes",
959                    move |key, source, target| {
960                        let temp_storage = RowArena::new();
961                        let datum_iter = key.to_datum_iter();
962                        let mut datums_local = datums1.borrow();
963                        datums_local.extend(datum_iter);
964                        let key_len = datums_local.len();
965
966                        let mut source_iters = source
967                            .iter()
968                            .map(|(values, _cnt)| *values)
969                            .collect::<Vec<_>>();
970                        for func in aggr_funcs.iter() {
971                            let column_iter =
972                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
973                            datums_local.push(func.eval(column_iter, &temp_storage));
974                        }
975
976                        if let Some(row) = evaluate_mfp_after(
977                            &mfp_after1,
978                            &mut datums_local,
979                            &temp_storage,
980                            key_len,
981                        ) {
982                            target.push((row, Diff::ONE));
983                        }
984                    },
985                )
986                .leave_region()
987        });
988        (
989            arranged_output,
990            err_output.expect("expected to validate in one level of the hierarchy"),
991        )
992    }
993
994    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
995    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
996    /// with the negation of what's produced by the reduction.
997    /// `validating` indicates whether we want this stage to perform error detection
998    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
999    /// stages can skip validation.
1000    fn build_bucketed_stage<S>(
1001        &self,
1002        aggr_funcs: &Vec<AggregateFunc>,
1003        input: VecCollection<S, (Row, Row), Diff>,
1004        validating: bool,
1005    ) -> (
1006        VecCollection<S, (Row, Row), Diff>,
1007        Option<VecCollection<S, DataflowError, Diff>>,
1008    )
1009    where
1010        S: Scope<Timestamp = G::Timestamp>,
1011    {
1012        let (input, negated_output, errs) = if validating {
1013            let (input, reduced) = self
1014                .build_bucketed_negated_output::<
1015                    _,
1016                    RowValBuilder<_, _, _>,
1017                    RowValSpine<Result<Row, Row>, _, _>,
1018                >(
1019                    input.clone(),
1020                    aggr_funcs.clone(),
1021                );
1022            let (oks, errs) = reduced
1023                .as_collection(|k, v| (k.to_row(), v.clone()))
1024                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1025                "Checked Invalid Accumulations",
1026                |(hash_key, result)| match result {
1027                    Err(hash_key) => {
1028                        let mut hash_key_iter = hash_key.iter();
1029                        let _hash = hash_key_iter.next();
1030                        let key = SharedRow::pack(hash_key_iter);
1031                        let message = format!(
1032                            "Invalid data in source, saw non-positive accumulation \
1033                                         for key {key:?} in hierarchical mins-maxes aggregate"
1034                        );
1035                        Err(EvalError::Internal(message.into()).into())
1036                    }
1037                    Ok(values) => Ok((hash_key, values)),
1038                },
1039            );
1040            (input, oks, Some(errs))
1041        } else {
1042            let (input, reduced) = self
1043                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1044                    input,
1045                    aggr_funcs.clone(),
1046                );
1047            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1048            // that we need to apply the mod on both input and oks.
1049            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1050            (input, oks, None)
1051        };
1052
1053        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1054        let oks = negated_output.concat(input);
1055        (oks, errs)
1056    }
1057
1058    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1059    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1060    /// with all diffs in the reduction's output negated.
1061    fn build_bucketed_negated_output<S, Bu, Tr>(
1062        &self,
1063        input: VecCollection<S, (Row, Row), Diff>,
1064        aggrs: Vec<AggregateFunc>,
1065    ) -> (
1066        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1067        Arranged<S, TraceAgent<Tr>>,
1068    )
1069    where
1070        S: Scope<Timestamp = G::Timestamp>,
1071        Tr: for<'a> Trace<
1072                Key<'a> = DatumSeq<'a>,
1073                KeyOwn = Row,
1074                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1075                Time = G::Timestamp,
1076                Diff = Diff,
1077            > + 'static,
1078        Bu: Builder<
1079                Time = G::Timestamp,
1080                Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1081                Output = Tr::Batch,
1082            >,
1083        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1084    {
1085        let error_logger = self.error_logger();
1086        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1087        // view mz_introspection.mz_expected_group_size_advice.
1088        let arranged_input = input
1089            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1090                "Arranged MinsMaxesHierarchical input",
1091            );
1092
1093        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1094            "Reduced Fallibly MinsMaxesHierarchical",
1095            move |key, source, target| {
1096                if let Some(err) = Tr::ValOwn::into_error() {
1097                    // Should negative accumulations reach us, we should loudly complain.
1098                    for (value, count) in source.iter() {
1099                        if count.is_positive() {
1100                            continue;
1101                        }
1102                        error_logger.log(
1103                            "Non-positive accumulation in MinsMaxesHierarchical",
1104                            &format!("key={key:?}, value={value:?}, count={count}"),
1105                        );
1106                        // After complaining, output an error here so that we can eventually
1107                        // report it in an error stream.
1108                        target.push((err(Tr::owned_key(key)), Diff::ONE));
1109                        return;
1110                    }
1111                }
1112
1113                let mut row_builder = SharedRow::get();
1114                let mut row_packer = row_builder.packer();
1115
1116                let mut source_iters = source
1117                    .iter()
1118                    .map(|(values, _cnt)| *values)
1119                    .collect::<Vec<_>>();
1120                for func in aggrs.iter() {
1121                    let column_iter =
1122                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1123                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1124                }
1125                // We only want to arrange the parts of the input that are not part of the output.
1126                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
1127                // gives us the intended value of this aggregate function. Also we assume that regardless
1128                // of the multiplicity of the final result in the input, we only want to have one copy
1129                // in the output.
1130                target.reserve(source.len().saturating_add(1));
1131                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1132                target.extend(source.iter().map(|(values, cnt)| {
1133                    let mut cnt = *cnt;
1134                    cnt.negate();
1135                    (Tr::ValOwn::ok(values.to_row()), cnt)
1136                }));
1137            },
1138        );
1139        (arranged_input, reduced)
1140    }
1141
1142    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1143    /// on monotonic inputs.
1144    fn build_monotonic<S>(
1145        &self,
1146        collection: VecCollection<S, (Row, Row), Diff>,
1147        MonotonicPlan {
1148            aggr_funcs,
1149            must_consolidate,
1150        }: MonotonicPlan,
1151        mfp_after: Option<SafeMfpPlan>,
1152    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
1153    where
1154        S: Scope<Timestamp = G::Timestamp>,
1155    {
1156        let aggregations = aggr_funcs.len();
1157        // Gather the relevant values into a vec of rows ordered by aggregation_index
1158        let collection = collection
1159            .map(move |(key, row)| {
1160                let mut row_builder = SharedRow::get();
1161                let mut values = Vec::with_capacity(aggregations);
1162                values.extend(
1163                    row.iter()
1164                        .take(aggregations)
1165                        .map(|v| row_builder.pack_using(std::iter::once(v))),
1166                );
1167
1168                (key, values)
1169            })
1170            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1171                must_consolidate,
1172                "Consolidated ReduceMonotonic input",
1173            );
1174
1175        // It should be now possible to ensure that we have a monotonic collection.
1176        let error_logger = self.error_logger();
1177        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1178            error_logger.log(
1179                "Non-monotonic input to ReduceMonotonic",
1180                &format!("data={data:?}, diff={diff}"),
1181            );
1182            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1183            (EvalError::Internal(m).into(), Diff::ONE)
1184        });
1185        // We can place our rows directly into the diff field, and
1186        // only keep the relevant one corresponding to evaluating our
1187        // aggregate, instead of having to do a hierarchical reduction.
1188        let partial = partial.explode_one(move |(key, values)| {
1189            let mut output = Vec::new();
1190            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1191                output.push(monoids::get_monoid(row, func).expect(
1192                    "hierarchical aggregations are expected to have monoid implementations",
1193                ));
1194            }
1195            (key, output)
1196        });
1197
1198        // Allocations for the two closures.
1199        let mut datums1 = DatumVec::new();
1200        let mut datums2 = DatumVec::new();
1201        let mfp_after1 = mfp_after.clone();
1202        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1203
1204        let partial: KeyCollection<_, _, _> = partial.into();
1205        let arranged = partial
1206            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1207                "ArrangeMonotonic [val: empty]",
1208            );
1209        let output = arranged
1210            .clone()
1211            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
1212                move |key, input, output| {
1213                    let temp_storage = RowArena::new();
1214                    let datum_iter = key.to_datum_iter();
1215                    let mut datums_local = datums1.borrow();
1216                    datums_local.extend(datum_iter);
1217                    let key_len = datums_local.len();
1218                    let accum = &input[0].1;
1219                    for monoid in accum.iter() {
1220                        datums_local.extend(monoid.finalize().iter());
1221                    }
1222
1223                    if let Some(row) =
1224                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1225                    {
1226                        output.push((row, Diff::ONE));
1227                    }
1228                }
1229            });
1230
1231        // If `mfp_after` can error, then we need to render a paired reduction
1232        // to scan for these potential errors. Note that we cannot directly use
1233        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1234        // conditionally render the second component of the reduction pair.
1235        if let Some(mfp) = mfp_after2 {
1236            let mfp_errs = arranged
1237                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1238                    "ReduceMonotonic Error Check",
1239                    move |key, input, output| {
1240                        let temp_storage = RowArena::new();
1241                        let datum_iter = key.to_datum_iter();
1242                        let mut datums_local = datums2.borrow();
1243                        datums_local.extend(datum_iter);
1244                        let accum = &input[0].1;
1245                        for monoid in accum.iter() {
1246                            datums_local.extend(monoid.finalize().iter());
1247                        }
1248                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1249                        {
1250                            output.push((e.into(), Diff::ONE));
1251                        }
1252                    },
1253                )
1254                .as_collection(|_k, v| v.clone());
1255            (output, validation_errs.concat(mfp_errs))
1256        } else {
1257            (output, validation_errs)
1258        }
1259    }
1260
1261    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1262    ///
1263    /// The incoming values are moved to the update's "difference" field, at which point
1264    /// they can be accumulated in place. The `count` operator promotes the accumulated
1265    /// values to data, at which point a final map applies operator-specific logic to
1266    /// yield the final aggregate.
1267    fn build_accumulable<S>(
1268        &self,
1269        collection: VecCollection<S, (Row, Row), Diff>,
1270        AccumulablePlan {
1271            full_aggrs,
1272            simple_aggrs,
1273            distinct_aggrs,
1274        }: AccumulablePlan,
1275        key_arity: usize,
1276        mfp_after: Option<SafeMfpPlan>,
1277    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
1278    where
1279        S: Scope<Timestamp = G::Timestamp>,
1280    {
1281        let mut collection_scope = collection.scope();
1282
1283        // we must have called this function with something to reduce
1284        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1285            self.error_logger().soft_panic_or_log(
1286                "Incorrect numbers of aggregates in accummulable reduction rendering",
1287                &format!(
1288                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1289                    full_aggrs.len(),
1290                    simple_aggrs.len(),
1291                    distinct_aggrs.len(),
1292                ),
1293            );
1294        }
1295
1296        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1297        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
1298        // Other aggregations can be directly moved in to the `diff` field.
1299        //
1300        // In each case, the resulting collection should have `data` shaped as `(key, ())`
1301        // and a `diff` that is a vector with length `3 * aggrs.len()`. The three values are
1302        // generally the count, and then two aggregation-specific values. The size could be
1303        // reduced if we want to specialize for the aggregations.
1304
1305        // Instantiate a default vector for diffs with the correct types at each
1306        // position.
1307        let zero_diffs: (Vec<_>, Diff) = (
1308            full_aggrs
1309                .iter()
1310                .map(|f| accumulable_zero(&f.func))
1311                .collect(),
1312            Diff::ZERO,
1313        );
1314
1315        let mut to_aggregate = Vec::new();
1316        if simple_aggrs.len() > 0 {
1317            // First, collect all non-distinct aggregations in one pass.
1318            let collection = collection.clone();
1319            let easy_cases = collection.explode_one({
1320                let zero_diffs = zero_diffs.clone();
1321                move |(key, row)| {
1322                    let mut diffs = zero_diffs.clone();
1323                    // Try to unpack only the datums we need. Unfortunately, since we
1324                    // can't random access into a Row, we have to iterate through one by one.
1325                    // TODO: Even though we don't have random access, we could still avoid unpacking
1326                    // everything that we don't care about, and it might be worth it to extend the
1327                    // Row API to do that.
1328                    let mut row_iter = row.iter().enumerate();
1329                    for (datum_index, aggr) in simple_aggrs.iter() {
1330                        let mut datum = row_iter.next().unwrap();
1331                        while datum_index != &datum.0 {
1332                            datum = row_iter.next().unwrap();
1333                        }
1334                        let datum = datum.1;
1335                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1336                        diffs.1 = Diff::ONE;
1337                    }
1338                    ((key, ()), diffs)
1339                }
1340            });
1341            to_aggregate.push(easy_cases);
1342        }
1343
1344        // Next, collect all aggregations that require distinctness.
1345        for (datum_index, aggr) in distinct_aggrs.into_iter() {
1346            let pairer = Pairer::new(key_arity);
1347            let collection = collection
1348                .clone()
1349                .map(move |(key, row)| {
1350                    let value = row.iter().nth(datum_index).unwrap();
1351                    (pairer.merge(&key, std::iter::once(value)), ())
1352                })
1353                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1354                    "Arranged Accumulable Distinct [val: empty]",
1355                )
1356                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1357                    "Reduced Accumulable Distinct [val: empty]",
1358                    move |_k, _s, t| t.push(((), Diff::ONE)),
1359                )
1360                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1361                .explode_one({
1362                    let zero_diffs = zero_diffs.clone();
1363                    move |(key, row)| {
1364                        let datum = row.iter().next().unwrap();
1365                        let mut diffs = zero_diffs.clone();
1366                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1367                        diffs.1 = Diff::ONE;
1368                        ((key, ()), diffs)
1369                    }
1370                });
1371            to_aggregate.push(collection);
1372        }
1373
1374        // now concatenate, if necessary, multiple aggregations
1375        let collection = if to_aggregate.len() == 1 {
1376            to_aggregate.remove(0)
1377        } else {
1378            differential_dataflow::collection::concatenate(&mut collection_scope, to_aggregate)
1379        };
1380
1381        // Allocations for the two closures.
1382        let mut datums1 = DatumVec::new();
1383        let mut datums2 = DatumVec::new();
1384        let mfp_after1 = mfp_after.clone();
1385        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1386        let full_aggrs2 = full_aggrs.clone();
1387
1388        let error_logger = self.error_logger();
1389        let err_full_aggrs = full_aggrs.clone();
1390        let (arranged_output, arranged_errs) = collection
1391            .mz_arrange::<
1392                RowBatcher<_, _>,
1393                RowBuilder<_, _>,
1394                RowSpine<_, (Vec<Accum>, Diff)>,
1395            >("ArrangeAccumulable [val: empty]")
1396            .reduce_pair::<
1397                _,
1398                RowRowBuilder<_, _>,
1399                RowRowSpine<_, _>,
1400                _,
1401                RowErrBuilder<_, _>,
1402                RowErrSpine<_, _>,
1403            >(
1404                "ReduceAccumulable",
1405                "AccumulableErrorCheck",
1406                {
1407                    move |key, input, output| {
1408                        let (ref accums, total) = input[0].1;
1409
1410                        let temp_storage = RowArena::new();
1411                        let datum_iter = key.to_datum_iter();
1412                        let mut datums_local = datums1.borrow();
1413                        datums_local.extend(datum_iter);
1414                        let key_len = datums_local.len();
1415                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1416                            datums_local.push(finalize_accum(&aggr.func, accum, total));
1417                        }
1418
1419                        if let Some(row) = evaluate_mfp_after(
1420                            &mfp_after1,
1421                            &mut datums_local,
1422                            &temp_storage,
1423                            key_len,
1424                        ) {
1425                            output.push((row, Diff::ONE));
1426                        }
1427                    }
1428                },
1429                move |key, input, output| {
1430                    let (ref accums, total) = input[0].1;
1431                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1432                        // We first test here if inputs without net-positive records are present,
1433                        // producing an error to the logs and to the query output if that is the case.
1434                        if total == Diff::ZERO && !accum.is_zero() {
1435                            error_logger.log(
1436                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1437                                &format!("aggr={aggr:?}, accum={accum:?}"),
1438                            );
1439                            let key = key.to_row();
1440                            let message = format!(
1441                                "Invalid data in source, saw net-zero records for key {key} \
1442                                 with non-zero accumulation in accumulable aggregate"
1443                            );
1444                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1445                        }
1446                        match (&aggr.func, &accum) {
1447                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1448                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1449                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1450                                if accum.is_negative() {
1451                                    error_logger.log(
1452                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1453                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1454                                );
1455                                    let key = key.to_row();
1456                                    let message = format!(
1457                                        "Invalid data in source, saw negative accumulation with \
1458                                         unsigned type for key {key}"
1459                                    );
1460                                    let err =
1461                                        EvalError::Internal(message.into());
1462                                    output.push((err.into(), Diff::ONE));
1463                                }
1464                            }
1465                            _ => (), // no more errors to check for at this point!
1466                        }
1467                    }
1468
1469                    // If `mfp_after` can error, then evaluate it here.
1470                    let Some(mfp) = &mfp_after2 else { return };
1471                    let temp_storage = RowArena::new();
1472                    let datum_iter = key.to_datum_iter();
1473                    let mut datums_local = datums2.borrow();
1474                    datums_local.extend(datum_iter);
1475                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1476                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1477                    }
1478
1479                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1480                        output.push((e.into(), Diff::ONE));
1481                    }
1482                },
1483            );
1484        (
1485            arranged_output,
1486            arranged_errs.as_collection(|_key, error| error.clone()),
1487        )
1488    }
1489}
1490
1491/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1492/// containing key and aggregate values, then returns a result `Row` or `None`
1493/// if the MFP filters the result out.
1494fn evaluate_mfp_after<'a, 'b>(
1495    mfp_after: &'a Option<SafeMfpPlan>,
1496    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1497    temp_storage: &'a RowArena,
1498    key_len: usize,
1499) -> Option<Row> {
1500    let mut row_builder = SharedRow::get();
1501    // Apply MFP if it exists and pack a Row of
1502    // aggregate values from `datums_local`.
1503    if let Some(mfp) = mfp_after {
1504        // It must ignore errors here, but they are scanned
1505        // for elsewhere if the MFP can error.
1506        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1507            // The `mfp_after` must preserve the key columns,
1508            // so we can skip them to form aggregation results.
1509            Some(row_builder.pack_using(iter.skip(key_len)))
1510        } else {
1511            None
1512        }
1513    } else {
1514        Some(row_builder.pack_using(&datums_local[key_len..]))
1515    }
1516}
1517
1518fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1519    match aggr_func {
1520        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1521            trues: Diff::ZERO,
1522            falses: Diff::ZERO,
1523        },
1524        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1525            accum: AccumCount::ZERO,
1526            pos_infs: Diff::ZERO,
1527            neg_infs: Diff::ZERO,
1528            nans: Diff::ZERO,
1529            non_nulls: Diff::ZERO,
1530        },
1531        AggregateFunc::SumNumeric => Accum::Numeric {
1532            accum: OrderedDecimal(NumericAgg::zero()),
1533            pos_infs: Diff::ZERO,
1534            neg_infs: Diff::ZERO,
1535            nans: Diff::ZERO,
1536            non_nulls: Diff::ZERO,
1537        },
1538        _ => Accum::SimpleNumber {
1539            accum: AccumCount::ZERO,
1540            non_nulls: Diff::ZERO,
1541        },
1542    }
1543}
1544
/// Fixed-point scale factor (2^24) used to map finite floats onto an integer
/// domain for accumulation; see `datum_to_accumulator` (scaling in) and
/// `finalize_accum` (scaling back out).
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1546
/// Converts a single input `datum` into its accumulator contribution for the
/// given aggregate function, i.e., the per-record value that is summed up via
/// `Accum`'s `Semigroup` implementation.
///
/// NULL inputs accumulate as all-zero contributions, so they cancel out under
/// addition. Panics if the datum's type is incompatible with the aggregate
/// function.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // Count only tracks how many non-NULL inputs were observed.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
            non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            // NULL contributes to neither count.
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Widen both float types to f64; NULL accumulates as 0.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Map the floating point value onto a fixed precision domain
            // All special values should map to zero, since they are tracked separately
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                // This operation will truncate to i128::MAX if out of range.
                // TODO(benesch): rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Special values (infinities, NaN) are counted separately and
                // contribute zero to the decimal accumulator.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Take a narrow decimal (datum) into a wide decimal
                    // (aggregator).
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        _ => {
            // Other accumulations need to disentangle the accumulable
            // value from its NULL-ness, which is not quite as easily
            // accumulated.
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                // Timestamps accumulate via their u64 representation.
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1687
/// Renders the output `Datum` for an accumulable aggregate, given its final
/// accumulator state and `total`, the net count of contributing records.
///
/// For every aggregate except `Count`, a positive `total` with a zero
/// accumulator means only NULL inputs were observed, which yields NULL.
/// Panics if the accumulator variant does not match the aggregate function.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // The finished value depends on the aggregation function in a variety of ways.
    // For all aggregates but count, if only null values were
    // accumulated, then the output is null.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // If any false, else if all true, else must be no false and some nulls.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // If any true, else if all false, else must be no true and some nulls.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // If any non-nulls, just report the aggregate.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have less than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have less than 2^32 summands?
                // If so, rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // Our semantics of overflow are not clearly articulated wrt.
                    // unsigned vs. signed types (database-issues#5172). We adopt an
                    // unsigned wrapping behavior to match what we do above for
                    // signed types.
                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Scale back out of the fixed-point domain (see `FLOAT_SCALE`).
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Scale back out of the fixed-point domain (see `FLOAT_SCALE`).
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                // Take a wide decimal (aggregator) into a
                // narrow decimal (datum). If this operation
                // overflows the datum, this new value will be
                // +/- infinity. However, the aggregator tracks
                // the amount of overflow, making it invertible.
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1853
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
type AccumCount = mz_ore::Overflowing<i128>;
1856
/// Accumulates values for the various types of accumulable aggregations.
///
/// We assume that there are not more than 2^32 elements for the aggregation.
/// Accumulations are held in a 128-bit [`AccumCount`], so under that
/// assumption ordinary integral sums cannot exceed its bounds.
///
/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
/// point representation has less precision than a double. It is entirely possible
/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
/// to preserve group guarantees.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// Accumulates boolean values.
    Bool {
        /// The number of `true` values observed.
        trues: Diff,
        /// The number of `false` values observed.
        falses: Diff,
    },
    /// Accumulates simple numeric values.
    SimpleNumber {
        /// The accumulation of all non-NULL values observed.
        accum: AccumCount,
        /// The number of non-NULL values observed.
        non_nulls: Diff,
    },
    /// Accumulates float values.
    Float {
        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
        /// preserve associativity and commutativity
        accum: AccumCount,
        /// Counts +inf
        pos_infs: Diff,
        /// Counts -inf
        neg_infs: Diff,
        /// Counts NaNs
        nans: Diff,
        /// Counts non-NULL values
        non_nulls: Diff,
    },
    /// Accumulates arbitrary precision decimals.
    Numeric {
        /// Accumulates non-special values
        accum: OrderedDecimal<NumericAgg>,
        /// Counts +inf
        pos_infs: Diff,
        /// Counts -inf
        neg_infs: Diff,
        /// Counts NaNs
        nans: Diff,
        /// Counts non-NULL values
        non_nulls: Diff,
    },
}
1921
1922impl IsZero for Accum {
1923    fn is_zero(&self) -> bool {
1924        match self {
1925            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1926            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1927            Accum::Float {
1928                accum,
1929                pos_infs,
1930                neg_infs,
1931                nans,
1932                non_nulls,
1933            } => {
1934                accum.is_zero()
1935                    && pos_infs.is_zero()
1936                    && neg_infs.is_zero()
1937                    && nans.is_zero()
1938                    && non_nulls.is_zero()
1939            }
1940            Accum::Numeric {
1941                accum,
1942                pos_infs,
1943                neg_infs,
1944                nans,
1945                non_nulls,
1946            } => {
1947                accum.0.is_zero()
1948                    && pos_infs.is_zero()
1949                    && neg_infs.is_zero()
1950                    && nans.is_zero()
1951                    && non_nulls.is_zero()
1952            }
1953        }
1954    }
1955}
1956
impl Semigroup for Accum {
    /// Adds `other` into `self` componentwise.
    ///
    /// Both operands must be the same `Accum` variant (the variant is fixed
    /// by the aggregate function), so a mismatch is unreachable.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // On overflow, log and wrap rather than saturate: wrapping
                // preserves the group structure (see the comment on `Accum`),
                // at the cost of possibly-incorrect results.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // `rounded` signals we have exceeded the aggregator's max
                // precision, which means we've lost commutativity and
                // associativity; nothing to be done here, so panic. For more
                // context, see the DEC_Rounded definition at
                // http://speleotrove.com/decimal/dncont.html
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce to reclaim unused decimal precision. Note that this
                // reduction must happen somewhere to make the following
                // invertible:
                // ```
                // CREATE TABLE a (a numeric);
                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
                // INSERT INTO a VALUES ('9e39'), ('9e-39');
                // ```
                // This will now return infinity. However, we can retract the
                // value that blew up its precision:
                // ```
                // INSERT INTO a VALUES ('-9e-39');
                // ```
                // This leaves `t`'s aggregator with a value of 9e39. However,
                // without doing a reduction, `libdecnum` will store the value
                // as 9e39+0e-39, which still exceeds the narrower context's
                // precision. By doing the reduction, we can "reclaim" the 39
                // digits of precision.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            // Mixed variants can only arise from a planning bug upstream.
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2059
2060impl Multiply<Diff> for Accum {
2061    type Output = Accum;
2062
2063    fn multiply(self, factor: &Diff) -> Accum {
2064        let factor = *factor;
2065        match self {
2066            Accum::Bool { trues, falses } => Accum::Bool {
2067                trues: trues * factor,
2068                falses: falses * factor,
2069            },
2070            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2071                accum: accum * AccumCount::from(factor),
2072                non_nulls: non_nulls * factor,
2073            },
2074            Accum::Float {
2075                accum,
2076                pos_infs,
2077                neg_infs,
2078                nans,
2079                non_nulls,
2080            } => Accum::Float {
2081                accum: accum
2082                    .checked_mul(AccumCount::from(factor))
2083                    .unwrap_or_else(|| {
2084                        warn!("Float accumulator overflow. Incorrect results possible");
2085                        accum.wrapping_mul(AccumCount::from(factor))
2086                    }),
2087                pos_infs: pos_infs * factor,
2088                neg_infs: neg_infs * factor,
2089                nans: nans * factor,
2090                non_nulls: non_nulls * factor,
2091            },
2092            Accum::Numeric {
2093                accum,
2094                pos_infs,
2095                neg_infs,
2096                nans,
2097                non_nulls,
2098            } => {
2099                let mut cx = numeric::cx_agg();
2100                let mut f = NumericAgg::from(factor.into_inner());
2101                // Unlike `plus_equals`, not necessary to reduce after this operation because `f` will
2102                // always be an integer, i.e. we are never increasing the
2103                // values' scale.
2104                cx.mul(&mut f, &accum.0);
2105                // `rounded` signals we have exceeded the aggregator's max
2106                // precision, which means we've lost commutativity and
2107                // associativity; nothing to be done here, so panic. For more
2108                // context, see the DEC_Rounded definition at
2109                // http://speleotrove.com/decimal/dncont.html
2110                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2111                Accum::Numeric {
2112                    accum: OrderedDecimal(f),
2113                    pos_infs: pos_infs * factor,
2114                    neg_infs: neg_infs * factor,
2115                    nans: nans * factor,
2116                    non_nulls: non_nulls * factor,
2117                }
2118            }
2119        }
2120    }
2121}
2122
// `Accum` is stored via a `CopyRegion`, which keeps items inline with no
// auxiliary heap region (this relies on `CopyRegion`'s `Copy` bound on the
// item type).
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2126
/// Monoids for in-place compaction of monotonic streams.
mod monoids {

    // We can improve the performance of some aggregations through the use of algebra.
    // In particular, we can move some of the aggregations in to the `diff` field of
    // updates, by changing `diff` from integers to a different algebraic structure.
    //
    // The one we use is called a "semigroup", and it means that the structure has a
    // symmetric addition operator. The trait we use also allows the semigroup elements
    // to present as "zero", meaning they always act as the identity under +. Here,
    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
    // known to DD by the `is_zero` method, see comment there. So, from the point of view
    // of DD, this Semigroup should _not_ have a zero.
    //
    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
    // assumes this.

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a single-datum row.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the wrapped row, i.e., the current aggregate value, regardless
        /// of whether this is a `Min` or a `Max`.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    // `Clone` is implemented by hand (rather than derived) so that `clone_from`
    // can reuse the allocation of the row already held by the target.
    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out of `self` so we can mutate it while also being
            // free to overwrite `self` with a (possibly different) variant below.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            // `Row::clone_from` can reuse `row`'s existing allocation.
            row.clone_from(source_row);

            // Adopt `source`'s variant along with the copied row.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent, and
            // its users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        // Combines two monoids of the same variant by keeping the smaller
        // (`Min`) or larger (`Max`) of the two wrapped values in `self`.
        // `Datum::Null` acts as the identity on both sides (see module comment).
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a small element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a large element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    // Mixing `Min` with `Max` indicates a bug upstream; log (or
                    // panic in debug) and leave `self` unchanged.
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
            // DD uses true results of this method to make stuff disappear. This makes sense when
            // diffs mean really just diffs, but for `ReductionMonoid` diffs hold reduction results.
            // We don't want funny stuff, like disappearing, happening to reduction results even
            // when they are null. (This would confuse, e.g., `ReduceCollation` for null inputs.)
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
    /// in the same backing region. Alternatively, it could store it in two regions, but we select
    /// the former for simplicity reasons.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        // Copies the wrapped row into the shared inner row region, preserving the
        // variant. Safety obligations are forwarded to the inner region's `copy`.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Reserve based on the wrapped rows; the variant itself needs no space
            // in the region.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2381
2382mod window_agg_helpers {
2383    use crate::render::reduce::*;
2384
2385    /// TODO: It would be better for performance to do the branching that is in the methods of this
2386    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
2387    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
2388    /// directly.
2389    pub enum OneByOneAggrImpls {
2390        Accumulable(AccumulableOneByOneAggr),
2391        Hierarchical(HierarchicalOneByOneAggr),
2392        Basic(mz_expr::NaiveOneByOneAggr),
2393    }
2394
2395    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2396        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2397            match reduction_type(agg) {
2398                ReductionType::Basic => {
2399                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2400                }
2401                ReductionType::Accumulable => {
2402                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2403                }
2404                ReductionType::Hierarchical => {
2405                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2406                }
2407            }
2408        }
2409
2410        fn give(&mut self, d: &Datum) {
2411            match self {
2412                OneByOneAggrImpls::Basic(i) => i.give(d),
2413                OneByOneAggrImpls::Accumulable(i) => i.give(d),
2414                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2415            }
2416        }
2417
2418        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2419            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
2420            match self {
2421                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2422                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2423                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2424            }
2425        }
2426    }
2427
2428    pub struct AccumulableOneByOneAggr {
2429        aggr_func: AggregateFunc,
2430        accum: Accum,
2431        total: Diff,
2432    }
2433
2434    impl AccumulableOneByOneAggr {
2435        fn new(aggr_func: &AggregateFunc) -> Self {
2436            AccumulableOneByOneAggr {
2437                aggr_func: aggr_func.clone(),
2438                accum: accumulable_zero(aggr_func),
2439                total: Diff::ZERO,
2440            }
2441        }
2442
2443        fn give(&mut self, d: &Datum) {
2444            self.accum
2445                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2446            self.total += Diff::ONE;
2447        }
2448
2449        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2450            temp_storage.make_datum(|packer| {
2451                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2452            })
2453        }
2454    }
2455
2456    pub struct HierarchicalOneByOneAggr {
2457        aggr_func: AggregateFunc,
2458        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
2459        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
2460        monoid: ReductionMonoid,
2461    }
2462
2463    impl HierarchicalOneByOneAggr {
2464        fn new(aggr_func: &AggregateFunc) -> Self {
2465            let mut row_buf = Row::default();
2466            row_buf.packer().push(Datum::Null);
2467            HierarchicalOneByOneAggr {
2468                aggr_func: aggr_func.clone(),
2469                monoid: get_monoid(row_buf, aggr_func)
2470                    .expect("aggr_func should be a hierarchical aggregation function"),
2471            }
2472        }
2473
2474        fn give(&mut self, d: &Datum) {
2475            let mut row_buf = Row::default();
2476            row_buf.packer().push(d);
2477            let m = get_monoid(row_buf, &self.aggr_func)
2478                .expect("aggr_func should be a hierarchical aggregation function");
2479            self.monoid.plus_equals(&m);
2480        }
2481
2482        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2483            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2484        }
2485    }
2486}