mz_compute/render/

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction dataflow construction.
//!
//! Consult [ReducePlan] documentation for details.

use std::collections::BTreeMap;
use std::sync::LazyLock;

use dec::OrderedDecimal;
use differential_dataflow::IntoOwned;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, CopyRegion};
use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
use differential_dataflow::{Collection, Diff as _};
use mz_compute_types::plan::reduce::{
    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
};
use mz_expr::{
    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
};
use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use serde::{Deserialize, Serialize};
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::progress::Timestamp;
use timely::progress::timestamp::Refines;
use tracing::warn;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::{MzReduce, ReduceExt};
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
use crate::render::{ArrangementFlavor, Pairer};
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
};
use crate::typedefs::{
    ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
    RowRowSpine, RowSpine, RowValSpine,
};

impl<G, T> Context<G, T>
where
    G: Scope,
    G::Timestamp: Lattice + Refines<T> + Columnation,
    T: Timestamp + Lattice + Columnation,
{
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
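    ///
    /// A minimal sketch of the demanded-column walk used in the body below
    /// (plain `std` types and hypothetical data, not the real `Row` type):
    /// sorted demanded column indexes are converted to relative skips, so each
    /// datum is reached with a single `Iterator::nth` call per column.
    ///
    /// ```
    /// let row = vec!["a", "b", "c", "d", "e"];
    /// // Demanded columns [1, 3, 4], expressed as relative skips.
    /// let skips = [1usize, 1, 0];
    /// let mut iter = row.into_iter();
    /// let picked: Vec<_> = skips.iter().map(|&s| iter.nth(s).unwrap()).collect();
    /// assert_eq!(picked, ["b", "d", "e"]);
    /// ```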
    pub fn render_reduce(
        &self,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        input_key: Option<Vec<MirScalarExpr>>,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let binding = SharedRow::get();
                    let mut row_builder = binding.borrow_mut();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Unpack only the demanded columns.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluate the value expressions.
                    // The prior evaluation may have left additional columns we should delete.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux out the potential errors from key and value selector evaluation.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(&err_input);

            // Render the reduce plan.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }

    /// Render a dataflow based on the provided plan.
    ///
    /// The output will be an arrangement that looks the same as if
    /// we just had a single reduce operator computing everything together, and
    /// this arrangement can also be re-used.
    fn render_reduce_plan<S>(
        &self,
        plan: ReducePlan,
        collection: Collection<S, (Row, Row), Diff>,
        err_input: Collection<S, DataflowError, Diff>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> CollectionBundle<S, T>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut errors = Default::default();
        let arrangement =
            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
        CollectionBundle::from_columns(
            0..key_arity,
            ArrangementFlavor::Local(
                arrangement,
                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
            ),
        )
    }

    fn render_reduce_plan_inner<S>(
        &self,
        plan: ReducePlan,
        collection: Collection<S, (Row, Row), Diff>,
        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
        // not only values (database-issues#6658).
        let arrangement = match plan {
            // If we have no aggregations or just a single type of reduction, we
            // can go ahead and render them directly.
            ReducePlan::Distinct => {
                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Accumulable(expr) => {
                let (arranged_output, errs) =
                    self.build_accumulable(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                index,
                expr,
                fused_unnest_list,
            })) => {
                // Note that we skip validating for negative diffs when we have a fused unnest
                // list, because this is already a CPU-intensive situation due to the
                // non-incremental nature of window functions.
                let validating = !fused_unnest_list;
                let (output, errs) = self.build_basic_aggregate(
                    collection,
                    index,
                    &expr,
                    validating,
                    key_arity,
                    mfp_after,
                    fused_unnest_list,
                );
                if validating {
                    errors.push(errs.expect("validation should have occurred as it was requested"));
                }
                output
            }
            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
                let (output, errs) =
                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            // Otherwise, we need to render something different for each type of
            // reduction, and then stitch them together.
            ReducePlan::Collation(expr) => {
                // First, we need to render our constituent aggregations.
                let mut to_collate = vec![];

                for plan in [
                    expr.hierarchical.map(ReducePlan::Hierarchical),
                    expr.accumulable.map(ReducePlan::Accumulable),
                    expr.basic.map(ReducePlan::Basic),
                ]
                .into_iter()
                .flatten()
                {
                    let r#type = ReductionType::try_from(&plan)
                        .expect("only representable reduction types were used above");

                    let arrangement = self.render_reduce_plan_inner(
                        plan,
                        collection.clone(),
                        errors,
                        key_arity,
                        None,
                    );
                    to_collate.push((r#type, arrangement));
                }

                // Now we need to collate them together.
                let (oks, errs) = self.build_collation(
                    to_collate,
                    expr.aggregate_types,
                    &mut collection.scope(),
                    mfp_after,
                );
                errors.push(errs);
                oks
            }
        };
        arrangement
    }

    /// Build the dataflow to combine arrangements containing results of different
    /// aggregation types into a single arrangement.
    ///
    /// This computes the same thing as a join on the group key followed by shuffling
    /// the values into the correct order. This implementation assumes that all input
    /// arrangements present values in a way that respects the desired output order,
    /// so we can do a linear merge to form the output.
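    ///
    /// A small sketch of that linear merge (std-only, hypothetical values):
    /// each reduction type contributes its fused results in order, and the
    /// requested `aggregate_types` sequence decides which iterator the next
    /// output column is drawn from.
    ///
    /// ```
    /// #[derive(Clone, Copy)]
    /// enum Ty { Acc, Hier, Basic }
    /// let mut acc = [10, 20].into_iter();
    /// let mut hier = [30].into_iter();
    /// let mut basic = [40].into_iter();
    /// // The order the aggregates were originally asked for.
    /// let order = [Ty::Acc, Ty::Hier, Ty::Acc, Ty::Basic];
    /// let merged: Vec<i32> = order
    ///     .iter()
    ///     .map(|ty| match ty {
    ///         Ty::Acc => acc.next().unwrap(),
    ///         Ty::Hier => hier.next().unwrap(),
    ///         Ty::Basic => basic.next().unwrap(),
    ///     })
    ///     .collect();
    /// assert_eq!(merged, [10, 30, 20, 40]);
    /// ```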
    fn build_collation<S>(
        &self,
        arrangements: Vec<(ReductionType, RowRowArrangement<S>)>,
        aggregate_types: Vec<ReductionType>,
        scope: &mut S,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // We must have more than one arrangement to collate.
        if arrangements.len() <= 1 {
            error_logger.soft_panic_or_log(
                "Incorrect number of arrangements in reduce collation",
                &format!("len={}", arrangements.len()),
            );
        }

        let mut to_concat = vec![];

        // First, let's collect all results into a single collection.
        for (reduction_type, arrangement) in arrangements.into_iter() {
            let collection = arrangement.as_collection(move |key, val| {
                (key.into_owned(), (reduction_type, val.into_owned()))
            });
            to_concat.push(collection);
        }

        // For each key above, we need to have exactly as many rows as there are distinct
        // reduction types required by `aggregate_types`. We thus prepare here a properly
        // deduplicated version of `aggregate_types` for validation during processing below.
        let mut distinct_aggregate_types = aggregate_types.clone();
        distinct_aggregate_types.sort_unstable();
        distinct_aggregate_types.dedup();
        let n_distinct_aggregate_types = distinct_aggregate_types.len();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let aggregate_types_err = aggregate_types.clone();
        let (oks, errs) = differential_dataflow::collection::concatenate(scope, to_concat)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>("Arrange ReduceCollation")
            .reduce_pair::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "ReduceCollation",
                "ReduceCollation Errors",
                {
                    move |key, input, output| {
                        // The inputs are pairs of a reduction type, and a row consisting of densely
                        // packed fused aggregate values.
                        //
                        // We need to reconstitute the final value by:
                        // 1. Extracting out the fused rows by type
                        // 2. For each aggregate, figure out what type it is, and grab the relevant
                        //    value from the corresponding fused row.
                        // 3. Stitch all the values together into one row.
                        let mut accumulable = DatumList::empty().iter();
                        let mut hierarchical = DatumList::empty().iter();
                        let mut basic = DatumList::empty().iter();

                        // Note that hierarchical and basic reductions guard against negative
                        // multiplicities, and if we only had accumulable aggregations, we would not
                        // have produced a collation plan, so we do not repeat the check here.
                        if input.len() != n_distinct_aggregate_types {
                            return;
                        }

                        for (item, _) in input.iter() {
                            let reduction_type = &item.0;
                            let row = &item.1;
                            match reduction_type {
                                ReductionType::Accumulable => accumulable = row.iter(),
                                ReductionType::Hierarchical => hierarchical = row.iter(),
                                ReductionType::Basic => basic = row.iter(),
                            }
                        }

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Merge results into the order they were asked for.
                        for typ in aggregate_types.iter() {
                            let datum = match typ {
                                ReductionType::Accumulable => accumulable.next(),
                                ReductionType::Hierarchical => hierarchical.next(),
                                ReductionType::Basic => basic.next(),
                            };
                            let Some(datum) = datum else { return };
                            datums_local.push(datum);
                        }

                        // If we did not have enough values to stitch together, then we do not
                        // generate an output row. Not outputting here corresponds to the semantics
                        // of an equi-join on the key, similarly to the proposal in PR materialize#17013.
                        //
                        // Note that we also do not want to have anything left over to stitch. If
                        // we do, then we also have an error, reported elsewhere, and would violate
                        // join semantics.
                        if (accumulable.next(), hierarchical.next(), basic.next())
                            == (None, None, None)
                        {
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                output.push((row, Diff::ONE));
                            }
                        }
                    }
                },
                move |key, input, output| {
                    if input.len() != n_distinct_aggregate_types {
                        // We expected to stitch together exactly as many aggregate types as requested
                        // by the collation. If we cannot, we log an error and produce no output for
                        // this key.
                        let message = "Mismatched aggregates for key in ReduceCollation";
                        error_logger.log(
                            message,
                            &format!(
                                "key={key:?}, n_aggregates_requested={requested}, \
                                 n_distinct_aggregate_types={n_distinct_aggregate_types}",
                                requested = input.len(),
                            ),
                        );
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }

                    let mut accumulable = DatumList::empty().iter();
                    let mut hierarchical = DatumList::empty().iter();
                    let mut basic = DatumList::empty().iter();
                    for (item, _) in input.iter() {
                        let reduction_type = &item.0;
                        let row = &item.1;
                        match reduction_type {
                            ReductionType::Accumulable => accumulable = row.iter(),
                            ReductionType::Hierarchical => hierarchical = row.iter(),
                            ReductionType::Basic => basic = row.iter(),
                        }
                    }

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    for typ in aggregate_types_err.iter() {
                        let datum = match typ {
                            ReductionType::Accumulable => accumulable.next(),
                            ReductionType::Hierarchical => hierarchical.next(),
                            ReductionType::Basic => basic.next(),
                        };
                        if let Some(datum) = datum {
                            datums_local.push(datum);
                        } else {
                            // We cannot properly reconstruct a row if aggregates are missing.
                            // This situation is not expected, so we log an error if it occurs.
                            let message = "Missing value for key in ReduceCollation";
                            error_logger.log(message, &format!("typ={typ:?}, key={key:?}"));
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                            return;
                        }
                    }

                    // Note that we also do not want to have anything left over to stitch.
                    // If we do, then we also have an error and would violate join semantics.
                    if (accumulable.next(), hierarchical.next(), basic.next()) != (None, None, None)
                    {
                        let message = "Rows too large for key in ReduceCollation";
                        error_logger.log(message, &format!("key={key:?}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    }

                    // Finally, if `mfp_after` can produce errors, then we should also report
                    // these here.
                    let Some(mfp) = &mfp_after2 else { return };
                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (oks, errs.as_collection(|_, v| v.clone()))
    }

    /// Build the dataflow to compute the set of distinct keys.
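    ///
    /// In collection terms, `Distinct` emits one record per key with positive
    /// multiplicity. A std-only sketch of that semantics (hypothetical data,
    /// not the arrangement-based implementation below):
    ///
    /// ```
    /// use std::collections::BTreeMap;
    /// let updates = [("a", 2i64), ("b", 1), ("a", 3)];
    /// let mut counts = BTreeMap::new();
    /// for (key, diff) in updates {
    ///     *counts.entry(key).or_insert(0) += diff;
    /// }
    /// let distinct: Vec<_> = counts
    ///     .iter()
    ///     .filter(|(_, &count)| count > 0)
    ///     .map(|(&key, _)| key)
    ///     .collect();
    /// assert_eq!(distinct, ["a", "b"]);
    /// ```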
    fn build_distinct<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        Collection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>("Arranged DistinctBy")
            .reduce_pair::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        // We're pushing a unit value here because the key is implicitly added by the
                        // arrangement, and the permutation logic takes care of using the key part of the
                        // output.
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }

    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
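    ///
    /// A sketch of the fusion step (std-only, hypothetical data): each
    /// constituent reduce emits `(index, value)`, the arrangement presents
    /// those pairs sorted by `index`, and stitching is then an in-order
    /// concatenation.
    ///
    /// ```
    /// let mut per_aggregate = vec![(2, "max"), (0, "count"), (1, "sum")];
    /// // The arrangement sorts by the index that leads each value.
    /// per_aggregate.sort_by_key(|(index, _)| *index);
    /// let fused: Vec<_> = per_aggregate.into_iter().map(|(_, value)| value).collect();
    /// assert_eq!(fused, ["count", "sum", "max"]);
    /// ```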
    fn build_basic_aggregates<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        aggrs: Vec<(usize, AggregateExpr)>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs {
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            to_collect.push(
                result.as_collection(move |key, val| (key.into_owned(), (index, val.into_owned()))),
            );
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceFuseBasic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.into_owned());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }

    /// Build the dataflow to compute a single basic aggregation.
    ///
    /// This method also applies distinctness if required.
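    ///
    /// The reduction below respects input multiplicities by repeating each
    /// datum `count` times before folding it into the aggregate. In miniature
    /// (std-only, hypothetical data):
    ///
    /// ```
    /// let source = [("x", 2usize), ("y", 1)];
    /// let expanded: Vec<_> = source
    ///     .iter()
    ///     .flat_map(|&(value, count)| std::iter::repeat(value).take(count))
    ///     .collect();
    /// assert_eq!(expanded, ["x", "x", "y"]);
    /// ```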
    fn build_basic_aggregate<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Extract the value we were asked to aggregate over.
        let mut partial = input.map(move |(key, row)| {
            let binding = SharedRow::get();
            let mut row_builder = binding.borrow_mut();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
        if distinct {
            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<_, _, RowValBuilder<Result<(), String>, _, _>, RowValSpine<Result<(), String>, _, _>>(keyed, None)
                    .as_collection(|k, v| (k.into_owned(), v.as_ref().map(|&()| ()).map_err(|m| m.as_str().into())))
                    .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>("Demux Errors", move |(key_val, result)| match result {
                        Ok(()) => Ok(pairer.split(&key_val)),
                        Err(m) => Err(EvalError::Internal(m).into()),
                    });
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, _, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // We respect the multiplicity here (unlike in hierarchical aggregation)
                    // because we don't know that the aggregation method is not sensitive
                    // to the number of records.
                    let iter = source.iter().flat_map(|(v, w)| {
                        // Note that in the non-positive case, this is wrong, but harmless because
                        // our other reduction will produce an error.
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        // Note that this is not necessarily a window aggregation, in which case
                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        target.push((row, Diff::ONE));
                    }
                }
            })
        } else {
            arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // This part is the same as in the `!fused_unnest_list` branch above.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    // This is the part that is specific to the `fused_unnest_list` branch.
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
        };

        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
        // we then wouldn't be able to do this error check conditionally. See its documentation for the
        // rationale around using a second reduction here.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.into_owned();
                                    let message = "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    target.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter` but we cannot as it needs to
                                // borrow `v` and only presents datums with that lifetime, not any longer.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.into_owned())
            } else {
                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
                assert!(!must_validate);
                // We couldn't have entered this branch because of `must_validate`, so it must be
                // because `mfp_after2.is_some()`.
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter` but we cannot as it needs to
                                // borrow `v` and only presents datums with that lifetime, not any longer.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                // We know that `mfp` can error (because of the `could_error` call
                                // above), so try to evaluate it here.
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.into_owned())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(&errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }

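    /// Builds the arranged distinct-by-`(key, val)` pass used by
    /// [`Self::build_basic_aggregate`] for aggregations with `distinct` set.
    /// The output value type `V` determines whether non-positive
    /// multiplicities are detected and reported as errors (see
    /// [`MaybeValidatingRow`]).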
    fn build_reduce_inaccumulable_distinct<S, V, Bu, Tr>(
        &self,
        input: Collection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        V: MaybeValidatingRow<(), String>,
        Tr: Trace
            + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff>
            + 'static,
        Tr::Batch: Batch,
        Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
        Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
        for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = Row>,
        for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, _, _, Bu, Tr>(&output_name, move |_, source, t| {
                if let Some(err) = V::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                t.push((V::ok(()), Diff::ONE))
            })
    }

    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on non-monotonic inputs.
    ///
    /// This function renders a single reduction tree that computes aggregations with
    /// a priority queue implemented with a series of reduce operators that partition
    /// the input into buckets, and compute the aggregation over very small buckets
    /// and feed the results up to larger buckets.
    ///
    /// Note that this implementation currently ignores the distinct bit because we
    /// currently only perform min / max hierarchically and the reduction tree
    /// efficiently suppresses non-distinct updates.
    ///
    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
    /// trickery here to limit the memory usage per layer by internally
    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains the `((key, bucket), passing value)` for this
    /// stage.
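    ///
    /// The bucketing trick in miniature (std-only, hypothetical data): values
    /// are grouped by `hash % b` for progressively smaller `b`, and each stage
    /// reduces only its own small groups, so an update disturbs one small
    /// group per stage rather than one enormous group.
    ///
    /// ```
    /// use std::collections::BTreeMap;
    /// // One reduction stage: fold values into per-bucket minima.
    /// fn stage(values: Vec<(u64, u64)>, modulus: u64) -> Vec<(u64, u64)> {
    ///     let mut groups: BTreeMap<u64, u64> = BTreeMap::new();
    ///     for (hash, value) in values {
    ///         let entry = groups.entry(hash % modulus).or_insert(u64::MAX);
    ///         *entry = (*entry).min(value);
    ///     }
    ///     groups.into_iter().collect()
    /// }
    /// let input: Vec<(u64, u64)> = (0..100u64)
    ///     .map(|v| (v.wrapping_mul(2654435761), v))
    ///     .collect();
    /// let mut stage_out = input;
    /// for modulus in [16u64, 4, 1] {
    ///     stage_out = stage(stage_out, modulus);
    /// }
    /// assert_eq!(stage_out, vec![(0, 0)]);
    /// ```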
    fn build_bucketed<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            skips,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<Collection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // The first mod to apply to the hash.
            let first_mod = buckets.get(0).copied().unwrap_or(1);

            // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
            let mut stage = input.map(move |(key, row)| {
                let binding = SharedRow::get();
                let mut row_builder = binding.borrow_mut();
                let mut row_packer = row_builder.packer();
                let mut row_iter = row.iter();
                for skip in skips.iter() {
                    row_packer.push(row_iter.nth(*skip).unwrap());
                }
                let values = row_builder.clone();

                // Apply the initial mod here.
                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            // Repeatedly apply hierarchical reduction with a progressively coarser key.
            for (index, b) in buckets.into_iter().enumerate() {
                // Apply subsequent bucket mods for all but the first round.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // We only want the first stage to perform validation of whether invalid accumulations
                // were observed in the input. Subsequently, we will either produce an error in the error
                // stream or produce correct data in the output stream.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Discard the hash from the key and return to the format of the input data.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Allocations for the two closures.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            // Build a series of stages for the reduction.
            // Arrange the final result into (key, Row).
            let error_logger = self.error_logger();
            // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
            // view mz_introspection.mz_expected_group_size_advice.
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
            // but we then wouldn't be able to do this error check conditionally. See its documentation
            // for the rationale around using a second reduction here.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
1107                            // see them, we should report when we do. We may want to bake even more info
1108                            // in here in the future.
1109                            if must_validate {
1110                                for (val, count) in source.iter() {
1111                                    if count.is_positive() {
1112                                        continue;
1113                                    }
1114                                    let val = val.into_owned();
1115                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
1116                                    error_logger
1117                                        .log(message, &format!("val={val:?}, count={count}"));
1118                                    target.push((
1119                                        EvalError::Internal(message.into()).into(),
1120                                        Diff::ONE,
1121                                    ));
1122                                    return;
1123                                }
1124                            }
1125
1126                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
1127                            let Some(mfp) = &mfp_after2 else { return };
1128                            let temp_storage = RowArena::new();
1129                            let datum_iter = key.to_datum_iter();
1130                            let mut datums_local = datums2.borrow();
1131                            datums_local.extend(datum_iter);
1132
1133                            let mut source_iters = source
1134                                .iter()
1135                                .map(|(values, _cnt)| *values)
1136                                .collect::<Vec<_>>();
1137                            for func in aggr_funcs2.iter() {
1138                                let column_iter = (0..source_iters.len())
1139                                    .map(|i| source_iters[i].next().unwrap());
1140                                datums_local.push(func.eval(column_iter, &temp_storage));
1141                            }
1142                            if let Err(e) =
1143                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
1144                            {
1145                                target.push((e.into(), Diff::ONE));
1146                            }
1147                        },
1148                    )
1149                    .as_collection(|_, v| v.into_owned())
1150                    .leave_region();
1151                if let Some(e) = &err_output {
1152                    err_output = Some(e.concat(&errs));
1153                } else {
1154                    err_output = Some(errs);
1155                }
1156            }
1157            arranged
1158                .mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1159                    "ReduceMinsMaxes",
1160                    move |key, source, target| {
1161                        let temp_storage = RowArena::new();
1162                        let datum_iter = key.to_datum_iter();
1163                        let mut datums_local = datums1.borrow();
1164                        datums_local.extend(datum_iter);
1165                        let key_len = datums_local.len();
1166
1167                        let mut source_iters = source
1168                            .iter()
1169                            .map(|(values, _cnt)| *values)
1170                            .collect::<Vec<_>>();
1171                        for func in aggr_funcs.iter() {
1172                            let column_iter =
1173                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1174                            datums_local.push(func.eval(column_iter, &temp_storage));
1175                        }
1176
1177                        if let Some(row) = evaluate_mfp_after(
1178                            &mfp_after1,
1179                            &mut datums_local,
1180                            &temp_storage,
1181                            key_len,
1182                        ) {
1183                            target.push((row, Diff::ONE));
1184                        }
1185                    },
1186                )
1187                .leave_region()
1188        });
1189        (
1190            arranged_output,
1191            err_output.expect("expected to validate in one level of the hierarchy"),
1192        )
1193    }
1194
1195    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
1196    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
1197    /// with the negation of what's produced by the reduction.
1198    /// `validating` indicates whether we want this stage to perform error detection
1199    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
1200    /// stages can skip validation.
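    ///
    /// As an illustrative sketch (the values and multiplicities are hypothetical): for a
    /// bucket holding values `{3, 5, 7}` under a `MIN` aggregate, the inner reduction emits
    /// the minimum with a positive diff and every input value with its diff negated, so the
    /// concatenation leaves exactly one copy of the minimum:
    ///
    /// ```text
    /// input:             (key, 3) => +1    (key, 5) => +1    (key, 7) => +1
    /// negated reduction: (key, 3) => +1-1  (key, 5) => -1    (key, 7) => -1
    /// concatenation:     (key, 3) => +1
    /// ```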
1201    fn build_bucketed_stage<S>(
1202        &self,
1203        aggr_funcs: &Vec<AggregateFunc>,
1204        input: &Collection<S, (Row, Row), Diff>,
1205        validating: bool,
1206    ) -> (
1207        Collection<S, (Row, Row), Diff>,
1208        Option<Collection<S, DataflowError, Diff>>,
1209    )
1210    where
1211        S: Scope<Timestamp = G::Timestamp>,
1212    {
1213        let (input, negated_output, errs) = if validating {
1214            let (input, reduced) = self
1215                .build_bucketed_negated_output::<_, _, RowValBuilder<_,_,_>, RowValSpine<Result<Row, Row>, _, _>>(
1216                    input,
1217                    aggr_funcs.clone(),
1218                );
1219            let (oks, errs) = reduced
1220                .as_collection(|k, v| (k.into_owned(), v.clone()))
1221                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1222                "Checked Invalid Accumulations",
1223                |(hash_key, result)| match result {
1224                    Err(hash_key) => {
1225                        let mut hash_key_iter = hash_key.iter();
1226                        let _hash = hash_key_iter.next();
1227                        let key = SharedRow::pack(hash_key_iter);
1228                        let message = format!(
1229                            "Invalid data in source, saw non-positive accumulation \
1230                                         for key {key:?} in hierarchical mins-maxes aggregate"
1231                        );
1232                        Err(EvalError::Internal(message.into()).into())
1233                    }
1234                    Ok(values) => Ok((hash_key, values)),
1235                },
1236            );
1237            (input, oks, Some(errs))
1238        } else {
1239            let (input, reduced) = self
1240                .build_bucketed_negated_output::<_, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1241                    input,
1242                    aggr_funcs.clone(),
1243                );
1244            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1245            // that we need to apply the mod on both input and oks.
1246            let oks = reduced.as_collection(|k, v| (k.into_owned(), v.into_owned()));
1247            (input, oks, None)
1248        };
1249
1250        let input = input.as_collection(|k, v| (k.into_owned(), v.into_owned()));
1251        let oks = negated_output.concat(&input);
1252        (oks, errs)
1253    }
1254
1255    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1256    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1257    /// with all diffs in the reduction's output negated.
1258    fn build_bucketed_negated_output<S, V, Bu, Tr>(
1259        &self,
1260        input: &Collection<S, (Row, Row), Diff>,
1261        aggrs: Vec<AggregateFunc>,
1262    ) -> (
1263        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1264        Arranged<S, TraceAgent<Tr>>,
1265    )
1266    where
1267        S: Scope<Timestamp = G::Timestamp>,
1268        V: MaybeValidatingRow<Row, Row>,
1269        Tr: Trace
1270            + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff>
1271            + 'static,
1272        Tr::Batch: Batch,
1273        Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
1274        Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
1275        for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
1276        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1277    {
1278        let error_logger = self.error_logger();
1279        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1280        // view mz_introspection.mz_expected_group_size_advice.
1281        let arranged_input = input
1282            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1283                "Arranged MinsMaxesHierarchical input",
1284            );
1285
1286        let reduced = arranged_input.mz_reduce_abelian::<_, _, _, Bu, Tr>(
1287            "Reduced Fallibly MinsMaxesHierarchical",
1288            move |key, source, target| {
1289                if let Some(err) = V::into_error() {
1290                    // Should negative accumulations reach us, we should loudly complain.
1291                    for (value, count) in source.iter() {
1292                        if count.is_positive() {
1293                            continue;
1294                        }
1295                        error_logger.log(
1296                            "Non-positive accumulation in MinsMaxesHierarchical",
1297                            &format!("key={key:?}, value={value:?}, count={count}"),
1298                        );
1299                        // After complaining, output an error here so that we can eventually
1300                        // report it in an error stream.
1301                        target.push((err(key.into_owned()), Diff::ONE));
1302                        return;
1303                    }
1304                }
1305
1306                let binding = SharedRow::get();
1307                let mut row_builder = binding.borrow_mut();
1308                let mut row_packer = row_builder.packer();
1309
1310                let mut source_iters = source
1311                    .iter()
1312                    .map(|(values, _cnt)| *values)
1313                    .collect::<Vec<_>>();
1314                for func in aggrs.iter() {
1315                    let column_iter =
1316                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1317                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1318                }
1319                // We only want to arrange the parts of the input that are not part of the output.
1320                // More specifically, we arrange the negation of the redundant input plus a single
1321                // copy of the result, so that `input.concat(&output)` gives us the intended value
1322                // of this aggregate function. Also we assume that regardless of the multiplicity
1323                // of the final result in the input, we only want to have one copy in the output.
1324                target.reserve(source.len().saturating_add(1));
1325                target.push((V::ok(row_builder.clone()), Diff::ONE));
1326                target.extend(source.iter().map(|(values, cnt)| {
1327                    let mut cnt = *cnt;
1328                    cnt.negate();
1329                    (V::ok((*values).into_owned()), cnt)
1330                }));
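                // For example (illustrative): with input {3 => +2, 5 => +1} under MIN, we
                // emit the minimum (3 => +1) plus the negated input (3 => -2, 5 => -1), so
                // the subsequent `concat` with the input leaves a single copy: {3 => +1}.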
1331            },
1332        );
1333        (arranged_input, reduced)
1334    }
1335
1336    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1337    /// on monotonic inputs.
1338    fn build_monotonic<S>(
1339        &self,
1340        collection: Collection<S, (Row, Row), Diff>,
1341        MonotonicPlan {
1342            aggr_funcs,
1343            skips,
1344            must_consolidate,
1345        }: MonotonicPlan,
1346        mfp_after: Option<SafeMfpPlan>,
1347    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1348    where
1349        S: Scope<Timestamp = G::Timestamp>,
1350    {
1351        // Gather the relevant values into a vec of rows ordered by aggregation_index
1352        let collection = collection
1353            .map(move |(key, row)| {
1354                let binding = SharedRow::get();
1355                let mut row_builder = binding.borrow_mut();
1356                let mut values = Vec::with_capacity(skips.len());
1357                let mut row_iter = row.iter();
1358                for skip in skips.iter() {
1359                    values.push(
1360                        row_builder.pack_using(std::iter::once(row_iter.nth(*skip).unwrap())),
1361                    );
1362                }
1363
1364                (key, values)
1365            })
1366            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1367                must_consolidate,
1368                "Consolidated ReduceMonotonic input",
1369            );
1370
1371        // It should now be possible to ensure that we have a monotonic collection.
1372        let error_logger = self.error_logger();
1373        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1374            error_logger.log(
1375                "Non-monotonic input to ReduceMonotonic",
1376                &format!("data={data:?}, diff={diff}"),
1377            );
1378            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1379            (EvalError::Internal(m).into(), Diff::ONE)
1380        });
1381        // We can place our rows directly into the diff field and keep only
1382        // the monoid relevant to evaluating each aggregate, instead of
1383        // having to do a hierarchical reduction.
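        // For example (illustrative): a MIN aggregate over the values 5 and 3 yields the
        // diffs `Min(Row[5])` and `Min(Row[3])`, which consolidate to `Min(Row[3])`.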
1384        let partial = partial.explode_one(move |(key, values)| {
1385            let mut output = Vec::new();
1386            for (row, func) in values.into_iter().zip(aggr_funcs.iter()) {
1387                output.push(monoids::get_monoid(row, func).expect(
1388                    "hierarchical aggregations are expected to have monoid implementations",
1389                ));
1390            }
1391            (key, output)
1392        });
1393
1394        // Allocations for the two closures.
1395        let mut datums1 = DatumVec::new();
1396        let mut datums2 = DatumVec::new();
1397        let mfp_after1 = mfp_after.clone();
1398        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1399
1400        let partial: KeyCollection<_, _, _> = partial.into();
1401        let arranged = partial
1402            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1403                "ArrangeMonotonic [val: empty]",
1404            );
1405        let output = arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1406            "ReduceMonotonic",
1407            {
1408                move |key, input, output| {
1409                    let temp_storage = RowArena::new();
1410                    let datum_iter = key.to_datum_iter();
1411                    let mut datums_local = datums1.borrow();
1412                    datums_local.extend(datum_iter);
1413                    let key_len = datums_local.len();
1414                    let accum = &input[0].1;
1415                    for monoid in accum.iter() {
1416                        datums_local.extend(monoid.finalize().iter());
1417                    }
1418
1419                    if let Some(row) =
1420                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1421                    {
1422                        output.push((row, Diff::ONE));
1423                    }
1424                }
1425            },
1426        );
1427
1428        // If `mfp_after` can error, then we need to render a paired reduction
1429        // to scan for these potential errors. Note that we cannot directly use
1430        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1431        // conditionally render the second component of the reduction pair.
1432        if let Some(mfp) = mfp_after2 {
1433            let mfp_errs = arranged
1434                .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1435                    "ReduceMonotonic Error Check",
1436                    move |key, input, output| {
1437                        let temp_storage = RowArena::new();
1438                        let datum_iter = key.to_datum_iter();
1439                        let mut datums_local = datums2.borrow();
1440                        datums_local.extend(datum_iter);
1441                        let accum = &input[0].1;
1442                        for monoid in accum.iter() {
1443                            datums_local.extend(monoid.finalize().iter());
1444                        }
1445                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1447                            output.push((e.into(), Diff::ONE));
1448                        }
1449                    },
1450                )
1451                .as_collection(|_k, v| v.clone());
1452            (output, validation_errs.concat(&mfp_errs))
1453        } else {
1454            (output, validation_errs)
1455        }
1456    }
1457
1458    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1459    ///
1460    /// The incoming values are moved to the update's "difference" field, at which point
1461    /// they can be accumulated in place. The `count` operator promotes the accumulated
1462    /// values to data, at which point a final map applies operator-specific logic to
1463    /// yield the final aggregate.
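    ///
    /// As an illustrative sketch (hypothetical values), a single `SUM` over the inputs
    /// `1`, `2`, and `3` for one key becomes three updates whose accumulator diffs
    /// consolidate additively before finalization:
    ///
    /// ```text
    /// ((key, ()), (accum: 1, records: 1))
    /// ((key, ()), (accum: 2, records: 1))
    /// ((key, ()), (accum: 3, records: 1))
    /// -- consolidates to:
    /// ((key, ()), (accum: 6, records: 3))
    /// ```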
1464    fn build_accumulable<S>(
1465        &self,
1466        collection: Collection<S, (Row, Row), Diff>,
1467        AccumulablePlan {
1468            full_aggrs,
1469            simple_aggrs,
1470            distinct_aggrs,
1471        }: AccumulablePlan,
1472        key_arity: usize,
1473        mfp_after: Option<SafeMfpPlan>,
1474    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1475    where
1476        S: Scope<Timestamp = G::Timestamp>,
1477    {
1478        // We must have called this function with something to reduce.
1479        if full_aggrs.is_empty() || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1480            self.error_logger().soft_panic_or_log(
1481                "Incorrect numbers of aggregates in accumulable reduction rendering",
1482                &format!(
1483                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1484                    full_aggrs.len(),
1485                    simple_aggrs.len(),
1486                    distinct_aggrs.len(),
1487                ),
1488            );
1489        }
1490
1491        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1492        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
1493        // Other aggregations can be directly moved in to the `diff` field.
1494        //
1495        // In each case, the resulting collection should have `data` shaped as `(key, ())`
1496        // and a `diff` that is a pair of a vector with one `Accum` per aggregation and an
1497        // overall record count. Each `Accum` tracks an accumulation and a few aggregation-
1498        // specific counts; the size could be reduced if we want to specialize for the aggregations.
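        //
        // For example (illustrative), a single update for a `SUM(col)` with value 5 might
        // carry the diff `([Accum::SimpleNumber { accum: 5, non_nulls: 1 }], Diff::ONE)`.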
1499
1500        // Instantiate a default vector for diffs with the correct types at each
1501        // position.
1502        let zero_diffs: (Vec<_>, Diff) = (
1503            full_aggrs
1504                .iter()
1505                .map(|f| accumulable_zero(&f.func))
1506                .collect(),
1507            Diff::ZERO,
1508        );
1509
1510        let mut to_aggregate = Vec::new();
1511        if simple_aggrs.len() > 0 {
1512            // First, collect all non-distinct aggregations in one pass.
1513            let easy_cases = collection.explode_one({
1514                let zero_diffs = zero_diffs.clone();
1515                move |(key, row)| {
1516                    let mut diffs = zero_diffs.clone();
1517                    // Try to unpack only the datums we need. Unfortunately, since we
1518                    // can't randomly access a Row, we have to iterate through it one by one.
1519                    // TODO: Even though we don't have random access, we could still avoid unpacking
1520                    // everything that we don't care about, and it might be worth it to extend the
1521                    // Row API to do that.
1522                    let mut row_iter = row.iter().enumerate();
1523                    for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
1524                        let mut datum = row_iter.next().unwrap();
1525                        while datum_index != &datum.0 {
1526                            datum = row_iter.next().unwrap();
1527                        }
1528                        let datum = datum.1;
1529                        diffs.0[*accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1530                        diffs.1 = Diff::ONE;
1531                    }
1532                    ((key, ()), diffs)
1533                }
1534            });
1535            to_aggregate.push(easy_cases);
1536        }
1537
1538        // Next, collect all aggregations that require distinctness.
1539        for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
1540            let pairer = Pairer::new(key_arity);
1541            let collection = collection
1542                .map(move |(key, row)| {
1543                    let value = row.iter().nth(datum_index).unwrap();
1544                    (pairer.merge(&key, std::iter::once(value)), ())
1545                })
1546                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1547                    "Arranged Accumulable Distinct [val: empty]",
1548                )
1549                .mz_reduce_abelian::<_, _, _, RowBuilder<_, _>, RowSpine<_, _>>(
1550                    "Reduced Accumulable Distinct [val: empty]",
1551                    move |_k, _s, t| t.push(((), Diff::ONE)),
1552                )
1553                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1554                .explode_one({
1555                    let zero_diffs = zero_diffs.clone();
1556                    move |(key, row)| {
1557                        let datum = row.iter().next().unwrap();
1558                        let mut diffs = zero_diffs.clone();
1559                        diffs.0[accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1560                        diffs.1 = Diff::ONE;
1561                        ((key, ()), diffs)
1562                    }
1563                });
1564            to_aggregate.push(collection);
1565        }
1566
1567        // now concatenate, if necessary, multiple aggregations
1568        let collection = if to_aggregate.len() == 1 {
1569            to_aggregate.remove(0)
1570        } else {
1571            differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
1572        };
1573
1574        // Allocations for the two closures.
1575        let mut datums1 = DatumVec::new();
1576        let mut datums2 = DatumVec::new();
1577        let mfp_after1 = mfp_after.clone();
1578        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1579        let full_aggrs2 = full_aggrs.clone();
1580
1581        let error_logger = self.error_logger();
1582        let err_full_aggrs = full_aggrs.clone();
1583        let (arranged_output, arranged_errs) = collection
1584            .mz_arrange::<RowBatcher<_,_>, RowBuilder<_,_>, RowSpine<_, (Vec<Accum>, Diff)>>("ArrangeAccumulable [val: empty]")
1585            .reduce_pair::<_, _, _, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
1586                "ReduceAccumulable",
1587                "AccumulableErrorCheck",
1588                {
1589                    move |key, input, output| {
1590                        let (ref accums, total) = input[0].1;
1591
1592                        let temp_storage = RowArena::new();
1593                        let datum_iter = key.to_datum_iter();
1594                        let mut datums_local = datums1.borrow();
1595                        datums_local.extend(datum_iter);
1596                        let key_len = datums_local.len();
1597                        for (aggr, accum) in full_aggrs.iter().zip(accums) {
1598                            datums_local.push(finalize_accum(&aggr.func, accum, total));
1599                        }
1600
1601                        if let Some(row) = evaluate_mfp_after(
1602                            &mfp_after1,
1603                            &mut datums_local,
1604                            &temp_storage,
1605                            key_len,
1606                        ) {
1607                            output.push((row, Diff::ONE));
1608                        }
1609                    }
1610                },
1611                move |key, input, output| {
1612                    let (ref accums, total) = input[0].1;
1613                    for (aggr, accum) in err_full_aggrs.iter().zip(accums) {
1614                        // We first check whether inputs with net-zero records but non-zero
1615                        // accumulation are present; if so, we log an error and emit one to the query output.
1616                        if total == Diff::ZERO && !accum.is_zero() {
1617                            error_logger.log(
1618                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1619                                &format!("aggr={aggr:?}, accum={accum:?}"),
1620                            );
1621                            let key = key.into_owned();
1622                            let message = format!(
1623                                "Invalid data in source, saw net-zero records for key {key} \
1624                                 with non-zero accumulation in accumulable aggregate"
1625                            );
1626                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1627                        }
1628                        match (&aggr.func, &accum) {
1629                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1630                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1631                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1632                                if accum.is_negative() {
1633                                    error_logger.log(
1634                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1635                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1636                                );
1637                                    let key = key.into_owned();
1638                                    let message = format!(
1639                                        "Invalid data in source, saw negative accumulation with \
1640                                         unsigned type for key {key}"
1641                                    );
1642                                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1643                                }
1644                            }
1645                            _ => (), // no more errors to check for at this point!
1646                        }
1647                    }
1648
1649                    // If `mfp_after` can error, then evaluate it here.
1650                    let Some(mfp) = &mfp_after2 else { return };
1651                    let temp_storage = RowArena::new();
1652                    let datum_iter = key.to_datum_iter();
1653                    let mut datums_local = datums2.borrow();
1654                    datums_local.extend(datum_iter);
1655                    for (aggr, accum) in full_aggrs2.iter().zip(accums) {
1656                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1657                    }
1658
1659                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1660                        output.push((e.into(), Diff::ONE));
1661                    }
1662                },
1663            );
1664        (
1665            arranged_output,
1666            arranged_errs.as_collection(|_key, error| error.into_owned()),
1667        )
1668    }
1669}
1670
1671/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1672/// containing key and aggregate values, then returns a result `Row` or `None`
1673/// if the MFP filters the result out.
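///
/// The expected layout of `datums_local` is `[key columns..., aggregate values...]`,
/// with `key_len` marking the boundary; only the datums after the key are packed into
/// the output row.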
1674fn evaluate_mfp_after<'a, 'b>(
1675    mfp_after: &'a Option<SafeMfpPlan>,
1676    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1677    temp_storage: &'a RowArena,
1678    key_len: usize,
1679) -> Option<Row> {
1680    let binding = SharedRow::get();
1681    let mut row_builder = binding.borrow_mut();
1682    // Apply MFP if it exists and pack a Row of
1683    // aggregate values from `datums_local`.
1684    if let Some(mfp) = mfp_after {
1685        // We must ignore errors here; if the MFP can error,
1686        // they are scanned for elsewhere.
1687        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1688            // The `mfp_after` must preserve the key columns,
1689            // so we can skip them to form aggregation results.
1690            Some(row_builder.pack_using(iter.skip(key_len)))
1691        } else {
1692            None
1693        }
1694    } else {
1695        Some(row_builder.pack_using(&datums_local[key_len..]))
1696    }
1697}
1698
1699fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1700    match aggr_func {
1701        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1702            trues: Diff::ZERO,
1703            falses: Diff::ZERO,
1704        },
1705        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1706            accum: AccumCount::ZERO,
1707            pos_infs: Diff::ZERO,
1708            neg_infs: Diff::ZERO,
1709            nans: Diff::ZERO,
1710            non_nulls: Diff::ZERO,
1711        },
1712        AggregateFunc::SumNumeric => Accum::Numeric {
1713            accum: OrderedDecimal(NumericAgg::zero()),
1714            pos_infs: Diff::ZERO,
1715            neg_infs: Diff::ZERO,
1716            nans: Diff::ZERO,
1717            non_nulls: Diff::ZERO,
1718        },
1719        _ => Accum::SimpleNumber {
1720            accum: AccumCount::ZERO,
1721            non_nulls: Diff::ZERO,
1722        },
1723    }
1724}
1725
1726static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1727
1728fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1729    match aggregate_func {
1730        AggregateFunc::Count => Accum::SimpleNumber {
1731            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
1732            non_nulls: if datum.is_null() {
1733                Diff::ZERO
1734            } else {
1735                Diff::ONE
1736            },
1737        },
1738        AggregateFunc::Any | AggregateFunc::All => match datum {
1739            Datum::True => Accum::Bool {
1740                trues: Diff::ONE,
1741                falses: Diff::ZERO,
1742            },
1743            Datum::Null => Accum::Bool {
1744                trues: Diff::ZERO,
1745                falses: Diff::ZERO,
1746            },
1747            Datum::False => Accum::Bool {
1748                trues: Diff::ZERO,
1749                falses: Diff::ONE,
1750            },
1751            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1752        },
1753        AggregateFunc::Dummy => match datum {
1754            Datum::Dummy => Accum::SimpleNumber {
1755                accum: AccumCount::ZERO,
1756                non_nulls: Diff::ZERO,
1757            },
1758            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1759        },
1760        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1761            let n = match datum {
1762                Datum::Float32(n) => f64::from(*n),
1763                Datum::Float64(n) => *n,
1764                Datum::Null => 0f64,
1765                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1766            };
1767
1768            let nans = Diff::from(n.is_nan());
1769            let pos_infs = Diff::from(n == f64::INFINITY);
1770            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1771            let non_nulls = Diff::from(datum != Datum::Null);
1772
1773            // Map the floating point value onto a fixed precision domain
1774            // All special values should map to zero, since they are tracked separately
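            // For example (illustrative): n = 1.5 accumulates as
            // (1.5 * 2^24) as i128 = 25_165_824, and `finalize_accum` divides by the same
            // scale to recover 1.5.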
1775            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1776                AccumCount::ZERO
1777            } else {
1778                // This operation will truncate to i128::MAX if out of range.
1779                // TODO(benesch): rewrite to avoid `as`.
1780                #[allow(clippy::as_conversions)]
1781                { (n * *FLOAT_SCALE) as i128 }.into()
1782            };
1783
1784            Accum::Float {
1785                accum,
1786                pos_infs,
1787                neg_infs,
1788                nans,
1789                non_nulls,
1790            }
1791        }
1792        AggregateFunc::SumNumeric => match datum {
1793            Datum::Numeric(n) => {
1794                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1795                    if n.0.is_negative() {
1796                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1797                    } else {
1798                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1799                    }
1800                } else if n.0.is_nan() {
1801                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1802                } else {
1803                    // Take a narrow decimal (datum) into a wide decimal
1804                    // (aggregator).
1805                    let mut cx_agg = numeric::cx_agg();
1806                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1807                };
1808
1809                Accum::Numeric {
1810                    accum: OrderedDecimal(accum),
1811                    pos_infs,
1812                    neg_infs,
1813                    nans,
1814                    non_nulls: Diff::ONE,
1815                }
1816            }
1817            Datum::Null => Accum::Numeric {
1818                accum: OrderedDecimal(NumericAgg::zero()),
1819                pos_infs: Diff::ZERO,
1820                neg_infs: Diff::ZERO,
1821                nans: Diff::ZERO,
1822                non_nulls: Diff::ZERO,
1823            },
1824            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1825        },
1826        _ => {
1827            // Other accumulations need to disentangle the accumulable
1828            // value from its NULL-ness, which is not quite as easily
1829            // accumulated.
1830            match datum {
1831                Datum::Int16(i) => Accum::SimpleNumber {
1832                    accum: i.into(),
1833                    non_nulls: Diff::ONE,
1834                },
1835                Datum::Int32(i) => Accum::SimpleNumber {
1836                    accum: i.into(),
1837                    non_nulls: Diff::ONE,
1838                },
1839                Datum::Int64(i) => Accum::SimpleNumber {
1840                    accum: i.into(),
1841                    non_nulls: Diff::ONE,
1842                },
1843                Datum::UInt16(u) => Accum::SimpleNumber {
1844                    accum: u.into(),
1845                    non_nulls: Diff::ONE,
1846                },
1847                Datum::UInt32(u) => Accum::SimpleNumber {
1848                    accum: u.into(),
1849                    non_nulls: Diff::ONE,
1850                },
1851                Datum::UInt64(u) => Accum::SimpleNumber {
1852                    accum: u.into(),
1853                    non_nulls: Diff::ONE,
1854                },
1855                Datum::MzTimestamp(t) => Accum::SimpleNumber {
1856                    accum: u64::from(t).into(),
1857                    non_nulls: Diff::ONE,
1858                },
1859                Datum::Null => Accum::SimpleNumber {
1860                    accum: AccumCount::ZERO,
1861                    non_nulls: Diff::ZERO,
1862                },
1863                x => panic!("Accumulating non-integer data: {x:?}"),
1864            }
1865        }
1866    }
1867}
1868
1869fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1870    // The finished value depends on the aggregation function in a variety of ways.
1871    // For all aggregates but count, if only null values were
1872    // accumulated, then the output is null.
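    // For example (illustrative): summing three NULLs gives total = 3 with a zero
    // accumulation, so SUM yields NULL while COUNT yields 0.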
1873    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1874        Datum::Null
1875    } else {
1876        match (&aggr_func, &accum) {
1877            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1878                Datum::Int64(non_nulls.into_inner())
1879            }
1880            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1881                // If any false, else if all true, else must be no false and some nulls.
1882                if falses.is_positive() {
1883                    Datum::False
1884                } else if *trues == total {
1885                    Datum::True
1886                } else {
1887                    Datum::Null
1888                }
1889            }
1890            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1891                // If any true, else if all false, else must be no true and some nulls.
1892                if trues.is_positive() {
1893                    Datum::True
1894                } else if *falses == total {
1895                    Datum::False
1896                } else {
1897                    Datum::Null
1898                }
1899            }
1900            (AggregateFunc::Dummy, _) => Datum::Dummy,
1901            // If any non-nulls, just report the aggregate.
1902            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1903            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
1904                // This conversion is safe, as long as we have fewer than 2^32
1905                // summands.
1906                // TODO(benesch): are we guaranteed to have fewer than 2^32 summands?
1907                // If so, rewrite to avoid `as`.
1908                #[allow(clippy::as_conversions)]
1909                Datum::Int64(accum.into_inner() as i64)
1910            }
1911            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1912            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1913            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1914                if !accum.is_negative() {
1915                    // Our semantics of overflow are not clearly articulated wrt.
1916                    // unsigned vs. signed types (database-issues#5172). We adopt an
1917                    // unsigned wrapping behavior to match what we do above for
1918                    // signed types.
1919                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
1920                    #[allow(clippy::as_conversions)]
1921                    Datum::UInt64(accum.into_inner() as u64)
1922                } else {
1923                    // Note that we return a value here, but an error in the other
1924                    // operator of the reduce_pair. Therefore, we expect that this
1925                    // value will never be exposed as an output.
1926                    Datum::Null
1927                }
1928            }
1929            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1930                if !accum.is_negative() {
1931                    Datum::from(*accum)
1932                } else {
1933                    // Note that we return a value here, but an error in the other
1934                    // operator of the reduce_pair. Therefore, we expect that this
1935                    // value will never be exposed as an output.
1936                    Datum::Null
1937                }
1938            }
1939            (
1940                AggregateFunc::SumFloat32,
1941                Accum::Float {
1942                    accum,
1943                    pos_infs,
1944                    neg_infs,
1945                    nans,
1946                    non_nulls: _,
1947                },
1948            ) => {
1949                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1950                    // NaNs are NaNs and cases where we've seen a
1951                    // mixture of positive and negative infinities.
1952                    Datum::from(f32::NAN)
1953                } else if pos_infs.is_positive() {
1954                    Datum::from(f32::INFINITY)
1955                } else if neg_infs.is_positive() {
1956                    Datum::from(f32::NEG_INFINITY)
1957                } else {
1958                    // TODO(benesch): remove potentially dangerous usage of `as`.
1959                    #[allow(clippy::as_conversions)]
1960                    {
1961                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
1962                    }
1963                }
1964            }
1965            (
1966                AggregateFunc::SumFloat64,
1967                Accum::Float {
1968                    accum,
1969                    pos_infs,
1970                    neg_infs,
1971                    nans,
1972                    non_nulls: _,
1973                },
1974            ) => {
1975                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1976                    // NaNs are NaNs and cases where we've seen a
1977                    // mixture of positive and negative infinities.
1978                    Datum::from(f64::NAN)
1979                } else if pos_infs.is_positive() {
1980                    Datum::from(f64::INFINITY)
1981                } else if neg_infs.is_positive() {
1982                    Datum::from(f64::NEG_INFINITY)
1983                } else {
1984                    // TODO(benesch): remove potentially dangerous usage of `as`.
1985                    #[allow(clippy::as_conversions)]
1986                    {
1987                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
1988                    }
1989                }
1990            }
1991            (
1992                AggregateFunc::SumNumeric,
1993                Accum::Numeric {
1994                    accum,
1995                    pos_infs,
1996                    neg_infs,
1997                    nans,
1998                    non_nulls: _,
1999                },
2000            ) => {
2001                let mut cx_datum = numeric::cx_datum();
2002                let d = cx_datum.to_width(accum.0);
2003                // Take a wide decimal (aggregator) into a
2004                // narrow decimal (datum). If this operation
2005                // overflows the datum, this new value will be
2006                // +/- infinity. However, the aggregator tracks
2007                // the amount of overflow, making it invertible.
2008                let inf_d = d.is_infinite();
2009                let neg_d = d.is_negative();
2010                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
2011                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
2012                if nans.is_positive() || (pos_inf && neg_inf) {
2013                    // NaNs are NaNs and cases where we've seen a
2014                    // mixture of positive and negative infinities.
2015                    Datum::from(Numeric::nan())
2016                } else if pos_inf {
2017                    Datum::from(Numeric::infinity())
2018                } else if neg_inf {
2019                    let mut cx = numeric::cx_datum();
2020                    let mut d = Numeric::infinity();
2021                    cx.neg(&mut d);
2022                    Datum::from(d)
2023                } else {
2024                    Datum::from(d)
2025                }
2026            }
2027            _ => panic!("Unexpected accumulation (aggr={aggr_func:?}, accum={accum:?})"),
2031        }
2032    }
2033}
2034
2035/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
2036type AccumCount = mz_ore::Overflowing<i128>;
2037
2038/// Accumulates values for the various types of accumulable aggregations.
2039///
2040/// We assume that there are not more than 2^32 elements for the aggregation.
2041/// Thus we can sum 64-bit values in the 128-bit [`AccumCount`] accumulator
2042/// and not worry about exceeding its bounds.
2043///
2044/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
2045/// point representation has less precision than a double. It is entirely possible
2046/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
2047/// to preserve group guarantees.
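///
/// As an illustrative sketch, two simple numeric accumulations combine additively via
/// the `Semigroup` implementation below:
///
/// ```ignore
/// use differential_dataflow::difference::Semigroup;
///
/// let mut a = Accum::SimpleNumber { accum: AccumCount::from(1i64), non_nulls: Diff::ONE };
/// let b = Accum::SimpleNumber { accum: AccumCount::from(2i64), non_nulls: Diff::ONE };
/// a.plus_equals(&b);
/// // `a` is now `SimpleNumber { accum: 3, non_nulls: 2 }`.
/// ```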
2048#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
2049enum Accum {
2050    /// Accumulates boolean values.
2051    Bool {
2052        /// The number of `true` values observed.
2053        trues: Diff,
2054        /// The number of `false` values observed.
2055        falses: Diff,
2056    },
2057    /// Accumulates simple numeric values.
2058    SimpleNumber {
2059        /// The accumulation of all non-NULL values observed.
2060        accum: AccumCount,
2061        /// The number of non-NULL values observed.
2062        non_nulls: Diff,
2063    },
2064    /// Accumulates float values.
2065    Float {
2066        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
2067        /// preserve associativity and commutativity
2068        accum: AccumCount,
2069        /// Counts +inf
2070        pos_infs: Diff,
2071        /// Counts -inf
2072        neg_infs: Diff,
2073        /// Counts NaNs
2074        nans: Diff,
2075        /// Counts non-NULL values
2076        non_nulls: Diff,
2077    },
2078    /// Accumulates arbitrary precision decimals.
2079    Numeric {
2080        /// Accumulates non-special values
2081        accum: OrderedDecimal<NumericAgg>,
2082        /// Counts +inf
2083        pos_infs: Diff,
2084        /// Counts -inf
2085        neg_infs: Diff,
2086        /// Counts NaNs
2087        nans: Diff,
2088        /// Counts non-NULL values
2089        non_nulls: Diff,
2090    },
2091}
2092
2093impl IsZero for Accum {
2094    fn is_zero(&self) -> bool {
2095        match self {
2096            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2097            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2098            Accum::Float {
2099                accum,
2100                pos_infs,
2101                neg_infs,
2102                nans,
2103                non_nulls,
2104            } => {
2105                accum.is_zero()
2106                    && pos_infs.is_zero()
2107                    && neg_infs.is_zero()
2108                    && nans.is_zero()
2109                    && non_nulls.is_zero()
2110            }
2111            Accum::Numeric {
2112                accum,
2113                pos_infs,
2114                neg_infs,
2115                nans,
2116                non_nulls,
2117            } => {
2118                accum.0.is_zero()
2119                    && pos_infs.is_zero()
2120                    && neg_infs.is_zero()
2121                    && nans.is_zero()
2122                    && non_nulls.is_zero()
2123            }
2124        }
2125    }
2126}
2127
2128impl Semigroup for Accum {
2129    fn plus_equals(&mut self, other: &Accum) {
2130        match (&mut *self, other) {
2131            (
2132                Accum::Bool { trues, falses },
2133                Accum::Bool {
2134                    trues: other_trues,
2135                    falses: other_falses,
2136                },
2137            ) => {
2138                *trues += other_trues;
2139                *falses += other_falses;
2140            }
2141            (
2142                Accum::SimpleNumber { accum, non_nulls },
2143                Accum::SimpleNumber {
2144                    accum: other_accum,
2145                    non_nulls: other_non_nulls,
2146                },
2147            ) => {
2148                *accum += other_accum;
2149                *non_nulls += other_non_nulls;
2150            }
2151            (
2152                Accum::Float {
2153                    accum,
2154                    pos_infs,
2155                    neg_infs,
2156                    nans,
2157                    non_nulls,
2158                },
2159                Accum::Float {
2160                    accum: other_accum,
2161                    pos_infs: other_pos_infs,
2162                    neg_infs: other_neg_infs,
2163                    nans: other_nans,
2164                    non_nulls: other_non_nulls,
2165                },
2166            ) => {
2167                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2168                    warn!("Float accumulator overflow. Incorrect results possible");
2169                    accum.wrapping_add(*other_accum)
2170                });
2171                *pos_infs += other_pos_infs;
2172                *neg_infs += other_neg_infs;
2173                *nans += other_nans;
2174                *non_nulls += other_non_nulls;
2175            }
2176            (
2177                Accum::Numeric {
2178                    accum,
2179                    pos_infs,
2180                    neg_infs,
2181                    nans,
2182                    non_nulls,
2183                },
2184                Accum::Numeric {
2185                    accum: other_accum,
2186                    pos_infs: other_pos_infs,
2187                    neg_infs: other_neg_infs,
2188                    nans: other_nans,
2189                    non_nulls: other_non_nulls,
2190                },
2191            ) => {
2192                let mut cx_agg = numeric::cx_agg();
2193                cx_agg.add(&mut accum.0, &other_accum.0);
2194                // `rounded` signals we have exceeded the aggregator's max
2195                // precision, which means we've lost commutativity and
2196                // associativity; nothing to be done here, so panic. For more
2197                // context, see the DEC_Rounded definition at
2198                // http://speleotrove.com/decimal/dncont.html
2199                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2200                // Reduce to reclaim unused decimal precision. Note that this
2201                // reduction must happen somewhere to make the following
2202                // invertible:
2203                // ```
2204                // CREATE TABLE a (a numeric);
2205                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
2206                // INSERT INTO a VALUES ('9e39'), ('9e-39');
2207                // ```
2208                // This will now return infinity. However, we can retract the
2209                // value that blew up its precision:
2210                // ```
2211                // INSERT INTO a VALUES ('-9e-39');
2212                // ```
2213                // This leaves `t`'s aggregator with a value of 9e39. However,
2214                // without doing a reduction, `libdecnum` will store the value
2215                // as 9e39+0e-39, which still exceeds the narrower context's
2216                // precision. By doing the reduction, we can "reclaim" the 39
2217                // digits of precision.
2218                cx_agg.reduce(&mut accum.0);
2219                *pos_infs += other_pos_infs;
2220                *neg_infs += other_neg_infs;
2221                *nans += other_nans;
2222                *non_nulls += other_non_nulls;
2223            }
2224            (l, r) => unreachable!(
2225                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2226            ),
2227        }
2228    }
2229}
2230
2231impl Multiply<Diff> for Accum {
2232    type Output = Accum;
2233
2234    fn multiply(self, factor: &Diff) -> Accum {
2235        let factor = *factor;
2236        match self {
2237            Accum::Bool { trues, falses } => Accum::Bool {
2238                trues: trues * factor,
2239                falses: falses * factor,
2240            },
2241            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2242                accum: accum * AccumCount::from(factor),
2243                non_nulls: non_nulls * factor,
2244            },
2245            Accum::Float {
2246                accum,
2247                pos_infs,
2248                neg_infs,
2249                nans,
2250                non_nulls,
2251            } => Accum::Float {
2252                accum: accum
2253                    .checked_mul(AccumCount::from(factor))
2254                    .unwrap_or_else(|| {
2255                        warn!("Float accumulator overflow. Incorrect results possible");
2256                        accum.wrapping_mul(AccumCount::from(factor))
2257                    }),
2258                pos_infs: pos_infs * factor,
2259                neg_infs: neg_infs * factor,
2260                nans: nans * factor,
2261                non_nulls: non_nulls * factor,
2262            },
2263            Accum::Numeric {
2264                accum,
2265                pos_infs,
2266                neg_infs,
2267                nans,
2268                non_nulls,
2269            } => {
2270                let mut cx = numeric::cx_agg();
2271                let mut f = NumericAgg::from(factor.into_inner());
2272                // Unlike in `plus_equals`, it is not necessary to reduce after this
2273                // operation because `f` will always be an integer, i.e., we are never
2274                // increasing the values' scale.
2275                cx.mul(&mut f, &accum.0);
2276                // `rounded` signals we have exceeded the aggregator's max
2277                // precision, which means we've lost commutativity and
2278                // associativity; nothing to be done here, so panic. For more
2279                // context, see the DEC_Rounded definition at
2280                // http://speleotrove.com/decimal/dncont.html
2281                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2282                Accum::Numeric {
2283                    accum: OrderedDecimal(f),
2284                    pos_infs: pos_infs * factor,
2285                    neg_infs: neg_infs * factor,
2286                    nans: nans * factor,
2287                    non_nulls: non_nulls * factor,
2288                }
2289            }
2290        }
2291    }
2292}
2293
2294impl Columnation for Accum {
2295    type InnerRegion = CopyRegion<Self>;
2296}
2297
2298/// Monoids for in-place compaction of monotonic streams.
2299mod monoids {
2300
2301    // We can improve the performance of some aggregations through the use of algebra.
2302    // In particular, we can move some of the aggregations in to the `diff` field of
2303    // updates, by changing `diff` from integers to a different algebraic structure.
2304    //
2305    // The one we use is called a "semigroup", and it means that the structure has an
2306    // associative addition operator. The trait we use also allows the semigroup elements
2307    // to present as "zero", meaning they always act as the identity under +. Here,
2308    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
2309    // known to DD by the `is_zero` method, see comment there. So, from the point of view
2310    // of DD, this Semigroup should _not_ have a zero.
2311    //
2312    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
2313    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
2314    // assumes this.
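    //
    // As a concrete example (illustrative): folding MIN over the values 5, 3, and NULL
    // proceeds as
    //   Min(Row[5]) + Min(Row[3])    = Min(Row[3])    (3 < 5)
    //   Min(Row[3]) + Min(Row[NULL]) = Min(Row[3])    (NULL is the identity)
    // so the monoid finalizes to the row containing 3.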
2315
2316    use differential_dataflow::containers::{Columnation, Region};
2317    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2318    use mz_expr::AggregateFunc;
2319    use mz_ore::soft_panic_or_log;
2320    use mz_repr::{Datum, Diff, Row};
2321    use serde::{Deserialize, Serialize};
2322
2323    /// A monoid containing a single-datum row.
2324    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
2325    pub enum ReductionMonoid {
2326        Min(Row),
2327        Max(Row),
2328    }
2329
2330    impl ReductionMonoid {
2331        pub fn finalize(&self) -> &Row {
2332            use ReductionMonoid::*;
2333            match self {
2334                Min(row) | Max(row) => row,
2335            }
2336        }
2337    }
2338
2339    impl Multiply<Diff> for ReductionMonoid {
2340        type Output = Self;
2341
2342        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent, and callers
            // must ascertain the input's monotonicity beforehand (typically
            // with `ensure_monotonic`), since the monoid has no zero value
            // for us to use here.
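            // For example, a `Max(row)` carried with multiplicity 3 still
            // aggregates to `max(row, row, row) = row`, so returning `self`
            // unchanged is correct for any positive factor.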
            assert!(factor.is_positive());
            self
        }
    }

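    // `plus_equals` keeps the extreme element, treating `Datum::Null` as the
    // identity in both directions: e.g., `Min([Null]) + Min([5])` yields
    // `Min([5])`, while `Min([3]) + Min([5])` yields `Min([3])`.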
    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a small element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a large element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do
            // this! DD uses true results of this method to make updates disappear. That makes
            // sense when diffs really are just diffs, but for `ReductionMonoid` the diffs hold
            // reduction results. We don't want funny stuff, like disappearing, happening to
            // reduction results even when they are null. (This would confuse, e.g.,
            // `ReduceCollation` for null inputs.)
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum
    /// variants in the same backing region. Alternatively, it could store the variants in
    /// two separate regions, but we choose a single region for simplicity.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
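    ///
    /// A minimal sketch of the contract (hypothetical values, not a real test):
    ///
    /// ```ignore
    /// let mut row = Row::default();
    /// row.packer().push(Datum::Int64(42));
    /// // Hierarchical functions like `MaxInt64` yield a monoid...
    /// assert!(get_monoid(row, &AggregateFunc::MaxInt64).is_some());
    /// // ...while accumulable functions like `Count` do not.
    /// assert!(get_monoid(Row::default(), &AggregateFunc::Count).is_none());
    /// ```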
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}

mod window_agg_helpers {
    use crate::render::reduce::*;

    /// TODO: For performance, it would be better to hoist the branching done in this enum's
    /// methods to the call site of `eval_fast_window_agg`. Then we wouldn't need an enum
    /// here, and could parameterize `eval_fast_window_agg` with one of the implementations
    /// directly.
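    ///
    /// A rough usage sketch (hypothetical datums; `mz_expr::OneByOneAggr` must be in scope):
    ///
    /// ```ignore
    /// let mut agg = OneByOneAggrImpls::new(&AggregateFunc::SumInt32, false);
    /// agg.give(&Datum::Int32(1));
    /// agg.give(&Datum::Int32(2));
    /// let arena = RowArena::new();
    /// // `sum` over int32 inputs produces an int64 datum.
    /// assert_eq!(agg.get_current_aggregate(&arena), Datum::Int64(3));
    /// ```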
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            // Note that the `reverse` flag passed to `new` is currently honored only by
            // Basic aggregations.
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

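    /// One-by-one evaluation of an accumulable aggregation: each datum is folded into an
    /// `Accum` alongside a running count, and the result is finalized on demand, mirroring
    /// the accumulable reduction path above.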
    pub struct AccumulableOneByOneAggr {
        aggr_func: AggregateFunc,
        accum: Accum,
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

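    /// One-by-one evaluation of a hierarchical aggregation: each datum is folded into a
    /// `ReductionMonoid` via `plus_equals`, starting from the `Datum::Null` identity.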
    pub struct HierarchicalOneByOneAggr {
        aggr_func: AggregateFunc,
        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}