mz_compute/render/reduce.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction dataflow construction.
//!
//! Consult [ReducePlan] documentation for details.
use std::collections::BTreeMap;
use std::sync::LazyLock;

use dec::OrderedDecimal;
use differential_dataflow::Diff as _;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, CopyRegion};
use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
use differential_dataflow::hashable::Hashable;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
use differential_dataflow::trace::{Builder, Trace};
use differential_dataflow::{Data, VecCollection};
use itertools::Itertools;
use mz_compute_types::plan::reduce::{
    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
};
use mz_expr::{
    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
};
use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use serde::{Deserialize, Serialize};
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use tracing::warn;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::{MzReduce, ReduceExt};
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
use crate::render::{ArrangementFlavor, Pairer};
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
};
use crate::typedefs::{
    ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
    RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
};
impl<G, T> Context<G, T>
where
    G: Scope,
    G::Timestamp: MzTimestamp + Refines<T>,
    T: MzTimestamp,
{
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();
            // Remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
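            // Each entry in `skips` is the number of columns to step over before taking
            // the next demanded column via `Iterator::nth` below. For example
            // (illustrative), a demand of columns [1, 3, 4] becomes skips [1, 1, 0]:
            // skip column 0 and take column 1, skip column 2 and take column 3, then
            // take column 4 immediately.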

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Unpack only the demanded columns.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluate the value expressions.
                    // The prior evaluation may have left additional columns we should delete.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux out the potential errors from key and value selector evaluation.
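            // `ConsolidatingContainerBuilder` consolidates the updates within each output
            // container (merging updates for equal data and times, dropping zero diffs)
            // before they flow downstream.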
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(&err_input);

            // Render the reduce plan
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }

    /// Render a dataflow based on the provided plan.
    ///
    /// The output will be an arrangement that looks the same as if
    /// a single reduce operator had computed everything together, and
    /// this arrangement can also be reused.
    fn render_reduce_plan<S>(
        &self,
        plan: ReducePlan,
        collection: VecCollection<S, (Row, Row), Diff>,
        err_input: VecCollection<S, DataflowError, Diff>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> CollectionBundle<S, T>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut errors = Default::default();
        let arrangement =
            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
        CollectionBundle::from_columns(
            0..key_arity,
            ArrangementFlavor::Local(
                arrangement,
                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
            ),
        )
    }

    fn render_reduce_plan_inner<S>(
        &self,
        plan: ReducePlan,
        collection: VecCollection<S, (Row, Row), Diff>,
        errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
        // not only values (database-issues#6658).
        let arrangement = match plan {
            // If we have no aggregations or just a single type of reduction, we
            // can go ahead and render them directly.
            ReducePlan::Distinct => {
                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Accumulable(expr) => {
                let (arranged_output, errs) =
                    self.build_accumulable(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                expr,
                fused_unnest_list,
            })) => {
                // Note that we skip validating for negative diffs when we have a fused unnest list,
                // because this is already a CPU-intensive situation due to the non-incremental
                // evaluation of window functions.
                let validating = !fused_unnest_list;
                let (output, errs) = self.build_basic_aggregate(
                    collection,
                    0,
                    &expr,
                    validating,
                    key_arity,
                    mfp_after,
                    fused_unnest_list,
                );
                if validating {
                    errors.push(errs.expect("validation should have occurred as it was requested"));
                }
                output
            }
            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
                let (output, errs) =
                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
                errors.push(errs);
                output
            }
        };
        arrangement
    }

    /// Build the dataflow to compute the set of distinct keys.
    fn build_distinct<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
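        // `mfp_after1` is applied inside the main reduction, while `mfp_after2` is
        // retained only when the MFP can produce evaluation errors; it drives the
        // second, error-checking reduction below.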

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            )
            .reduce_pair::<
                _,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
                _,
                RowErrBuilder<_, _>,
                RowErrSpine<_, _>,
            >(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        // We're pushing a unit value here because the key is implicitly added by the
                        // arrangement, and the permutation logic takes care of using the key part of the
                        // output.
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }

    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
    fn build_basic_aggregates<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceFuseBasic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }

    /// Build the dataflow to compute a single basic aggregation.
    ///
    /// This method also applies distinctness if required.
    fn build_basic_aggregate<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Extract the value we were asked to aggregate over.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
        if distinct {
            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
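            // `Pairer::merge` concatenates the key and value datums into a single `Row`;
            // `Pairer::split` below recovers the `(key, val)` pair using the recorded
            // key arity.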
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        _,
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // We respect the multiplicity here (unlike in hierarchical aggregation)
                    // because we don't know that the aggregation method is not sensitive
                    // to the number of records.
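                    // For example, a basic aggregate like `string_agg` must observe a
                    // value with multiplicity 3 three times, whereas min/max would be
                    // indifferent to the repetition.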
                    let iter = source.iter().flat_map(|(v, w)| {
                        // Note that in the non-positive case, this is wrong, but harmless because
                        // our other reduction will produce an error.
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        // Note that this is not necessarily a window aggregation, in which case
                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        target.push((row, Diff::ONE));
                    }
                }
            })
        } else {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // This part is the same as in the `!fused_unnest_list` if branch above.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    // This is the part that is specific to the `fused_unnest_list` branch.
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
        };

        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
        // we then wouldn't be able to do this error check conditionally. See its documentation for the
        // rationale around using a second reduction here.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter`, but we cannot: it needs to
                                // borrow `v`, and only yields datums bound to that borrow's lifetime.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
                assert!(!must_validate);
                // We cannot have entered this branch because of `must_validate`, so it must
                // have been because `mfp_after2.is_some()`.
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter`, but we cannot: it needs to
                                // borrow `v`, and only yields datums bound to that borrow's lifetime.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                // We know that `mfp` can error (because of the `could_error` call
                                // above), so try to evaluate it here.
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(&errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }

    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
        &self,
        input: VecCollection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
                Key<'a> = DatumSeq<'a>,
                KeyOwn = Row,
                Time = G::Timestamp,
                Diff = Diff,
                ValOwn: Data + MaybeValidatingRow<(), String>,
            > + 'static,
        Bu: Builder<
                Time = G::Timestamp,
                Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
                Output = Tr::Batch,
            >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }

    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on non-monotonic inputs.
    ///
    /// This function renders a single reduction tree that computes aggregations with
    /// a priority queue implemented as a series of reduce operators: the input is
    /// partitioned into buckets, the aggregation is computed over very small buckets,
    /// and the results are fed up to progressively larger buckets.
    ///
    /// Note that this implementation currently ignores the distinct bit, because we
    /// currently only perform min / max hierarchically and the reduction tree
    /// efficiently suppresses non-distinct updates.
    ///
    /// `buckets` lists the number of buckets for each stage. We do some non-obvious
    /// trickery here to limit the memory usage per layer by internally
    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains the `((key, bucket), passing value)` pairs for
    /// this stage.
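    ///
    /// For example (illustrative bucket sizes), with `buckets = [16, 4, 1]` each key's
    /// values are first split into 16 buckets by hash and reduced within each bucket;
    /// the survivors are then regrouped into 4 buckets and reduced again, and finally
    /// into a single group per key that yields the overall aggregate.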
    fn build_bucketed<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<VecCollection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // The first mod to apply to the hash.
            let first_mod = buckets.get(0).copied().unwrap_or(1);
            let aggregations = aggr_funcs.len();

            // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                row_packer.extend(row.iter().take(aggregations));
                let values = row_builder.clone();

                // Apply the initial mod here.
                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            // Repeatedly apply hierarchical reduction with a progressively coarser key.
            for (index, b) in buckets.into_iter().enumerate() {
                // Apply subsequent bucket mods for all but the first round.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // We only want the first stage to perform validation of whether invalid accumulations
                // were observed in the input. Subsequently, we will either produce an error in the error
                // stream or produce correct data in the output stream.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Discard the hash from the key and return to the format of the input data.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Allocations for the two closures.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            // Build a series of stages for the reduction
            // Arrange the final result into (key, Row)
            let error_logger = self.error_logger();
            // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
            // view mz_introspection.mz_expected_group_size_advice.
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
            // but we then wouldn't be able to do this error check conditionally. See its documentation
            // for the rationale around using a second reduction here.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region();
                if let Some(e) = &err_output {
                    err_output = Some(e.concat(&errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter =
                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region()
        });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }

    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
    /// with the negation of what's produced by the reduction.
    /// `validating` indicates whether we want this stage to perform error detection
    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
    /// stages can skip validation.
    fn build_bucketed_stage<S>(
        &self,
        aggr_funcs: &Vec<AggregateFunc>,
        input: &VecCollection<S, (Row, Row), Diff>,
        validating: bool,
    ) -> (
        VecCollection<S, (Row, Row), Diff>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let (input, negated_output, errs) = if validating {
            let (input, reduced) = self
                .build_bucketed_negated_output::<
                    _,
                    RowValBuilder<_, _, _>,
                    RowValSpine<Result<Row, Row>, _, _>,
                >(
                    input,
                    aggr_funcs.clone(),
                );
            let (oks, errs) = reduced
                .as_collection(|k, v| (k.to_row(), v.clone()))
                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "Checked Invalid Accumulations",
                |(hash_key, result)| match result {
                    Err(hash_key) => {
                        let mut hash_key_iter = hash_key.iter();
                        let _hash = hash_key_iter.next();
                        let key = SharedRow::pack(hash_key_iter);
                        let message = format!(
                            "Invalid data in source, saw non-positive accumulation \
                                         for key {key:?} in hierarchical mins-maxes aggregate"
                        );
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(values) => Ok((hash_key, values)),
                },
            );
            (input, oks, Some(errs))
        } else {
            let (input, reduced) = self
                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    input,
                    aggr_funcs.clone(),
                );
            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
            // that we need to apply the mod on both input and oks.
            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
            (input, oks, None)
        };

        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        let oks = negated_output.concat(&input);
        (oks, errs)
    }

    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
    /// with all diffs in the reduction's output negated.
    fn build_bucketed_negated_output<S, Bu, Tr>(
        &self,
        input: &VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateFunc>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
        Arranged<S, TraceAgent<Tr>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
                Key<'a> = DatumSeq<'a>,
                KeyOwn = Row,
                ValOwn: Data + MaybeValidatingRow<Row, Row>,
                Time = G::Timestamp,
                Diff = Diff,
            > + 'static,
        Bu: Builder<
                Time = G::Timestamp,
                Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
                Output = Tr::Batch,
            >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();
        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
        // view mz_introspection.mz_expected_group_size_advice.
        let arranged_input = input
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged MinsMaxesHierarchical input",
            );

        let reduced = arranged_input.mz_reduce_abelian::<_, Bu, Tr>(
            "Reduced Fallibly MinsMaxesHierarchical",
            move |key, source, target| {
                if let Some(err) = Tr::ValOwn::into_error() {
                    // Should negative accumulations reach us, we should loudly complain.
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        error_logger.log(
                            "Non-positive accumulation in MinsMaxesHierarchical",
                            &format!("key={key:?}, value={value:?}, count={count}"),
                        );
                        // After complaining, output an error here so that we can eventually
                        // report it in an error stream.
                        target.push((err(Tr::owned_key(key)), Diff::ONE));
                        return;
                    }
                }

                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();

                let mut source_iters = source
                    .iter()
                    .map(|(values, _cnt)| *values)
                    .collect::<Vec<_>>();
                for func in aggrs.iter() {
                    let column_iter =
                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                    row_packer.push(func.eval(column_iter, &RowArena::new()));
                }
                // We only want to arrange the parts of the input that are not part of the output.
                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
                // gives us the intended value of this aggregate function. Also we assume that regardless
                // of the multiplicity of the final result in the input, we only want to have one copy
                // in the output.
                target.reserve(source.len().saturating_add(1));
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
                target.extend(source.iter().map(|(values, cnt)| {
                    let mut cnt = *cnt;
                    cnt.negate();
                    (Tr::ValOwn::ok(values.to_row()), cnt)
                }));
            },
        );
        (arranged_input, reduced)
    }

    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on monotonic inputs.
    fn build_monotonic<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let aggregations = aggr_funcs.len();
        // Gather the relevant values into a vec of rows ordered by aggregation_index.
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );
        // It should now be possible to ensure that we have a monotonic collection.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // We can place our rows directly into the diff field, and
        // only keep the relevant one corresponding to evaluating our
        // aggregate, instead of having to do a hierarchical reduction.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });
1190
1191        // Allocations for the two closures.
1192        let mut datums1 = DatumVec::new();
1193        let mut datums2 = DatumVec::new();
1194        let mfp_after1 = mfp_after.clone();
1195        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1196
1197        let partial: KeyCollection<_, _, _> = partial.into();
1198        let arranged = partial
1199            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1200                "ArrangeMonotonic [val: empty]",
1201            );
1202        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1203            "ReduceMonotonic",
1204            {
1205                move |key, input, output| {
1206                    let temp_storage = RowArena::new();
1207                    let datum_iter = key.to_datum_iter();
1208                    let mut datums_local = datums1.borrow();
1209                    datums_local.extend(datum_iter);
1210                    let key_len = datums_local.len();
1211                    let accum = &input[0].1;
1212                    for monoid in accum.iter() {
1213                        datums_local.extend(monoid.finalize().iter());
1214                    }
1215
1216                    if let Some(row) =
1217                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1218                    {
1219                        output.push((row, Diff::ONE));
1220                    }
1221                }
1222            },
1223        );
1224
1225        // If `mfp_after` can error, then we need to render a paired reduction
1226        // to scan for these potential errors. Note that we cannot directly use
1227        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1228        // conditionally render the second component of the reduction pair.
1229        if let Some(mfp) = mfp_after2 {
1230            let mfp_errs = arranged
1231                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1232                    "ReduceMonotonic Error Check",
1233                    move |key, input, output| {
1234                        let temp_storage = RowArena::new();
1235                        let datum_iter = key.to_datum_iter();
1236                        let mut datums_local = datums2.borrow();
1237                        datums_local.extend(datum_iter);
1238                        let accum = &input[0].1;
1239                        for monoid in accum.iter() {
1240                            datums_local.extend(monoid.finalize().iter());
1241                        }
1242                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1243                        {
1244                            output.push((e.into(), Diff::ONE));
1245                        }
1246                    },
1247                )
1248                .as_collection(|_k, v| v.clone());
1249            (output, validation_errs.concat(&mfp_errs))
1250        } else {
1251            (output, validation_errs)
1252        }
1253    }
1254
1255    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1256    ///
1257    /// The incoming values are moved to the update's "difference" field, at which point
1258    /// they can be accumulated in place. The `count` operator promotes the accumulated
1259    /// values to data, at which point a final map applies operator-specific logic to
1260    /// yield the final aggregate.
1261    fn build_accumulable<S>(
1262        &self,
1263        collection: VecCollection<S, (Row, Row), Diff>,
1264        AccumulablePlan {
1265            full_aggrs,
1266            simple_aggrs,
1267            distinct_aggrs,
1268        }: AccumulablePlan,
1269        key_arity: usize,
1270        mfp_after: Option<SafeMfpPlan>,
1271    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
1272    where
1273        S: Scope<Timestamp = G::Timestamp>,
1274    {
1275        // we must have called this function with something to reduce
        if full_aggrs.is_empty() || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1277            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accumulable reduction rendering",
1279                &format!(
1280                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1281                    full_aggrs.len(),
1282                    simple_aggrs.len(),
1283                    distinct_aggrs.len(),
1284                ),
1285            );
1286        }
1287
        // Some of the aggregations may have the `distinct` bit set, which means that they'll
        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
        // Other aggregations can be directly moved into the `diff` field.
        //
        // In each case, the resulting collection should have `data` shaped as `(key, ())`
        // and a `diff` that pairs a vector of aggregation-specific accumulations (one per
        // aggregate) with an overall record count. The size could be reduced if we want to
        // specialize for the aggregations.
1296
1297        // Instantiate a default vector for diffs with the correct types at each
1298        // position.
1299        let zero_diffs: (Vec<_>, Diff) = (
1300            full_aggrs
1301                .iter()
1302                .map(|f| accumulable_zero(&f.func))
1303                .collect(),
1304            Diff::ZERO,
1305        );
1306
1307        let mut to_aggregate = Vec::new();
        if !simple_aggrs.is_empty() {
1309            // First, collect all non-distinct aggregations in one pass.
1310            let easy_cases = collection.explode_one({
1311                let zero_diffs = zero_diffs.clone();
1312                move |(key, row)| {
1313                    let mut diffs = zero_diffs.clone();
                    // Try to unpack only the datums we need. Unfortunately, since we
                    // can't randomly access a Row, we have to iterate through it one
                    // datum at a time. TODO: even without random access, we could still
                    // avoid unpacking everything we don't care about, and it might be
                    // worth extending the Row API to do that.
1319                    let mut row_iter = row.iter().enumerate();
1320                    for (datum_index, aggr) in simple_aggrs.iter() {
1321                        let mut datum = row_iter.next().unwrap();
1322                        while datum_index != &datum.0 {
1323                            datum = row_iter.next().unwrap();
1324                        }
1325                        let datum = datum.1;
1326                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1327                        diffs.1 = Diff::ONE;
1328                    }
1329                    ((key, ()), diffs)
1330                }
1331            });
1332            to_aggregate.push(easy_cases);
1333        }
1334
1335        // Next, collect all aggregations that require distinctness.
1336        for (datum_index, aggr) in distinct_aggrs.into_iter() {
1337            let pairer = Pairer::new(key_arity);
1338            let collection = collection
1339                .map(move |(key, row)| {
1340                    let value = row.iter().nth(datum_index).unwrap();
1341                    (pairer.merge(&key, std::iter::once(value)), ())
1342                })
1343                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1344                    "Arranged Accumulable Distinct [val: empty]",
1345                )
1346                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1347                    "Reduced Accumulable Distinct [val: empty]",
1348                    move |_k, _s, t| t.push(((), Diff::ONE)),
1349                )
1350                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1351                .explode_one({
1352                    let zero_diffs = zero_diffs.clone();
1353                    move |(key, row)| {
1354                        let datum = row.iter().next().unwrap();
1355                        let mut diffs = zero_diffs.clone();
1356                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1357                        diffs.1 = Diff::ONE;
1358                        ((key, ()), diffs)
1359                    }
1360                });
1361            to_aggregate.push(collection);
1362        }
1363
        // Now concatenate, if necessary, the multiple aggregations.
1365        let collection = if to_aggregate.len() == 1 {
1366            to_aggregate.remove(0)
1367        } else {
1368            differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
1369        };
1370
1371        // Allocations for the two closures.
1372        let mut datums1 = DatumVec::new();
1373        let mut datums2 = DatumVec::new();
1374        let mfp_after1 = mfp_after.clone();
1375        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1376        let full_aggrs2 = full_aggrs.clone();
1377
1378        let error_logger = self.error_logger();
1379        let err_full_aggrs = full_aggrs.clone();
1380        let (arranged_output, arranged_errs) = collection
1381            .mz_arrange::<
1382                RowBatcher<_, _>,
1383                RowBuilder<_, _>,
1384                RowSpine<_, (Vec<Accum>, Diff)>,
1385            >("ArrangeAccumulable [val: empty]")
1386            .reduce_pair::<
1387                _,
1388                RowRowBuilder<_, _>,
1389                RowRowSpine<_, _>,
1390                _,
1391                RowErrBuilder<_, _>,
1392                RowErrSpine<_, _>,
1393            >(
1394                "ReduceAccumulable",
1395                "AccumulableErrorCheck",
1396                {
1397                    move |key, input, output| {
1398                        let (ref accums, total) = input[0].1;
1399
1400                        let temp_storage = RowArena::new();
1401                        let datum_iter = key.to_datum_iter();
1402                        let mut datums_local = datums1.borrow();
1403                        datums_local.extend(datum_iter);
1404                        let key_len = datums_local.len();
1405                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1406                            datums_local.push(finalize_accum(&aggr.func, accum, total));
1407                        }
1408
1409                        if let Some(row) = evaluate_mfp_after(
1410                            &mfp_after1,
1411                            &mut datums_local,
1412                            &temp_storage,
1413                            key_len,
1414                        ) {
1415                            output.push((row, Diff::ONE));
1416                        }
1417                    }
1418                },
1419                move |key, input, output| {
1420                    let (ref accums, total) = input[0].1;
1421                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // We first test whether this group has a net-zero record count alongside a
                        // non-zero accumulation, logging an error and emitting one to the query
                        // output if so.
1424                        if total == Diff::ZERO && !accum.is_zero() {
1425                            error_logger.log(
1426                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1427                                &format!("aggr={aggr:?}, accum={accum:?}"),
1428                            );
1429                            let key = key.to_row();
1430                            let message = format!(
1431                                "Invalid data in source, saw net-zero records for key {key} \
1432                                 with non-zero accumulation in accumulable aggregate"
1433                            );
1434                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1435                        }
1436                        match (&aggr.func, &accum) {
1437                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1438                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1439                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1440                                if accum.is_negative() {
1441                                    error_logger.log(
1442                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1443                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1444                                );
1445                                    let key = key.to_row();
1446                                    let message = format!(
1447                                        "Invalid data in source, saw negative accumulation with \
1448                                         unsigned type for key {key}"
1449                                    );
                                    let err = EvalError::Internal(message.into());
1452                                    output.push((err.into(), Diff::ONE));
1453                                }
1454                            }
1455                            _ => (), // no more errors to check for at this point!
1456                        }
1457                    }
1458
1459                    // If `mfp_after` can error, then evaluate it here.
1460                    let Some(mfp) = &mfp_after2 else { return };
1461                    let temp_storage = RowArena::new();
1462                    let datum_iter = key.to_datum_iter();
1463                    let mut datums_local = datums2.borrow();
1464                    datums_local.extend(datum_iter);
1465                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1466                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1467                    }
1468
1469                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1470                        output.push((e.into(), Diff::ONE));
1471                    }
1472                },
1473            );
1474        (
1475            arranged_output,
1476            arranged_errs.as_collection(|_key, error| error.clone()),
1477        )
1478    }
1479}
1480
1481/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1482/// containing key and aggregate values, then returns a result `Row` or `None`
1483/// if the MFP filters the result out.
1484fn evaluate_mfp_after<'a, 'b>(
1485    mfp_after: &'a Option<SafeMfpPlan>,
1486    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1487    temp_storage: &'a RowArena,
1488    key_len: usize,
1489) -> Option<Row> {
1490    let mut row_builder = SharedRow::get();
1491    // Apply MFP if it exists and pack a Row of
1492    // aggregate values from `datums_local`.
1493    if let Some(mfp) = mfp_after {
        // Errors must be ignored here, but they are scanned
        // for elsewhere if the MFP can error.
1496        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1497            // The `mfp_after` must preserve the key columns,
1498            // so we can skip them to form aggregation results.
1499            Some(row_builder.pack_using(iter.skip(key_len)))
1500        } else {
1501            None
1502        }
1503    } else {
1504        Some(row_builder.pack_using(&datums_local[key_len..]))
1505    }
1506}
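
#[cfg(test)]
mod evaluate_mfp_after_tests {
    use super::*;

    // A minimal sketch of the no-MFP path: with `mfp_after == None`, the
    // helper simply packs the datums after the key prefix into a row.
    // (`Row::pack_slice` is used here only to build the expected value.)
    #[test]
    fn no_mfp_packs_post_key_datums() {
        let mfp_after: Option<SafeMfpPlan> = None;
        let temp_storage = RowArena::new();
        let mut datums = DatumVec::new();
        let mut borrow = datums.borrow();
        // One key column followed by one aggregate value.
        borrow.extend([Datum::Int64(1), Datum::Int64(42)]);
        let row = evaluate_mfp_after(&mfp_after, &mut borrow, &temp_storage, 1);
        assert_eq!(row, Some(Row::pack_slice(&[Datum::Int64(42)])));
    }
}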
1507
1508fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1509    match aggr_func {
1510        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1511            trues: Diff::ZERO,
1512            falses: Diff::ZERO,
1513        },
1514        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1515            accum: AccumCount::ZERO,
1516            pos_infs: Diff::ZERO,
1517            neg_infs: Diff::ZERO,
1518            nans: Diff::ZERO,
1519            non_nulls: Diff::ZERO,
1520        },
1521        AggregateFunc::SumNumeric => Accum::Numeric {
1522            accum: OrderedDecimal(NumericAgg::zero()),
1523            pos_infs: Diff::ZERO,
1524            neg_infs: Diff::ZERO,
1525            nans: Diff::ZERO,
1526            non_nulls: Diff::ZERO,
1527        },
1528        _ => Accum::SimpleNumber {
1529            accum: AccumCount::ZERO,
1530            non_nulls: Diff::ZERO,
1531        },
1532    }
1533}
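
#[cfg(test)]
mod accumulable_zero_tests {
    use super::*;

    // A minimal sketch: the zero accumulator reports `is_zero` and acts as
    // the additive identity, which is what allows differential dataflow to
    // consolidate away fully retracted groups.
    #[test]
    fn zero_is_additive_identity() {
        let func = AggregateFunc::SumInt64;
        assert!(accumulable_zero(&func).is_zero());
        let mut accum = datum_to_accumulator(&func, Datum::Int64(7));
        accum.plus_equals(&accumulable_zero(&func));
        assert_eq!(accum, datum_to_accumulator(&func, Datum::Int64(7)));
    }
}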
1534
/// Scale factor for mapping floats onto a fixed-point domain with 24
/// fractional bits (see `Accum::Float`).
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1536
1537fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1538    match aggregate_func {
1539        AggregateFunc::Count => Accum::SimpleNumber {
1540            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
1541            non_nulls: if datum.is_null() {
1542                Diff::ZERO
1543            } else {
1544                Diff::ONE
1545            },
1546        },
1547        AggregateFunc::Any | AggregateFunc::All => match datum {
1548            Datum::True => Accum::Bool {
1549                trues: Diff::ONE,
1550                falses: Diff::ZERO,
1551            },
1552            Datum::Null => Accum::Bool {
1553                trues: Diff::ZERO,
1554                falses: Diff::ZERO,
1555            },
1556            Datum::False => Accum::Bool {
1557                trues: Diff::ZERO,
1558                falses: Diff::ONE,
1559            },
            x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1561        },
1562        AggregateFunc::Dummy => match datum {
1563            Datum::Dummy => Accum::SimpleNumber {
1564                accum: AccumCount::ZERO,
1565                non_nulls: Diff::ZERO,
1566            },
1567            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1568        },
1569        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1570            let n = match datum {
1571                Datum::Float32(n) => f64::from(*n),
1572                Datum::Float64(n) => *n,
1573                Datum::Null => 0f64,
1574                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1575            };
1576
1577            let nans = Diff::from(n.is_nan());
1578            let pos_infs = Diff::from(n == f64::INFINITY);
1579            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1580            let non_nulls = Diff::from(datum != Datum::Null);
1581
            // Map the floating-point value onto a fixed-precision domain.
            // All special values map to zero, since they are tracked separately.
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                // This cast saturates at i128::MIN/i128::MAX if out of range.
                // TODO(benesch): rewrite to avoid `as`.
1589                #[allow(clippy::as_conversions)]
1590                { (n * *FLOAT_SCALE) as i128 }.into()
1591            };
1592
1593            Accum::Float {
1594                accum,
1595                pos_infs,
1596                neg_infs,
1597                nans,
1598                non_nulls,
1599            }
1600        }
1601        AggregateFunc::SumNumeric => match datum {
1602            Datum::Numeric(n) => {
1603                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1604                    if n.0.is_negative() {
1605                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1606                    } else {
1607                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1608                    }
1609                } else if n.0.is_nan() {
1610                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1611                } else {
1612                    // Take a narrow decimal (datum) into a wide decimal
1613                    // (aggregator).
1614                    let mut cx_agg = numeric::cx_agg();
1615                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1616                };
1617
1618                Accum::Numeric {
1619                    accum: OrderedDecimal(accum),
1620                    pos_infs,
1621                    neg_infs,
1622                    nans,
1623                    non_nulls: Diff::ONE,
1624                }
1625            }
1626            Datum::Null => Accum::Numeric {
1627                accum: OrderedDecimal(NumericAgg::zero()),
1628                pos_infs: Diff::ZERO,
1629                neg_infs: Diff::ZERO,
1630                nans: Diff::ZERO,
1631                non_nulls: Diff::ZERO,
1632            },
1633            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1634        },
1635        _ => {
1636            // Other accumulations need to disentangle the accumulable
1637            // value from its NULL-ness, which is not quite as easily
1638            // accumulated.
1639            match datum {
1640                Datum::Int16(i) => Accum::SimpleNumber {
1641                    accum: i.into(),
1642                    non_nulls: Diff::ONE,
1643                },
1644                Datum::Int32(i) => Accum::SimpleNumber {
1645                    accum: i.into(),
1646                    non_nulls: Diff::ONE,
1647                },
1648                Datum::Int64(i) => Accum::SimpleNumber {
1649                    accum: i.into(),
1650                    non_nulls: Diff::ONE,
1651                },
1652                Datum::UInt16(u) => Accum::SimpleNumber {
1653                    accum: u.into(),
1654                    non_nulls: Diff::ONE,
1655                },
1656                Datum::UInt32(u) => Accum::SimpleNumber {
1657                    accum: u.into(),
1658                    non_nulls: Diff::ONE,
1659                },
1660                Datum::UInt64(u) => Accum::SimpleNumber {
1661                    accum: u.into(),
1662                    non_nulls: Diff::ONE,
1663                },
1664                Datum::MzTimestamp(t) => Accum::SimpleNumber {
1665                    accum: u64::from(t).into(),
1666                    non_nulls: Diff::ONE,
1667                },
1668                Datum::Null => Accum::SimpleNumber {
1669                    accum: AccumCount::ZERO,
1670                    non_nulls: Diff::ZERO,
1671                },
1672                x => panic!("Accumulating non-integer data: {x:?}"),
1673            }
1674        }
1675    }
1676}
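
#[cfg(test)]
mod datum_to_accumulator_tests {
    use super::*;

    // A minimal sketch: `Any`/`All` count trues and falses separately, and a
    // NULL input contributes to neither count, mirroring SQL's treatment of
    // NULL in boolean aggregates.
    #[test]
    fn bool_datums_become_counts() {
        let any = AggregateFunc::Any;
        assert_eq!(
            datum_to_accumulator(&any, Datum::True),
            Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO
            }
        );
        assert_eq!(
            datum_to_accumulator(&any, Datum::Null),
            Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO
            }
        );
    }
}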
1677
1678fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1679    // The finished value depends on the aggregation function in a variety of ways.
1680    // For all aggregates but count, if only null values were
1681    // accumulated, then the output is null.
1682    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1683        Datum::Null
1684    } else {
1685        match (&aggr_func, &accum) {
1686            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1687                Datum::Int64(non_nulls.into_inner())
1688            }
1689            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // If any false, the result is false; else if all true, it is true;
                // otherwise there are no falses and some nulls, so the result is null.
1691                if falses.is_positive() {
1692                    Datum::False
1693                } else if *trues == total {
1694                    Datum::True
1695                } else {
1696                    Datum::Null
1697                }
1698            }
1699            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // If any true, the result is true; else if all false, it is false;
                // otherwise there are no trues and some nulls, so the result is null.
1701                if trues.is_positive() {
1702                    Datum::True
1703                } else if *falses == total {
1704                    Datum::False
1705                } else {
1706                    Datum::Null
1707                }
1708            }
1709            (AggregateFunc::Dummy, _) => Datum::Dummy,
1710            // If any non-nulls, just report the aggregate.
1711            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1712            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have fewer than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have fewer than 2^32 summands?
                // If so, rewrite to avoid `as`.
1717                #[allow(clippy::as_conversions)]
1718                Datum::Int64(accum.into_inner() as i64)
1719            }
1720            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1721            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1722            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1723                if !accum.is_negative() {
1724                    // Our semantics of overflow are not clearly articulated wrt.
1725                    // unsigned vs. signed types (database-issues#5172). We adopt an
1726                    // unsigned wrapping behavior to match what we do above for
1727                    // signed types.
1728                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
1729                    #[allow(clippy::as_conversions)]
1730                    Datum::UInt64(accum.into_inner() as u64)
1731                } else {
1732                    // Note that we return a value here, but an error in the other
1733                    // operator of the reduce_pair. Therefore, we expect that this
1734                    // value will never be exposed as an output.
1735                    Datum::Null
1736                }
1737            }
1738            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1739                if !accum.is_negative() {
1740                    Datum::from(*accum)
1741                } else {
1742                    // Note that we return a value here, but an error in the other
1743                    // operator of the reduce_pair. Therefore, we expect that this
1744                    // value will never be exposed as an output.
1745                    Datum::Null
1746                }
1747            }
1748            (
1749                AggregateFunc::SumFloat32,
1750                Accum::Float {
1751                    accum,
1752                    pos_infs,
1753                    neg_infs,
1754                    nans,
1755                    non_nulls: _,
1756                },
1757            ) => {
1758                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // The result is NaN if we saw any NaNs, or a mixture of
                    // positive and negative infinities.
1761                    Datum::from(f32::NAN)
1762                } else if pos_infs.is_positive() {
1763                    Datum::from(f32::INFINITY)
1764                } else if neg_infs.is_positive() {
1765                    Datum::from(f32::NEG_INFINITY)
1766                } else {
1767                    // TODO(benesch): remove potentially dangerous usage of `as`.
1768                    #[allow(clippy::as_conversions)]
1769                    {
1770                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
1771                    }
1772                }
1773            }
1774            (
1775                AggregateFunc::SumFloat64,
1776                Accum::Float {
1777                    accum,
1778                    pos_infs,
1779                    neg_infs,
1780                    nans,
1781                    non_nulls: _,
1782                },
1783            ) => {
1784                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // The result is NaN if we saw any NaNs, or a mixture of
                    // positive and negative infinities.
1787                    Datum::from(f64::NAN)
1788                } else if pos_infs.is_positive() {
1789                    Datum::from(f64::INFINITY)
1790                } else if neg_infs.is_positive() {
1791                    Datum::from(f64::NEG_INFINITY)
1792                } else {
1793                    // TODO(benesch): remove potentially dangerous usage of `as`.
1794                    #[allow(clippy::as_conversions)]
1795                    {
1796                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
1797                    }
1798                }
1799            }
1800            (
1801                AggregateFunc::SumNumeric,
1802                Accum::Numeric {
1803                    accum,
1804                    pos_infs,
1805                    neg_infs,
1806                    nans,
1807                    non_nulls: _,
1808                },
1809            ) => {
1810                let mut cx_datum = numeric::cx_datum();
1811                let d = cx_datum.to_width(accum.0);
1812                // Take a wide decimal (aggregator) into a
1813                // narrow decimal (datum). If this operation
1814                // overflows the datum, this new value will be
1815                // +/- infinity. However, the aggregator tracks
1816                // the amount of overflow, making it invertible.
1817                let inf_d = d.is_infinite();
1818                let neg_d = d.is_negative();
1819                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
1820                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
1821                if nans.is_positive() || (pos_inf && neg_inf) {
                    // The result is NaN if we saw any NaNs, or a mixture of
                    // positive and negative infinities.
1824                    Datum::from(Numeric::nan())
1825                } else if pos_inf {
1826                    Datum::from(Numeric::infinity())
1827                } else if neg_inf {
1828                    let mut cx = numeric::cx_datum();
1829                    let mut d = Numeric::infinity();
1830                    cx.neg(&mut d);
1831                    Datum::from(d)
1832                } else {
1833                    Datum::from(d)
1834                }
1835            }
            _ => panic!("Unexpected accumulation (aggr={aggr_func:?}, accum={accum:?})"),
1840        }
1841    }
1842}
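
#[cfg(test)]
mod finalize_accum_tests {
    use super::*;

    // A minimal sketch of the fixed-point float path: values exactly
    // representable with 24 fractional bits (such as 1.5 and 2.25) survive
    // the scale/unscale round trip without loss.
    #[test]
    fn sum_float64_round_trips_exact_values() {
        let func = AggregateFunc::SumFloat64;
        let mut accum = accumulable_zero(&func);
        let mut total = Diff::ZERO;
        for v in [1.5f64, 2.25f64] {
            accum.plus_equals(&datum_to_accumulator(&func, Datum::from(v)));
            total += Diff::ONE;
        }
        assert_eq!(finalize_accum(&func, &accum, total), Datum::from(3.75f64));
    }
}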
1843
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
1845type AccumCount = mz_ore::Overflowing<i128>;
1846
/// Accumulates values for the various types of accumulable aggregations.
///
/// We assume that there are no more than 2^32 elements in the aggregation.
/// Thus we can perform a summation over i32 values in an i64 accumulator
/// and not worry about exceeding its bounds.
///
/// The float accumulator performs accumulation in fixed-point arithmetic. The
/// fixed-point representation has less precision than a double. It is entirely
/// possible for the accumulator values to overflow, so we have to use wrapping
/// arithmetic to preserve group guarantees.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1868enum Accum {
1869    /// Accumulates boolean values.
1870    Bool {
1871        /// The number of `true` values observed.
1872        trues: Diff,
1873        /// The number of `false` values observed.
1874        falses: Diff,
1875    },
1876    /// Accumulates simple numeric values.
1877    SimpleNumber {
1878        /// The accumulation of all non-NULL values observed.
1879        accum: AccumCount,
1880        /// The number of non-NULL values observed.
1881        non_nulls: Diff,
1882    },
1883    /// Accumulates float values.
1884    Float {
1885        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
1886        /// preserve associativity and commutativity
1887        accum: AccumCount,
1888        /// Counts +inf
1889        pos_infs: Diff,
1890        /// Counts -inf
1891        neg_infs: Diff,
1892        /// Counts NaNs
1893        nans: Diff,
1894        /// Counts non-NULL values
1895        non_nulls: Diff,
1896    },
1897    /// Accumulates arbitrary precision decimals.
1898    Numeric {
1899        /// Accumulates non-special values
1900        accum: OrderedDecimal<NumericAgg>,
1901        /// Counts +inf
1902        pos_infs: Diff,
1903        /// Counts -inf
1904        neg_infs: Diff,
1905        /// Counts NaNs
1906        nans: Diff,
1907        /// Counts non-NULL values
1908        non_nulls: Diff,
1909    },
1910}
1911
1912impl IsZero for Accum {
1913    fn is_zero(&self) -> bool {
1914        match self {
1915            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1916            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1917            Accum::Float {
1918                accum,
1919                pos_infs,
1920                neg_infs,
1921                nans,
1922                non_nulls,
1923            } => {
1924                accum.is_zero()
1925                    && pos_infs.is_zero()
1926                    && neg_infs.is_zero()
1927                    && nans.is_zero()
1928                    && non_nulls.is_zero()
1929            }
1930            Accum::Numeric {
1931                accum,
1932                pos_infs,
1933                neg_infs,
1934                nans,
1935                non_nulls,
1936            } => {
1937                accum.0.is_zero()
1938                    && pos_infs.is_zero()
1939                    && neg_infs.is_zero()
1940                    && nans.is_zero()
1941                    && non_nulls.is_zero()
1942            }
1943        }
1944    }
1945}
1946
1947impl Semigroup for Accum {
1948    fn plus_equals(&mut self, other: &Accum) {
1949        match (&mut *self, other) {
1950            (
1951                Accum::Bool { trues, falses },
1952                Accum::Bool {
1953                    trues: other_trues,
1954                    falses: other_falses,
1955                },
1956            ) => {
1957                *trues += other_trues;
1958                *falses += other_falses;
1959            }
1960            (
1961                Accum::SimpleNumber { accum, non_nulls },
1962                Accum::SimpleNumber {
1963                    accum: other_accum,
1964                    non_nulls: other_non_nulls,
1965                },
1966            ) => {
1967                *accum += other_accum;
1968                *non_nulls += other_non_nulls;
1969            }
1970            (
1971                Accum::Float {
1972                    accum,
1973                    pos_infs,
1974                    neg_infs,
1975                    nans,
1976                    non_nulls,
1977                },
1978                Accum::Float {
1979                    accum: other_accum,
1980                    pos_infs: other_pos_infs,
1981                    neg_infs: other_neg_infs,
1982                    nans: other_nans,
1983                    non_nulls: other_non_nulls,
1984                },
1985            ) => {
1986                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
1987                    warn!("Float accumulator overflow. Incorrect results possible");
1988                    accum.wrapping_add(*other_accum)
1989                });
1990                *pos_infs += other_pos_infs;
1991                *neg_infs += other_neg_infs;
1992                *nans += other_nans;
1993                *non_nulls += other_non_nulls;
1994            }
1995            (
1996                Accum::Numeric {
1997                    accum,
1998                    pos_infs,
1999                    neg_infs,
2000                    nans,
2001                    non_nulls,
2002                },
2003                Accum::Numeric {
2004                    accum: other_accum,
2005                    pos_infs: other_pos_infs,
2006                    neg_infs: other_neg_infs,
2007                    nans: other_nans,
2008                    non_nulls: other_non_nulls,
2009                },
2010            ) => {
2011                let mut cx_agg = numeric::cx_agg();
2012                cx_agg.add(&mut accum.0, &other_accum.0);
2013                // `rounded` signals we have exceeded the aggregator's max
2014                // precision, which means we've lost commutativity and
2015                // associativity; nothing to be done here, so panic. For more
2016                // context, see the DEC_Rounded definition at
2017                // http://speleotrove.com/decimal/dncont.html
2018                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2019                // Reduce to reclaim unused decimal precision. Note that this
2020                // reduction must happen somewhere to make the following
2021                // invertible:
2022                // ```
2023                // CREATE TABLE a (a numeric);
2024                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
2025                // INSERT INTO a VALUES ('9e39'), ('9e-39');
2026                // ```
2027                // This will now return infinity. However, we can retract the
2028                // value that blew up its precision:
2029                // ```
2030                // INSERT INTO a VALUES ('-9e-39');
2031                // ```
2032                // This leaves `t`'s aggregator with a value of 9e39. However,
2033                // without doing a reduction, `libdecnum` will store the value
2034                // as 9e39+0e-39, which still exceeds the narrower context's
2035                // precision. By doing the reduction, we can "reclaim" the 39
2036                // digits of precision.
2037                cx_agg.reduce(&mut accum.0);
2038                *pos_infs += other_pos_infs;
2039                *neg_infs += other_neg_infs;
2040                *nans += other_nans;
2041                *non_nulls += other_non_nulls;
2042            }
2043            (l, r) => unreachable!(
2044                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2045            ),
2046        }
2047    }
2048}
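
#[cfg(test)]
mod accum_semigroup_tests {
    use super::*;

    // A minimal sketch: addition merges the per-variant counters
    // componentwise, so accumulators can be combined in any order.
    #[test]
    fn bool_accums_add_componentwise() {
        let mut accum = Accum::Bool {
            trues: Diff::ONE,
            falses: Diff::ZERO,
        };
        accum.plus_equals(&Accum::Bool {
            trues: Diff::ZERO,
            falses: Diff::ONE,
        });
        assert_eq!(
            accum,
            Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ONE
            }
        );
    }
}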
2049
2050impl Multiply<Diff> for Accum {
2051    type Output = Accum;
2052
2053    fn multiply(self, factor: &Diff) -> Accum {
2054        let factor = *factor;
2055        match self {
2056            Accum::Bool { trues, falses } => Accum::Bool {
2057                trues: trues * factor,
2058                falses: falses * factor,
2059            },
2060            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2061                accum: accum * AccumCount::from(factor),
2062                non_nulls: non_nulls * factor,
2063            },
2064            Accum::Float {
2065                accum,
2066                pos_infs,
2067                neg_infs,
2068                nans,
2069                non_nulls,
2070            } => Accum::Float {
2071                accum: accum
2072                    .checked_mul(AccumCount::from(factor))
2073                    .unwrap_or_else(|| {
2074                        warn!("Float accumulator overflow. Incorrect results possible");
2075                        accum.wrapping_mul(AccumCount::from(factor))
2076                    }),
2077                pos_infs: pos_infs * factor,
2078                neg_infs: neg_infs * factor,
2079                nans: nans * factor,
2080                non_nulls: non_nulls * factor,
2081            },
2082            Accum::Numeric {
2083                accum,
2084                pos_infs,
2085                neg_infs,
2086                nans,
2087                non_nulls,
2088            } => {
2089                let mut cx = numeric::cx_agg();
2090                let mut f = NumericAgg::from(factor.into_inner());
                // Unlike `plus_equals`, it is not necessary to reduce after this
                // operation because `f` will always be an integer, i.e., we are
                // never increasing the values' scale.
2094                cx.mul(&mut f, &accum.0);
2095                // `rounded` signals we have exceeded the aggregator's max
2096                // precision, which means we've lost commutativity and
2097                // associativity; nothing to be done here, so panic. For more
2098                // context, see the DEC_Rounded definition at
2099                // http://speleotrove.com/decimal/dncont.html
2100                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2101                Accum::Numeric {
2102                    accum: OrderedDecimal(f),
2103                    pos_infs: pos_infs * factor,
2104                    neg_infs: neg_infs * factor,
2105                    nans: nans * factor,
2106                    non_nulls: non_nulls * factor,
2107                }
2108            }
2109        }
2110    }
2111}
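
#[cfg(test)]
mod accum_multiply_tests {
    use super::*;

    // A minimal sketch: multiplying by a diff scales both the accumulation
    // and the non-null count, matching the effect of observing the same
    // record `factor` times.
    #[test]
    fn simple_number_scales_with_factor() {
        let mut two = Diff::ONE;
        two += Diff::ONE;
        let accum = Accum::SimpleNumber {
            accum: AccumCount::from(5i64),
            non_nulls: Diff::ONE,
        };
        assert_eq!(
            accum.multiply(&two),
            Accum::SimpleNumber {
                accum: AccumCount::from(10i64),
                non_nulls: two
            }
        );
    }
}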
2112
2113impl Columnation for Accum {
2114    type InnerRegion = CopyRegion<Self>;
2115}
2116
2117/// Monoids for in-place compaction of monotonic streams.
2118mod monoids {
2119
    // We can improve the performance of some aggregations through the use of algebra.
    // In particular, we can move some of the aggregations into the `diff` field of
    // updates, by changing `diff` from integers to a different algebraic structure.
    //
    // The one we use is called a "semigroup", and it means that the structure has an
    // associative, commutative addition operator. The trait we use also allows semigroup
    // elements to present as "zero", meaning they act as the identity under +. Here,
    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
    // known to DD via the `is_zero` method; see the comment there. So, from the point
    // of view of DD, this Semigroup should _not_ have a zero.
    //
    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
    // assumes this.
2134
2135    use differential_dataflow::containers::{Columnation, Region};
2136    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2137    use mz_expr::AggregateFunc;
2138    use mz_ore::soft_panic_or_log;
2139    use mz_repr::{Datum, Diff, Row};
2140    use serde::{Deserialize, Serialize};
2141
2142    /// A monoid containing a single-datum row.
2143    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
2144    pub enum ReductionMonoid {
2145        Min(Row),
2146        Max(Row),
2147    }
2148
2149    impl ReductionMonoid {
2150        pub fn finalize(&self) -> &Row {
2151            use ReductionMonoid::*;
2152            match self {
2153                Min(row) | Max(row) => row,
2154            }
2155        }
2156    }
2157
2158    impl Clone for ReductionMonoid {
2159        fn clone(&self) -> Self {
2160            use ReductionMonoid::*;
2161            match self {
2162                Min(row) => Min(row.clone()),
2163                Max(row) => Max(row.clone()),
2164            }
2165        }
2166
2167        fn clone_from(&mut self, source: &Self) {
2168            use ReductionMonoid::*;
2169
2170            let mut row = std::mem::take(match self {
2171                Min(row) | Max(row) => row,
2172            });
2173
2174            let source_row = match source {
2175                Min(row) | Max(row) => row,
2176            };
2177
2178            row.clone_from(source_row);
2179
2180            match source {
2181                Min(_) => *self = Min(row),
2182                Max(_) => *self = Max(row),
2183            }
2184        }
2185    }
2186
2187    impl Multiply<Diff> for ReductionMonoid {
2188        type Output = Self;
2189
2190        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent, and its
            // users must ensure the input's monotonicity beforehand
            // (typically with `ensure_monotonic`), since the monoid has
            // no zero value for us to use here.
2195            assert!(factor.is_positive());
2196            self
2197        }
2198    }
2199
2200    impl Semigroup for ReductionMonoid {
2201        fn plus_equals(&mut self, rhs: &Self) {
2202            match (self, rhs) {
2203                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
2204                    let swap = {
2205                        let lhs_val = lhs.unpack_first();
2206                        let rhs_val = rhs.unpack_first();
2207                        // Datum::Null is the identity, not a small element.
2208                        match (lhs_val, rhs_val) {
2209                            (_, Datum::Null) => false,
2210                            (Datum::Null, _) => true,
2211                            (lhs, rhs) => rhs < lhs,
2212                        }
2213                    };
2214                    if swap {
2215                        lhs.clone_from(rhs);
2216                    }
2217                }
2218                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
2219                    let swap = {
2220                        let lhs_val = lhs.unpack_first();
2221                        let rhs_val = rhs.unpack_first();
2222                        // Datum::Null is the identity, not a large element.
2223                        match (lhs_val, rhs_val) {
2224                            (_, Datum::Null) => false,
2225                            (Datum::Null, _) => true,
2226                            (lhs, rhs) => rhs > lhs,
2227                        }
2228                    };
2229                    if swap {
2230                        lhs.clone_from(rhs);
2231                    }
2232                }
2233                (lhs, rhs) => {
2234                    soft_panic_or_log!(
2235                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
2236                    );
2237                }
2238            }
2239        }
2240    }
2241
2242    impl IsZero for ReductionMonoid {
2243        fn is_zero(&self) -> bool {
            // It looks like we could return true here for `Datum::Null`, but don't do this!
            // DD uses true results of this method to make updates disappear. That makes sense
            // when diffs really are just diffs, but for `ReductionMonoid` the diffs hold
            // reduction results. We don't want reduction results to disappear, even when they
            // are null. (That would confuse, e.g., `ReduceCollation` for null inputs.)
2249            false
2250        }
2251    }
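
    #[cfg(test)]
    mod monoid_tests {
        use super::*;

        // A minimal sketch: `Datum::Null` acts as the identity under
        // `plus_equals`, the monoid keeps the extreme value seen so far, and
        // `is_zero` deliberately never fires (see the comment above).
        #[test]
        fn min_treats_null_as_identity() {
            let pack = |d: Datum| {
                let mut row = Row::default();
                row.packer().push(d);
                row
            };
            let mut monoid = ReductionMonoid::Min(pack(Datum::Null));
            monoid.plus_equals(&ReductionMonoid::Min(pack(Datum::Int64(5))));
            monoid.plus_equals(&ReductionMonoid::Min(pack(Datum::Int64(3))));
            assert_eq!(monoid.finalize().unpack_first(), Datum::Int64(3));
            assert!(!monoid.is_zero());
        }
    }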
2252
2253    impl Columnation for ReductionMonoid {
2254        type InnerRegion = ReductionMonoidRegion;
2255    }
2256
    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
    /// in the same backing region. Alternatively, it could use two separate regions, but we choose
    /// the former for simplicity.
2260    #[derive(Default)]
2261    pub struct ReductionMonoidRegion {
2262        inner: <Row as Columnation>::InnerRegion,
2263    }
2264
2265    impl Region for ReductionMonoidRegion {
2266        type Item = ReductionMonoid;
2267
2268        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
2269            use ReductionMonoid::*;
2270            match item {
2271                Min(row) => Min(unsafe { self.inner.copy(row) }),
2272                Max(row) => Max(unsafe { self.inner.copy(row) }),
2273            }
2274        }
2275
2276        fn clear(&mut self) {
2277            self.inner.clear();
2278        }
2279
2280        fn reserve_items<'a, I>(&mut self, items: I)
2281        where
2282            Self: 'a,
2283            I: Iterator<Item = &'a Self::Item> + Clone,
2284        {
2285            self.inner
2286                .reserve_items(items.map(ReductionMonoid::finalize));
2287        }
2288
2289        fn reserve_regions<'a, I>(&mut self, regions: I)
2290        where
2291            Self: 'a,
2292            I: Iterator<Item = &'a Self> + Clone,
2293        {
2294            self.inner.reserve_regions(regions.map(|r| &r.inner));
2295        }
2296
2297        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
2298            self.inner.heap_size(callback);
2299        }
2300    }
2301
2302    /// Get the correct monoid implementation for a given aggregation function. Note that
2303    /// all hierarchical aggregation functions need to supply a monoid implementation.
2304    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
2305        match func {
2306            AggregateFunc::MaxNumeric
2307            | AggregateFunc::MaxInt16
2308            | AggregateFunc::MaxInt32
2309            | AggregateFunc::MaxInt64
2310            | AggregateFunc::MaxUInt16
2311            | AggregateFunc::MaxUInt32
2312            | AggregateFunc::MaxUInt64
2313            | AggregateFunc::MaxMzTimestamp
2314            | AggregateFunc::MaxFloat32
2315            | AggregateFunc::MaxFloat64
2316            | AggregateFunc::MaxBool
2317            | AggregateFunc::MaxString
2318            | AggregateFunc::MaxDate
2319            | AggregateFunc::MaxTimestamp
2320            | AggregateFunc::MaxTimestampTz
2321            | AggregateFunc::MaxInterval
2322            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
2323            AggregateFunc::MinNumeric
2324            | AggregateFunc::MinInt16
2325            | AggregateFunc::MinInt32
2326            | AggregateFunc::MinInt64
2327            | AggregateFunc::MinUInt16
2328            | AggregateFunc::MinUInt32
2329            | AggregateFunc::MinUInt64
2330            | AggregateFunc::MinMzTimestamp
2331            | AggregateFunc::MinFloat32
2332            | AggregateFunc::MinFloat64
2333            | AggregateFunc::MinBool
2334            | AggregateFunc::MinString
2335            | AggregateFunc::MinDate
2336            | AggregateFunc::MinTimestamp
2337            | AggregateFunc::MinTimestampTz
2338            | AggregateFunc::MinInterval
2339            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
2340            AggregateFunc::SumInt16
2341            | AggregateFunc::SumInt32
2342            | AggregateFunc::SumInt64
2343            | AggregateFunc::SumUInt16
2344            | AggregateFunc::SumUInt32
2345            | AggregateFunc::SumUInt64
2346            | AggregateFunc::SumFloat32
2347            | AggregateFunc::SumFloat64
2348            | AggregateFunc::SumNumeric
2349            | AggregateFunc::Count
2350            | AggregateFunc::Any
2351            | AggregateFunc::All
2352            | AggregateFunc::Dummy
2353            | AggregateFunc::JsonbAgg { .. }
2354            | AggregateFunc::JsonbObjectAgg { .. }
2355            | AggregateFunc::MapAgg { .. }
2356            | AggregateFunc::ArrayConcat { .. }
2357            | AggregateFunc::ListConcat { .. }
2358            | AggregateFunc::StringAgg { .. }
2359            | AggregateFunc::RowNumber { .. }
2360            | AggregateFunc::Rank { .. }
2361            | AggregateFunc::DenseRank { .. }
2362            | AggregateFunc::LagLead { .. }
2363            | AggregateFunc::FirstValue { .. }
2364            | AggregateFunc::LastValue { .. }
2365            | AggregateFunc::WindowAggregate { .. }
2366            | AggregateFunc::FusedValueWindowFunc { .. }
2367            | AggregateFunc::FusedWindowAggregate { .. } => None,
2368        }
2369    }
2370}
2371
2372mod window_agg_helpers {
2373    use crate::render::reduce::*;
2374
2375    /// TODO: It would be better for performance to do the branching that is in the methods of this
2376    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
2377    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
2378    /// directly.
2379    pub enum OneByOneAggrImpls {
2380        Accumulable(AccumulableOneByOneAggr),
2381        Hierarchical(HierarchicalOneByOneAggr),
2382        Basic(mz_expr::NaiveOneByOneAggr),
2383    }
2384
2385    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2386        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2387            match reduction_type(agg) {
2388                ReductionType::Basic => {
2389                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2390                }
2391                ReductionType::Accumulable => {
2392                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2393                }
2394                ReductionType::Hierarchical => {
2395                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2396                }
2397            }
2398        }
2399
2400        fn give(&mut self, d: &Datum) {
2401            match self {
2402                OneByOneAggrImpls::Basic(i) => i.give(d),
2403                OneByOneAggrImpls::Accumulable(i) => i.give(d),
2404                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2405            }
2406        }
2407
2408        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2409            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
2410            match self {
2411                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2412                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2413                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2414            }
2415        }
2416    }
2417
2418    pub struct AccumulableOneByOneAggr {
2419        aggr_func: AggregateFunc,
2420        accum: Accum,
2421        total: Diff,
2422    }
2423
2424    impl AccumulableOneByOneAggr {
2425        fn new(aggr_func: &AggregateFunc) -> Self {
2426            AccumulableOneByOneAggr {
2427                aggr_func: aggr_func.clone(),
2428                accum: accumulable_zero(aggr_func),
2429                total: Diff::ZERO,
2430            }
2431        }
2432
        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, *d));
            self.total += Diff::ONE;
        }
2438
2439        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2440            temp_storage.make_datum(|packer| {
2441                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2442            })
2443        }
2444    }
2445
2446    pub struct HierarchicalOneByOneAggr {
2447        aggr_func: AggregateFunc,
2448        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
2449        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
2450        monoid: ReductionMonoid,
2451    }
2452
2453    impl HierarchicalOneByOneAggr {
2454        fn new(aggr_func: &AggregateFunc) -> Self {
2455            let mut row_buf = Row::default();
2456            row_buf.packer().push(Datum::Null);
2457            HierarchicalOneByOneAggr {
2458                aggr_func: aggr_func.clone(),
2459                monoid: get_monoid(row_buf, aggr_func)
2460                    .expect("aggr_func should be a hierarchical aggregation function"),
2461            }
2462        }
2463
2464        fn give(&mut self, d: &Datum) {
2465            let mut row_buf = Row::default();
2466            row_buf.packer().push(d);
2467            let m = get_monoid(row_buf, &self.aggr_func)
2468                .expect("aggr_func should be a hierarchical aggregation function");
2469            self.monoid.plus_equals(&m);
2470        }
2471
2472        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2473            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2474        }
2475    }
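
    #[cfg(test)]
    mod one_by_one_tests {
        use super::*;

        // Minimal sketches: the one-by-one implementations should agree with
        // what the corresponding full reductions produce for the same datums.
        #[test]
        fn accumulable_count_skips_nulls() {
            let mut aggr = AccumulableOneByOneAggr::new(&AggregateFunc::Count);
            for d in [Datum::Int64(1), Datum::Null, Datum::Int64(2)] {
                aggr.give(&d);
            }
            let temp_storage = RowArena::new();
            assert_eq!(aggr.get_current_aggregate(&temp_storage), Datum::Int64(2));
        }

        #[test]
        fn hierarchical_max_starts_at_null() {
            let mut aggr = HierarchicalOneByOneAggr::new(&AggregateFunc::MaxInt64);
            let temp_storage = RowArena::new();
            // Before any input, the `Datum::Null` identity is reported.
            assert_eq!(aggr.get_current_aggregate(&temp_storage), Datum::Null);
            aggr.give(&Datum::Int64(3));
            aggr.give(&Datum::Int64(9));
            assert_eq!(aggr.get_current_aggregate(&temp_storage), Datum::Int64(9));
        }
    }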
2476}