Skip to main content

mz_compute/render/
reduce.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction dataflow construction.
//!
//! Consult [ReducePlan] documentation for details.

14use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
26use differential_dataflow::trace::{Builder, Trace};
27use differential_dataflow::{Data, VecCollection};
28use itertools::Itertools;
29use mz_compute_types::plan::reduce::{
30    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
31    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
32};
33use mz_expr::{
34    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
35};
36use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
37use mz_repr::fixed_length::ToDatumIter;
38use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
39use mz_storage_types::errors::DataflowError;
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use timely::dataflow::Scope;
45use timely::progress::timestamp::Refines;
46use tracing::warn;
47
48use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
49use crate::extensions::reduce::{MzReduce, ReduceExt};
50use crate::render::context::{CollectionBundle, Context};
51use crate::render::errors::MaybeValidatingRow;
52use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
53use crate::render::{ArrangementFlavor, Pairer};
54use crate::row_spine::{
55    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
56};
57use crate::typedefs::{
58    ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
59    RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
60};
61
62impl<G, T> Context<G, T>
63where
64    G: Scope,
65    G::Timestamp: MzTimestamp + Refines<T>,
66    T: MzTimestamp,
67{
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
    ///
    /// `key_val_plan` extracts the grouping key and the aggregation inputs from
    /// each input record; `reduce_plan` selects the rendering strategy; and
    /// `mfp_after` is an optional map/filter/project fused onto the reduction's
    /// output (it must be non-temporal, see the `expect` below).
    /// `input_key` is forwarded to `flat_map` — presumably the key of an
    /// existing arrangement of `input` to read from; TODO(review): confirm.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            // Scratch datum storage shared by every invocation of the closure below.
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // One past the largest demanded column; zero when nothing is demanded.
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // Relative skips let us pull only demanded datums out of each row below.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Unpack only the demanded columns.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        // Neither plan contains predicates, so evaluation must yield a row.
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluate the value expressions.
                    // The prior evaluation may have left additional columns we should delete.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux out the potential errors from key and value selector evaluation.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(err_input);

            // Render the reduce plan
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }
167
168    /// Render a dataflow based on the provided plan.
169    ///
170    /// The output will be an arrangements that looks the same as if
171    /// we just had a single reduce operator computing everything together, and
172    /// this arrangement can also be re-used.
173    fn render_reduce_plan<S>(
174        &self,
175        plan: ReducePlan,
176        collection: VecCollection<S, (Row, Row), Diff>,
177        err_input: VecCollection<S, DataflowError, Diff>,
178        key_arity: usize,
179        mfp_after: Option<SafeMfpPlan>,
180    ) -> CollectionBundle<S, T>
181    where
182        S: Scope<Timestamp = G::Timestamp>,
183    {
184        let mut errors = Default::default();
185        let arrangement =
186            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188        CollectionBundle::from_columns(
189            0..key_arity,
190            ArrangementFlavor::Local(
191                arrangement,
192                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193            ),
194        )
195    }
196
197    fn render_reduce_plan_inner<S>(
198        &self,
199        plan: ReducePlan,
200        collection: VecCollection<S, (Row, Row), Diff>,
201        errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
202        key_arity: usize,
203        mfp_after: Option<SafeMfpPlan>,
204    ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
205    where
206        S: Scope<Timestamp = G::Timestamp>,
207    {
208        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
209        // not only values (database-issues#6658).
210        let arrangement = match plan {
211            // If we have no aggregations or just a single type of reduction, we
212            // can go ahead and render them directly.
213            ReducePlan::Distinct => {
214                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
215                errors.push(errs);
216                arranged_output
217            }
218            ReducePlan::Accumulable(expr) => {
219                let (arranged_output, errs) =
220                    self.build_accumulable(collection, expr, key_arity, mfp_after);
221                errors.push(errs);
222                arranged_output
223            }
224            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
225                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
226                errors.push(errs);
227                output
228            }
229            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
230                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
231                errors.push(errs);
232                output
233            }
234            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
235                expr,
236                fused_unnest_list,
237            })) => {
238                // Note that we skip validating for negative diffs when we have a fused unnest list,
239                // because this is already a CPU-intensive situation due to the non-incrementalness
240                // of window functions.
241                let validating = !fused_unnest_list;
242                let (output, errs) = self.build_basic_aggregate(
243                    collection,
244                    0,
245                    &expr,
246                    validating,
247                    key_arity,
248                    mfp_after,
249                    fused_unnest_list,
250                );
251                if validating {
252                    errors.push(errs.expect("validation should have occurred as it was requested"));
253                }
254                output
255            }
256            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
257                let (output, errs) =
258                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
259                errors.push(errs);
260                output
261            }
262        };
263        arrangement
264    }
265
    /// Build the dataflow to compute the set of distinct keys.
    ///
    /// Returns the arranged distinct output together with a collection of errors
    /// arising from non-positive multiplicities or from evaluating `mfp_after`.
    /// The two results come from a paired reduction over a single arrangement of
    /// the input.
    fn build_distinct<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // The error-checking closure only needs the MFP when it can actually error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            )
            .reduce_pair::<
                _,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
                _,
                RowErrBuilder<_, _>,
                RowErrSpine<_, _>,
            >(
                "DistinctBy",
                "DistinctByErrorCheck",
                // First reduction: emit one (unit) output per surviving key.
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        // We're pushing a unit value here because the key is implicitly added by the
                        // arrangement, and the permutation logic takes care of using the key part of the
                        // output.
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                // Second reduction: detect non-positive multiplicities and MFP errors.
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        // Report at most one error per key, then stop scanning.
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
344
    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
    ///
    /// Returns the fused output arrangement and a single error collection that
    /// combines multiplicity-validation errors (from exactly one component
    /// aggregate) with any errors from evaluating `mfp_after`.
    fn build_basic_aggregates<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Request validation only until one aggregate has produced an error
            // stream (`err_output.is_none()`); validating once suffices.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each aggregate's output with its index so fusion below can
            // reassemble values in the order specified by `aggrs`.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Values are presented sorted by their `(index, row)` tag, so
                    // appending in iteration order restores the `aggrs` order.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
454
455    /// Build the dataflow to compute a single basic aggregation.
456    ///
457    /// This method also applies distinctness if required.
458    fn build_basic_aggregate<S>(
459        &self,
460        input: VecCollection<S, (Row, Row), Diff>,
461        index: usize,
462        aggr: &AggregateExpr,
463        validating: bool,
464        key_arity: usize,
465        mfp_after: Option<SafeMfpPlan>,
466        fused_unnest_list: bool,
467    ) -> (
468        RowRowArrangement<S>,
469        Option<VecCollection<S, DataflowError, Diff>>,
470    )
471    where
472        S: Scope<Timestamp = G::Timestamp>,
473    {
474        let AggregateExpr {
475            func,
476            expr: _,
477            distinct,
478        } = aggr.clone();
479
480        // Extract the value we were asked to aggregate over.
481        let mut partial = input.map(move |(key, row)| {
482            let mut row_builder = SharedRow::get();
483            let value = row.iter().nth(index).unwrap();
484            row_builder.packer().push(value);
485            (key, row_builder.clone())
486        });
487
488        let mut err_output = None;
489
490        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
491        if distinct {
492            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
493            let pairer = Pairer::new(key_arity);
494            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
495            if validating {
496                let (oks, errs) = self
497                    .build_reduce_inaccumulable_distinct::<
498                        _,
499                        RowValBuilder<Result<(), String>, _, _>,
500                        RowValSpine<Result<(), String>, _, _>,
501                    >(keyed, None)
502                    .as_collection(|k, v| {
503                        (
504                            k.to_row(),
505                            v.as_ref()
506                                .map(|&()| ())
507                                .map_err(|m| m.as_str().into()),
508                        )
509                    })
510                    .map_fallible::<
511                        CapacityContainerBuilder<_>,
512                        CapacityContainerBuilder<_>,
513                        _,
514                        _,
515                        _,
516                    >(
517                        "Demux Errors",
518                        move |(key_val, result)| match result {
519                            Ok(()) => Ok(pairer.split(&key_val)),
520                            Err(m) => {
521                                Err(EvalError::Internal(m).into())
522                            }
523                        },
524                    );
525                err_output = Some(errs);
526                partial = oks;
527            } else {
528                partial = self
529                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
530                        keyed,
531                        Some(" [val: empty]"),
532                    )
533                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
534            }
535        }
536
537        // Allocations for the two closures.
538        let mut datums1 = DatumVec::new();
539        let mut datums2 = DatumVec::new();
540        let mut datums_key_1 = DatumVec::new();
541        let mut datums_key_2 = DatumVec::new();
542        let mfp_after1 = mfp_after.clone();
543        let func2 = func.clone();
544
545        let name = if !fused_unnest_list {
546            "ReduceInaccumulable"
547        } else {
548            "FusedReduceUnnestList"
549        };
550        let arranged = partial
551            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
552                "Arranged {name}"
553            ));
554        let oks = if !fused_unnest_list {
555            arranged
556                .clone()
557                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
558                    move |key, source, target| {
559                        // We respect the multiplicity here (unlike in hierarchical aggregation)
560                        // because we don't know that the aggregation method is not sensitive
561                        // to the number of records.
562                        let iter = source.iter().flat_map(|(v, w)| {
563                            // Note that in the non-positive case, this is wrong, but harmless because
564                            // our other reduction will produce an error.
565                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
566                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
567                        });
568
569                        let temp_storage = RowArena::new();
570                        let datum_iter = key.to_datum_iter();
571                        let mut datums_local = datums1.borrow();
572                        datums_local.extend(datum_iter);
573                        let key_len = datums_local.len();
574                        datums_local.push(
575                        // Note that this is not necessarily a window aggregation, in which case
576                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
577                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
578                            iter,
579                            &temp_storage,
580                        ),
581                    );
582
583                        if let Some(row) = evaluate_mfp_after(
584                            &mfp_after1,
585                            &mut datums_local,
586                            &temp_storage,
587                            key_len,
588                        ) {
589                            target.push((row, Diff::ONE));
590                        }
591                    }
592                })
593        } else {
594            arranged
595                .clone()
596                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
597                    move |key, source, target| {
598                        // This part is the same as in the `!fused_unnest_list` if branch above.
599                        let iter = source.iter().flat_map(|(v, w)| {
600                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
601                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
602                        });
603
604                        // This is the part that is specific to the `fused_unnest_list` branch.
605                        let temp_storage = RowArena::new();
606                        let mut datums_local = datums_key_1.borrow();
607                        datums_local.extend(key.to_datum_iter());
608                        let key_len = datums_local.len();
609                        for datum in func
610                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
611                                iter,
612                                &temp_storage,
613                            )
614                        {
615                            datums_local.truncate(key_len);
616                            datums_local.push(datum);
617                            if let Some(row) = evaluate_mfp_after(
618                                &mfp_after1,
619                                &mut datums_local,
620                                &temp_storage,
621                                key_len,
622                            ) {
623                                target.push((row, Diff::ONE));
624                            }
625                        }
626                    }
627                })
628        };
629
630        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
631        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
632        // rationale around using a second reduction here.
633        let must_validate = validating && err_output.is_none();
634        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
635        if must_validate || mfp_after2.is_some() {
636            let error_logger = self.error_logger();
637
638            let errs = if !fused_unnest_list {
639                arranged
640                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
641                        &format!("{name} Error Check"),
642                        move |key, source, target| {
643                            // Negative counts would be surprising, but until we are 100% certain we won't
644                            // see them, we should report when we do. We may want to bake even more info
645                            // in here in the future.
646                            if must_validate {
647                                for (value, count) in source.iter() {
648                                    if count.is_positive() {
649                                        continue;
650                                    }
651                                    let value = value.to_row();
652                                    let message =
653                                        "Non-positive accumulation in ReduceInaccumulable";
654                                    error_logger
655                                        .log(message, &format!("value={value:?}, count={count}"));
656                                    let err = EvalError::Internal(message.into());
657                                    target.push((err.into(), Diff::ONE));
658                                    return;
659                                }
660                            }
661
662                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
663                            let Some(mfp) = &mfp_after2 else { return };
664                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
665                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
666                                // This would ideally use `to_datum_iter` but we cannot as it needs to
667                                // borrow `v` and only presents datums with that lifetime, not any longer.
668                                std::iter::repeat(v.next().unwrap()).take(count)
669                            });
670
671                            let temp_storage = RowArena::new();
672                            let datum_iter = key.to_datum_iter();
673                            let mut datums_local = datums2.borrow();
674                            datums_local.extend(datum_iter);
675                            datums_local.push(
676                                func2.eval_with_fast_window_agg::<
677                                    _,
678                                    window_agg_helpers::OneByOneAggrImpls,
679                                >(
680                                    iter, &temp_storage
681                                ),
682                            );
683                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
684                                target.push((e.into(), Diff::ONE));
685                            }
686                        },
687                    )
688                    .as_collection(|_, v| v.clone())
689            } else {
690                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
691                assert!(!must_validate);
692                // We couldn't have got into this if branch due to `must_validate`, so it must be
693                // because of the `mfp_after2.is_some()`.
694                let Some(mfp) = mfp_after2 else {
695                    unreachable!()
696                };
697                arranged
698                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
699                        &format!("{name} Error Check"),
700                        move |key, source, target| {
701                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
702                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
703                                // This would ideally use `to_datum_iter` but we cannot as it needs to
704                                // borrow `v` and only presents datums with that lifetime, not any longer.
705                                std::iter::repeat(v.next().unwrap()).take(count)
706                            });
707
708                            let temp_storage = RowArena::new();
709                            let mut datums_local = datums_key_2.borrow();
710                            datums_local.extend(key.to_datum_iter());
711                            let key_len = datums_local.len();
712                            for datum in func2
713                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
714                                    iter,
715                                    &temp_storage,
716                                )
717                            {
718                                datums_local.truncate(key_len);
719                                datums_local.push(datum);
720                                // We know that `mfp` can error (because of the `could_error` call
721                                // above), so try to evaluate it here.
722                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
723                                {
724                                    target.push((e.into(), Diff::ONE));
725                                }
726                            }
727                        },
728                    )
729                    .as_collection(|_, v| v.clone())
730            };
731
732            if let Some(e) = err_output {
733                err_output = Some(e.concat(errs));
734            } else {
735                err_output = Some(errs);
736            }
737        }
738        (oks, err_output)
739    }
740
741    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
742        &self,
743        input: VecCollection<S, Row, Diff>,
744        name_tag: Option<&str>,
745    ) -> Arranged<S, TraceAgent<Tr>>
746    where
747        S: Scope<Timestamp = G::Timestamp>,
748        Tr: for<'a> Trace<
749                Key<'a> = DatumSeq<'a>,
750                KeyOwn = Row,
751                Time = G::Timestamp,
752                Diff = Diff,
753                ValOwn: Data + MaybeValidatingRow<(), String>,
754            > + 'static,
755        Bu: Builder<
756                Time = G::Timestamp,
757                Input: Container
758                           + InternalMerge
759                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
760                Output = Tr::Batch,
761            >,
762        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
763    {
764        let error_logger = self.error_logger();
765
766        let output_name = format!(
767            "ReduceInaccumulable Distinct{}",
768            name_tag.unwrap_or_default()
769        );
770
771        let input: KeyCollection<_, _, _> = input.into();
772        input
773            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
774                "Arranged ReduceInaccumulable Distinct [val: empty]",
775            )
776            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
777                if let Some(err) = Tr::ValOwn::into_error() {
778                    for (value, count) in source.iter() {
779                        if count.is_positive() {
780                            continue;
781                        }
782
783                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
784                        error_logger.log(message, &format!("value={value:?}, count={count}"));
785                        t.push((err(message.to_string()), Diff::ONE));
786                        return;
787                    }
788                }
789                t.push((Tr::ValOwn::ok(()), Diff::ONE))
790            })
791    }
792
    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on non-monotonic inputs.
    ///
    /// This function renders a single reduction tree that computes aggregations with
    /// a priority queue implemented with a series of reduce operators that partition
    /// the input into buckets, and compute the aggregation over very small buckets
    /// and feed the results up to larger buckets.
    ///
    /// Note that this implementation currently ignores the distinct bit because we
    /// currently only perform min / max hierarchically and the reduction tree
    /// efficiently suppresses non-distinct updates.
    ///
    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
    /// trickery here to limit the memory usage per layer by internally
    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains `((key, bucket), passing value)` for this
    /// stage.
    fn build_bucketed<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Errors from the reduction tree; populated by the first (validating)
        // stage and extended by the optional `mfp_after` error check below.
        let mut err_output: Option<VecCollection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // The first mod to apply to the hash.
            let first_mod = buckets.get(0).copied().unwrap_or(1);
            let aggregations = aggr_funcs.len();

            // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                row_packer.extend(row.iter().take(aggregations));
                let values = row_builder.clone();

                // Apply the initial mod here.
                let hash = values.hashed() % first_mod;
                // Key layout from here on: the bucket hash, followed by the
                // original key's datums.
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            // Repeatedly apply hierarchical reduction with a progressively coarser key.
            for (index, b) in buckets.into_iter().enumerate() {
                // Apply subsequent bucket mods for all but the first round.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        // Coarsen the bucket by taking the existing hash mod `b`.
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // We only want the first stage to perform validation of whether invalid accumulations
                // were observed in the input. Subsequently, we will either produce an error in the error
                // stream or produce correct data in the output stream.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Discard the hash from the key and return to the format of the input data.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Allocations for the two closures.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            // Only render the error-check reduction for `mfp_after` if it can
            // actually produce an error.
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            // Build a series of stages for the reduction
            // Arrange the final result into (key, Row)
            let error_logger = self.error_logger();
            // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
            // view mz_introspection.mz_expected_group_size_advice.
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
            // but we then wouldn't be able to do this error check conditionally.  See its documentation
            // for the rationale around using a second reduction here.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .clone()
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            // One datum cursor per input value; advancing all cursors in
                            // lockstep yields the datums for one aggregation at a time.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region();
                if let Some(e) = err_output.take() {
                    err_output = Some(e.concat(errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Same lockstep-cursor pattern as the error check above:
                        // evaluate each aggregate over its column of datums.
                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter =
                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region()
        });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
995
996    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
997    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
998    /// with the negation of what's produced by the reduction.
999    /// `validating` indicates whether we want this stage to perform error detection
1000    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
1001    /// stages can skip validation.
1002    fn build_bucketed_stage<S>(
1003        &self,
1004        aggr_funcs: &Vec<AggregateFunc>,
1005        input: VecCollection<S, (Row, Row), Diff>,
1006        validating: bool,
1007    ) -> (
1008        VecCollection<S, (Row, Row), Diff>,
1009        Option<VecCollection<S, DataflowError, Diff>>,
1010    )
1011    where
1012        S: Scope<Timestamp = G::Timestamp>,
1013    {
1014        let (input, negated_output, errs) = if validating {
1015            let (input, reduced) = self
1016                .build_bucketed_negated_output::<
1017                    _,
1018                    RowValBuilder<_, _, _>,
1019                    RowValSpine<Result<Row, Row>, _, _>,
1020                >(
1021                    input.clone(),
1022                    aggr_funcs.clone(),
1023                );
1024            let (oks, errs) = reduced
1025                .as_collection(|k, v| (k.to_row(), v.clone()))
1026                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1027                "Checked Invalid Accumulations",
1028                |(hash_key, result)| match result {
1029                    Err(hash_key) => {
1030                        let mut hash_key_iter = hash_key.iter();
1031                        let _hash = hash_key_iter.next();
1032                        let key = SharedRow::pack(hash_key_iter);
1033                        let message = format!(
1034                            "Invalid data in source, saw non-positive accumulation \
1035                                         for key {key:?} in hierarchical mins-maxes aggregate"
1036                        );
1037                        Err(EvalError::Internal(message.into()).into())
1038                    }
1039                    Ok(values) => Ok((hash_key, values)),
1040                },
1041            );
1042            (input, oks, Some(errs))
1043        } else {
1044            let (input, reduced) = self
1045                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1046                    input,
1047                    aggr_funcs.clone(),
1048                );
1049            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1050            // that we need to apply the mod on both input and oks.
1051            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1052            (input, oks, None)
1053        };
1054
1055        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1056        let oks = negated_output.concat(input);
1057        (oks, errs)
1058    }
1059
    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
    /// with all diffs in the reduction's output negated.
    ///
    /// The negation exists so that callers can form `input.concat(output.negate())`
    /// (see the in-body comment below), retaining only elements rejected by this stage.
    fn build_bucketed_negated_output<S, Bu, Tr>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateFunc>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
        Arranged<S, TraceAgent<Tr>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
                Key<'a> = DatumSeq<'a>,
                KeyOwn = Row,
                ValOwn: Data + MaybeValidatingRow<Row, Row>,
                Time = G::Timestamp,
                Diff = Diff,
            > + 'static,
        Bu: Builder<
                Time = G::Timestamp,
                Input: Container
                           + InternalMerge
                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
                Output = Tr::Batch,
            >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();
        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
        // view mz_introspection.mz_expected_group_size_advice.
        let arranged_input = input
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged MinsMaxesHierarchical input",
            );

        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
            "Reduced Fallibly MinsMaxesHierarchical",
            move |key, source, target| {
                // `into_error` is `Some` only when the output value type supports
                // carrying errors; in that case, validate the accumulations first.
                if let Some(err) = Tr::ValOwn::into_error() {
                    // Should negative accumulations reach us, we should loudly complain.
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        error_logger.log(
                            "Non-positive accumulation in MinsMaxesHierarchical",
                            &format!("key={key:?}, value={value:?}, count={count}"),
                        );
                        // After complaining, output an error here so that we can eventually
                        // report it in an error stream.
                        target.push((err(Tr::owned_key(key)), Diff::ONE));
                        return;
                    }
                }

                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();

                // One datum cursor per input value; advancing all cursors in
                // lockstep yields each aggregation's column of datums in turn.
                let mut source_iters = source
                    .iter()
                    .map(|(values, _cnt)| *values)
                    .collect::<Vec<_>>();
                for func in aggrs.iter() {
                    let column_iter =
                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                    row_packer.push(func.eval(column_iter, &RowArena::new()));
                }
                // We only want to arrange the parts of the input that are not part of the output.
                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
                // gives us the intended value of this aggregate function. Also we assume that regardless
                // of the multiplicity of the final result in the input, we only want to have one copy
                // in the output.
                target.reserve(source.len().saturating_add(1));
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
                target.extend(source.iter().map(|(values, cnt)| {
                    let mut cnt = *cnt;
                    cnt.negate();
                    (Tr::ValOwn::ok(values.to_row()), cnt)
                }));
            },
        );
        (arranged_input, reduced)
    }
1145
    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on monotonic inputs.
    ///
    /// Monotonicity lets us skip the bucketed reduction tree: each value is moved
    /// into the diff field as a [`ReductionMonoid`], so differential's in-place
    /// accumulation performs the aggregation directly.
    fn build_monotonic<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let aggregations = aggr_funcs.len();
        // Gather the relevant values into a vec of rows ordered by aggregation_index
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        // It should be now possible to ensure that we have a monotonic collection.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // We can place our rows directly into the diff field, and
        // only keep the relevant one corresponding to evaluating our
        // aggregate, instead of having to do a hierarchical reduction.
        let partial = partial.explode_one(move |(key, values)| {
            // One monoid per aggregation, in aggregation-index order.
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only render the error-check reduction for `mfp_after` if it can
        // actually produce an error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The accumulated diff is the vector of monoids, one per
                    // aggregation; finalize each into its output datums.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });

        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1264
1265    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1266    ///
1267    /// The incoming values are moved to the update's "difference" field, at which point
1268    /// they can be accumulated in place. The `count` operator promotes the accumulated
1269    /// values to data, at which point a final map applies operator-specific logic to
1270    /// yield the final aggregate.
1271    fn build_accumulable<S>(
1272        &self,
1273        collection: VecCollection<S, (Row, Row), Diff>,
1274        AccumulablePlan {
1275            full_aggrs,
1276            simple_aggrs,
1277            distinct_aggrs,
1278        }: AccumulablePlan,
1279        key_arity: usize,
1280        mfp_after: Option<SafeMfpPlan>,
1281    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
1282    where
1283        S: Scope<Timestamp = G::Timestamp>,
1284    {
1285        let mut collection_scope = collection.scope();
1286
1287        // we must have called this function with something to reduce
1288        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1289            self.error_logger().soft_panic_or_log(
1290                "Incorrect numbers of aggregates in accummulable reduction rendering",
1291                &format!(
1292                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1293                    full_aggrs.len(),
1294                    simple_aggrs.len(),
1295                    distinct_aggrs.len(),
1296                ),
1297            );
1298        }
1299
1300        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1301        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
1302        // Other aggregations can be directly moved in to the `diff` field.
1303        //
1304        // In each case, the resulting collection should have `data` shaped as `(key, ())`
1305        // and a `diff` that is a vector with length `3 * aggrs.len()`. The three values are
1306        // generally the count, and then two aggregation-specific values. The size could be
1307        // reduced if we want to specialize for the aggregations.
1308
1309        // Instantiate a default vector for diffs with the correct types at each
1310        // position.
1311        let zero_diffs: (Vec<_>, Diff) = (
1312            full_aggrs
1313                .iter()
1314                .map(|f| accumulable_zero(&f.func))
1315                .collect(),
1316            Diff::ZERO,
1317        );
1318
1319        let mut to_aggregate = Vec::new();
1320        if simple_aggrs.len() > 0 {
1321            // First, collect all non-distinct aggregations in one pass.
1322            let collection = collection.clone();
1323            let easy_cases = collection.explode_one({
1324                let zero_diffs = zero_diffs.clone();
1325                move |(key, row)| {
1326                    let mut diffs = zero_diffs.clone();
1327                    // Try to unpack only the datums we need. Unfortunately, since we
1328                    // can't random access into a Row, we have to iterate through one by one.
1329                    // TODO: Even though we don't have random access, we could still avoid unpacking
1330                    // everything that we don't care about, and it might be worth it to extend the
1331                    // Row API to do that.
1332                    let mut row_iter = row.iter().enumerate();
1333                    for (datum_index, aggr) in simple_aggrs.iter() {
1334                        let mut datum = row_iter.next().unwrap();
1335                        while datum_index != &datum.0 {
1336                            datum = row_iter.next().unwrap();
1337                        }
1338                        let datum = datum.1;
1339                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1340                        diffs.1 = Diff::ONE;
1341                    }
1342                    ((key, ()), diffs)
1343                }
1344            });
1345            to_aggregate.push(easy_cases);
1346        }
1347
1348        // Next, collect all aggregations that require distinctness.
1349        for (datum_index, aggr) in distinct_aggrs.into_iter() {
1350            let pairer = Pairer::new(key_arity);
1351            let collection = collection
1352                .clone()
1353                .map(move |(key, row)| {
1354                    let value = row.iter().nth(datum_index).unwrap();
1355                    (pairer.merge(&key, std::iter::once(value)), ())
1356                })
1357                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1358                    "Arranged Accumulable Distinct [val: empty]",
1359                )
1360                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1361                    "Reduced Accumulable Distinct [val: empty]",
1362                    move |_k, _s, t| t.push(((), Diff::ONE)),
1363                )
1364                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1365                .explode_one({
1366                    let zero_diffs = zero_diffs.clone();
1367                    move |(key, row)| {
1368                        let datum = row.iter().next().unwrap();
1369                        let mut diffs = zero_diffs.clone();
1370                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1371                        diffs.1 = Diff::ONE;
1372                        ((key, ()), diffs)
1373                    }
1374                });
1375            to_aggregate.push(collection);
1376        }
1377
1378        // now concatenate, if necessary, multiple aggregations
1379        let collection = if to_aggregate.len() == 1 {
1380            to_aggregate.remove(0)
1381        } else {
1382            differential_dataflow::collection::concatenate(&mut collection_scope, to_aggregate)
1383        };
1384
1385        // Allocations for the two closures.
1386        let mut datums1 = DatumVec::new();
1387        let mut datums2 = DatumVec::new();
1388        let mfp_after1 = mfp_after.clone();
1389        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1390        let full_aggrs2 = full_aggrs.clone();
1391
1392        let error_logger = self.error_logger();
1393        let err_full_aggrs = full_aggrs.clone();
1394        let (arranged_output, arranged_errs) = collection
1395            .mz_arrange::<
1396                RowBatcher<_, _>,
1397                RowBuilder<_, _>,
1398                RowSpine<_, (Vec<Accum>, Diff)>,
1399            >("ArrangeAccumulable [val: empty]")
1400            .reduce_pair::<
1401                _,
1402                RowRowBuilder<_, _>,
1403                RowRowSpine<_, _>,
1404                _,
1405                RowErrBuilder<_, _>,
1406                RowErrSpine<_, _>,
1407            >(
1408                "ReduceAccumulable",
1409                "AccumulableErrorCheck",
1410                {
1411                    move |key, input, output| {
1412                        let (ref accums, total) = input[0].1;
1413
1414                        let temp_storage = RowArena::new();
1415                        let datum_iter = key.to_datum_iter();
1416                        let mut datums_local = datums1.borrow();
1417                        datums_local.extend(datum_iter);
1418                        let key_len = datums_local.len();
1419                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1420                            datums_local.push(finalize_accum(&aggr.func, accum, total));
1421                        }
1422
1423                        if let Some(row) = evaluate_mfp_after(
1424                            &mfp_after1,
1425                            &mut datums_local,
1426                            &temp_storage,
1427                            key_len,
1428                        ) {
1429                            output.push((row, Diff::ONE));
1430                        }
1431                    }
1432                },
1433                move |key, input, output| {
1434                    let (ref accums, total) = input[0].1;
1435                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1436                        // We first test here if inputs without net-positive records are present,
1437                        // producing an error to the logs and to the query output if that is the case.
1438                        if total == Diff::ZERO && !accum.is_zero() {
1439                            error_logger.log(
1440                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1441                                &format!("aggr={aggr:?}, accum={accum:?}"),
1442                            );
1443                            let key = key.to_row();
1444                            let message = format!(
1445                                "Invalid data in source, saw net-zero records for key {key} \
1446                                 with non-zero accumulation in accumulable aggregate"
1447                            );
1448                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1449                        }
1450                        match (&aggr.func, &accum) {
1451                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1452                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1453                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1454                                if accum.is_negative() {
1455                                    error_logger.log(
1456                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1457                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1458                                );
1459                                    let key = key.to_row();
1460                                    let message = format!(
1461                                        "Invalid data in source, saw negative accumulation with \
1462                                         unsigned type for key {key}"
1463                                    );
1464                                    let err =
1465                                        EvalError::Internal(message.into());
1466                                    output.push((err.into(), Diff::ONE));
1467                                }
1468                            }
1469                            _ => (), // no more errors to check for at this point!
1470                        }
1471                    }
1472
1473                    // If `mfp_after` can error, then evaluate it here.
1474                    let Some(mfp) = &mfp_after2 else { return };
1475                    let temp_storage = RowArena::new();
1476                    let datum_iter = key.to_datum_iter();
1477                    let mut datums_local = datums2.borrow();
1478                    datums_local.extend(datum_iter);
1479                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1480                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1481                    }
1482
1483                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1484                        output.push((e.into(), Diff::ONE));
1485                    }
1486                },
1487            );
1488        (
1489            arranged_output,
1490            arranged_errs.as_collection(|_key, error| error.clone()),
1491        )
1492    }
1493}
1494
1495/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1496/// containing key and aggregate values, then returns a result `Row` or `None`
1497/// if the MFP filters the result out.
1498fn evaluate_mfp_after<'a, 'b>(
1499    mfp_after: &'a Option<SafeMfpPlan>,
1500    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1501    temp_storage: &'a RowArena,
1502    key_len: usize,
1503) -> Option<Row> {
1504    let mut row_builder = SharedRow::get();
1505    // Apply MFP if it exists and pack a Row of
1506    // aggregate values from `datums_local`.
1507    if let Some(mfp) = mfp_after {
1508        // It must ignore errors here, but they are scanned
1509        // for elsewhere if the MFP can error.
1510        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1511            // The `mfp_after` must preserve the key columns,
1512            // so we can skip them to form aggregation results.
1513            Some(row_builder.pack_using(iter.skip(key_len)))
1514        } else {
1515            None
1516        }
1517    } else {
1518        Some(row_builder.pack_using(&datums_local[key_len..]))
1519    }
1520}
1521
1522fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1523    match aggr_func {
1524        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1525            trues: Diff::ZERO,
1526            falses: Diff::ZERO,
1527        },
1528        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1529            accum: AccumCount::ZERO,
1530            pos_infs: Diff::ZERO,
1531            neg_infs: Diff::ZERO,
1532            nans: Diff::ZERO,
1533            non_nulls: Diff::ZERO,
1534        },
1535        AggregateFunc::SumNumeric => Accum::Numeric {
1536            accum: OrderedDecimal(NumericAgg::zero()),
1537            pos_infs: Diff::ZERO,
1538            neg_infs: Diff::ZERO,
1539            nans: Diff::ZERO,
1540            non_nulls: Diff::ZERO,
1541        },
1542        _ => Accum::SimpleNumber {
1543            accum: AccumCount::ZERO,
1544            non_nulls: Diff::ZERO,
1545        },
1546    }
1547}
1548
/// Fixed-point scale factor (2^24) used to map finite float values onto an
/// i128 domain for accumulation, and to map accumulated values back to floats.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1550
/// Maps a single input datum to the `Accum` it contributes to an accumulable
/// aggregation under `aggregate_func`.
///
/// NULL datums contribute zeroed sums and counts (and, for `Count`, a zero
/// `non_nulls`), so they leave the accumulation unaffected.
///
/// # Panics
///
/// Panics if `datum` is not a valid input type for `aggregate_func`.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
            non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        // Booleans are tracked as separate counts of trues and falses;
        // NULL increments neither.
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Widen to f64; a NULL contributes 0 and is excluded via `non_nulls`.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            // Count occurrences of special values and non-NULLs separately.
            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Map the floating point value onto a fixed precision domain
            // All special values should map to zero, since they are tracked separately
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                // This operation will truncate to i128::MAX if out of range.
                // TODO(benesch): rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Special values map to a zero sum plus the matching counter.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Take a narrow decimal (datum) into a wide decimal
                    // (aggregator).
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        _ => {
            // Other accumulations need to disentangle the accumulable
            // value from its NULL-ness, which is not quite as easily
            // accumulated.
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1691
/// Converts a final `Accum`, together with `total` (the net count of input
/// records in the group), into the output `Datum` for `aggr_func`.
///
/// For every aggregate except `Count`, a positive `total` paired with a zero
/// accumulator means only NULL values were accumulated, and the output is NULL.
///
/// # Panics
///
/// Panics if `accum`'s variant does not match `aggr_func`.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // The finished value depends on the aggregation function in a variety of ways.
    // For all aggregates but count, if only null values were
    // accumulated, then the output is null.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            // Count reports the number of non-NULL values observed.
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // If any false, else if all true, else must be no false and some nulls.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // If any true, else if all false, else must be no true and some nulls.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // If any non-nulls, just report the aggregate.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have less than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have less than 2^32 summands?
                // If so, rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // Our semantics of overflow are not clearly articulated wrt.
                    // unsigned vs. signed types (database-issues#5172). We adopt an
                    // unsigned wrapping behavior to match what we do above for
                    // signed types.
                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Scale the fixed-point sum back into float range.
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Scale the fixed-point sum back into float range.
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                // Take a wide decimal (aggregator) into a
                // narrow decimal (datum). If this operation
                // overflows the datum, this new value will be
                // +/- infinity. However, the aggregator tracks
                // the amount of overflow, making it invertible.
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1857
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
type AccumCount = mz_ore::Overflowing<i128>;
1860
1861/// Accumulates values for the various types of accumulable aggregations.
1862///
1863/// We assume that there are not more than 2^32 elements for the aggregation.
1864/// Thus we can perform a summation over i32 in an i64 accumulator
1865/// and not worry about exceeding its bounds.
1866///
1867/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
1868/// point representation has less precision than a double. It is entirely possible
1869/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
1870/// to preserve group guarantees.
1871#[derive(
1872    Debug,
1873    Clone,
1874    Copy,
1875    PartialEq,
1876    Eq,
1877    PartialOrd,
1878    Ord,
1879    Serialize,
1880    Deserialize
1881)]
1882enum Accum {
1883    /// Accumulates boolean values.
1884    Bool {
1885        /// The number of `true` values observed.
1886        trues: Diff,
1887        /// The number of `false` values observed.
1888        falses: Diff,
1889    },
1890    /// Accumulates simple numeric values.
1891    SimpleNumber {
1892        /// The accumulation of all non-NULL values observed.
1893        accum: AccumCount,
1894        /// The number of non-NULL values observed.
1895        non_nulls: Diff,
1896    },
1897    /// Accumulates float values.
1898    Float {
1899        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
1900        /// preserve associativity and commutativity
1901        accum: AccumCount,
1902        /// Counts +inf
1903        pos_infs: Diff,
1904        /// Counts -inf
1905        neg_infs: Diff,
1906        /// Counts NaNs
1907        nans: Diff,
1908        /// Counts non-NULL values
1909        non_nulls: Diff,
1910    },
1911    /// Accumulates arbitrary precision decimals.
1912    Numeric {
1913        /// Accumulates non-special values
1914        accum: OrderedDecimal<NumericAgg>,
1915        /// Counts +inf
1916        pos_infs: Diff,
1917        /// Counts -inf
1918        neg_infs: Diff,
1919        /// Counts NaNs
1920        nans: Diff,
1921        /// Counts non-NULL values
1922        non_nulls: Diff,
1923    },
1924}
1925
1926impl IsZero for Accum {
1927    fn is_zero(&self) -> bool {
1928        match self {
1929            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1930            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1931            Accum::Float {
1932                accum,
1933                pos_infs,
1934                neg_infs,
1935                nans,
1936                non_nulls,
1937            } => {
1938                accum.is_zero()
1939                    && pos_infs.is_zero()
1940                    && neg_infs.is_zero()
1941                    && nans.is_zero()
1942                    && non_nulls.is_zero()
1943            }
1944            Accum::Numeric {
1945                accum,
1946                pos_infs,
1947                neg_infs,
1948                nans,
1949                non_nulls,
1950            } => {
1951                accum.0.is_zero()
1952                    && pos_infs.is_zero()
1953                    && neg_infs.is_zero()
1954                    && nans.is_zero()
1955                    && non_nulls.is_zero()
1956            }
1957        }
1958    }
1959}
1960
impl Semigroup for Accum {
    /// Adds `other` into `self` component-wise. Both sides must be the same
    /// `Accum` variant; mismatched variants are a logic bug and unreachable.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // On overflow of the fixed-point sum, log a warning and fall
                // back to wrapping addition so accumulation can proceed.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // `rounded` signals we have exceeded the aggregator's max
                // precision, which means we've lost commutativity and
                // associativity; nothing to be done here, so panic. For more
                // context, see the DEC_Rounded definition at
                // http://speleotrove.com/decimal/dncont.html
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce to reclaim unused decimal precision. Note that this
                // reduction must happen somewhere to make the following
                // invertible:
                // ```
                // CREATE TABLE a (a numeric);
                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
                // INSERT INTO a VALUES ('9e39'), ('9e-39');
                // ```
                // This will now return infinity. However, we can retract the
                // value that blew up its precision:
                // ```
                // INSERT INTO a VALUES ('-9e-39');
                // ```
                // This leaves `t`'s aggregator with a value of 9e39. However,
                // without doing a reduction, `libdecnum` will store the value
                // as 9e39+0e-39, which still exceeds the narrower context's
                // precision. By doing the reduction, we can "reclaim" the 39
                // digits of precision.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2063
2064impl Multiply<Diff> for Accum {
2065    type Output = Accum;
2066
2067    fn multiply(self, factor: &Diff) -> Accum {
2068        let factor = *factor;
2069        match self {
2070            Accum::Bool { trues, falses } => Accum::Bool {
2071                trues: trues * factor,
2072                falses: falses * factor,
2073            },
2074            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2075                accum: accum * AccumCount::from(factor),
2076                non_nulls: non_nulls * factor,
2077            },
2078            Accum::Float {
2079                accum,
2080                pos_infs,
2081                neg_infs,
2082                nans,
2083                non_nulls,
2084            } => Accum::Float {
2085                accum: accum
2086                    .checked_mul(AccumCount::from(factor))
2087                    .unwrap_or_else(|| {
2088                        warn!("Float accumulator overflow. Incorrect results possible");
2089                        accum.wrapping_mul(AccumCount::from(factor))
2090                    }),
2091                pos_infs: pos_infs * factor,
2092                neg_infs: neg_infs * factor,
2093                nans: nans * factor,
2094                non_nulls: non_nulls * factor,
2095            },
2096            Accum::Numeric {
2097                accum,
2098                pos_infs,
2099                neg_infs,
2100                nans,
2101                non_nulls,
2102            } => {
2103                let mut cx = numeric::cx_agg();
2104                let mut f = NumericAgg::from(factor.into_inner());
2105                // Unlike `plus_equals`, not necessary to reduce after this operation because `f` will
2106                // always be an integer, i.e. we are never increasing the
2107                // values' scale.
2108                cx.mul(&mut f, &accum.0);
2109                // `rounded` signals we have exceeded the aggregator's max
2110                // precision, which means we've lost commutativity and
2111                // associativity; nothing to be done here, so panic. For more
2112                // context, see the DEC_Rounded definition at
2113                // http://speleotrove.com/decimal/dncont.html
2114                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2115                Accum::Numeric {
2116                    accum: OrderedDecimal(f),
2117                    pos_infs: pos_infs * factor,
2118                    neg_infs: neg_infs * factor,
2119                    nans: nans * factor,
2120                    non_nulls: non_nulls * factor,
2121                }
2122            }
2123        }
2124    }
2125}
2126
/// Columnar storage for [`Accum`]. Every variant carries only fixed-size
/// numeric data (counts and a fixed-width decimal), so a flat copy region,
/// which stores items by value, is sufficient.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2130
/// Monoids for in-place compaction of monotonic streams.
mod monoids {

    // We can improve the performance of some aggregations through the use of algebra.
    // In particular, we can move some of the aggregations in to the `diff` field of
    // updates, by changing `diff` from integers to a different algebraic structure.
    //
    // The one we use is called a "semigroup", and it means that the structure has a
    // symmetric addition operator. The trait we use also allows the semigroup elements
    // to present as "zero", meaning they always act as the identity under +. Here,
    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
    // known to DD by the `is_zero` method, see comment there. So, from the point of view
    // of DD, this Semigroup should _not_ have a zero.
    //
    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
    // assumes this.

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a single-datum row.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        /// Accumulates the minimum of the datums folded in so far.
        Min(Row),
        /// Accumulates the maximum of the datums folded in so far.
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the accumulated row, whichever variant holds it.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    // `Clone` is implemented by hand so that `clone_from` can reuse the
    // destination row's existing allocation instead of always allocating
    // afresh, as the derived implementation would.
    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out (leaving a cheap default behind) so we can
            // `clone_from` into it, reusing its buffer.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Re-wrap the reused row in `source`'s variant.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent, and
            // its users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        // Combines two monoids by keeping the smaller (`Min`) or larger
        // (`Max`) of the two wrapped datums. Mismatched variants indicate a
        // logic error upstream and are soft-panicked on.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a small element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a large element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
            // DD uses true results of this method to make stuff disappear. This makes sense when
            // diffs mean really just diffs, but for `ReductionMonoid` diffs hold reduction results.
            // We don't want funny stuff, like disappearing, happening to reduction results even
            // when they are null. (This would confuse, e.g., `ReduceCollation` for null inputs.)
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
    /// in the same backing region. Alternatively, it could store it in two regions, but we select
    /// the former for simplicity reasons.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                // SAFETY: the caller's `Region::copy` contract is forwarded
                // unchanged to the inner row region.
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Both variants share the single backing row region, so reserve
            // for the wrapped rows regardless of variant.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2385
2386mod window_agg_helpers {
2387    use crate::render::reduce::*;
2388
2389    /// TODO: It would be better for performance to do the branching that is in the methods of this
2390    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
2391    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
2392    /// directly.
2393    pub enum OneByOneAggrImpls {
2394        Accumulable(AccumulableOneByOneAggr),
2395        Hierarchical(HierarchicalOneByOneAggr),
2396        Basic(mz_expr::NaiveOneByOneAggr),
2397    }
2398
2399    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2400        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2401            match reduction_type(agg) {
2402                ReductionType::Basic => {
2403                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2404                }
2405                ReductionType::Accumulable => {
2406                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2407                }
2408                ReductionType::Hierarchical => {
2409                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2410                }
2411            }
2412        }
2413
2414        fn give(&mut self, d: &Datum) {
2415            match self {
2416                OneByOneAggrImpls::Basic(i) => i.give(d),
2417                OneByOneAggrImpls::Accumulable(i) => i.give(d),
2418                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2419            }
2420        }
2421
2422        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2423            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
2424            match self {
2425                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2426                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2427                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2428            }
2429        }
2430    }
2431
2432    pub struct AccumulableOneByOneAggr {
2433        aggr_func: AggregateFunc,
2434        accum: Accum,
2435        total: Diff,
2436    }
2437
2438    impl AccumulableOneByOneAggr {
2439        fn new(aggr_func: &AggregateFunc) -> Self {
2440            AccumulableOneByOneAggr {
2441                aggr_func: aggr_func.clone(),
2442                accum: accumulable_zero(aggr_func),
2443                total: Diff::ZERO,
2444            }
2445        }
2446
2447        fn give(&mut self, d: &Datum) {
2448            self.accum
2449                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2450            self.total += Diff::ONE;
2451        }
2452
2453        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2454            temp_storage.make_datum(|packer| {
2455                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2456            })
2457        }
2458    }
2459
2460    pub struct HierarchicalOneByOneAggr {
2461        aggr_func: AggregateFunc,
2462        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
2463        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
2464        monoid: ReductionMonoid,
2465    }
2466
2467    impl HierarchicalOneByOneAggr {
2468        fn new(aggr_func: &AggregateFunc) -> Self {
2469            let mut row_buf = Row::default();
2470            row_buf.packer().push(Datum::Null);
2471            HierarchicalOneByOneAggr {
2472                aggr_func: aggr_func.clone(),
2473                monoid: get_monoid(row_buf, aggr_func)
2474                    .expect("aggr_func should be a hierarchical aggregation function"),
2475            }
2476        }
2477
2478        fn give(&mut self, d: &Datum) {
2479            let mut row_buf = Row::default();
2480            row_buf.packer().push(d);
2481            let m = get_monoid(row_buf, &self.aggr_func)
2482                .expect("aggr_func should be a hierarchical aggregation function");
2483            self.monoid.plus_equals(&m);
2484        }
2485
2486        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2487            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2488        }
2489    }
2490}