mz_compute/render/reduce.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction dataflow construction.
//!
//! Consult [ReducePlan] documentation for details.
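//!
//! For orientation, an illustrative sketch (not normative; see [ReducePlan] for
//! the real dispatch rules): each aggregate is rendered by one of three
//! strategies, e.g.:
//!
//! ```text
//! SUM(a)   -> Accumulable   (in-place accumulation of sums/counts)
//! MIN(b)   -> Hierarchical  (tower of bucketed reductions)
//! others   -> Basic         (general reduction, one output value per group)
//! ```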

use std::collections::BTreeMap;
use std::sync::LazyLock;

use dec::OrderedDecimal;
use differential_dataflow::Data;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, CopyRegion};
use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
use differential_dataflow::hashable::Hashable;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
use differential_dataflow::trace::{Builder, Trace};
use differential_dataflow::{Collection, Diff as _};
use itertools::Itertools;
use mz_compute_types::plan::reduce::{
    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
};
use mz_expr::{
    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
};
use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use serde::{Deserialize, Serialize};
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use tracing::warn;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::{MzReduce, ReduceExt};
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
use crate::render::{ArrangementFlavor, Pairer};
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
};
use crate::typedefs::{
    ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
    RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
};

impl<G, T> Context<G, T>
where
    G: Scope,
    G::Timestamp: MzTimestamp + Refines<T>,
    T: MzTimestamp,
{
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
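            // Worked example (illustrative): if `demand` were [2, 5, 6], then
            // `skips` would be [2, 2, 0], so the `row_iter.nth(*skip)` calls
            // below yield exactly columns 2, 5, and 6: `nth(2)` skips columns
            // 0-1 and returns column 2, the next `nth(2)` skips 3-4 and
            // returns 5, and `nth(0)` returns column 6 immediately.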

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Unpack only the demanded columns.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluate the value expressions.
                    // The prior evaluation may have left additional columns we should delete.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux out the potential errors from key and value selector evaluation.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(&err_input);

            // Render the reduce plan.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }

    /// Render a dataflow based on the provided plan.
    ///
    /// The output will be an arrangement that looks the same as if
    /// we just had a single reduce operator computing everything together, and
    /// this arrangement can also be reused.
    fn render_reduce_plan<S>(
        &self,
        plan: ReducePlan,
        collection: Collection<S, (Row, Row), Diff>,
        err_input: Collection<S, DataflowError, Diff>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> CollectionBundle<S, T>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut errors = Default::default();
        let arrangement =
            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
        CollectionBundle::from_columns(
            0..key_arity,
            ArrangementFlavor::Local(
                arrangement,
                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
            ),
        )
    }

    fn render_reduce_plan_inner<S>(
        &self,
        plan: ReducePlan,
        collection: Collection<S, (Row, Row), Diff>,
        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
        // not only values (database-issues#6658).
        let arrangement = match plan {
            // If we have no aggregations or just a single type of reduction, we
            // can go ahead and render them directly.
            ReducePlan::Distinct => {
                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Accumulable(expr) => {
                let (arranged_output, errs) =
                    self.build_accumulable(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                arranged_output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                index,
                expr,
                fused_unnest_list,
            })) => {
                // Note that we skip validating for negative diffs when we have a fused unnest list,
                // because this is already a CPU-intensive situation due to the non-incrementalness
                // of window functions.
                let validating = !fused_unnest_list;
                let (output, errs) = self.build_basic_aggregate(
                    collection,
                    index,
                    &expr,
                    validating,
                    key_arity,
                    mfp_after,
                    fused_unnest_list,
                );
                if validating {
                    errors.push(errs.expect("validation should have occurred as it was requested"));
                }
                output
            }
            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
                let (output, errs) =
                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
                errors.push(errs);
                output
            }
            // Otherwise, we need to render something different for each type of
            // reduction, and then stitch them together.
            ReducePlan::Collation(expr) => {
                // First, we need to render our constituent aggregations.
                let mut to_collate = vec![];

                for plan in [
                    expr.hierarchical.map(ReducePlan::Hierarchical),
                    expr.accumulable.map(ReducePlan::Accumulable),
                    expr.basic.map(ReducePlan::Basic),
                ]
                .into_iter()
                .flat_map(std::convert::identity)
                {
                    let r#type = ReductionType::try_from(&plan)
                        .expect("only representable reduction types were used above");

                    let arrangement = self.render_reduce_plan_inner(
                        plan,
                        collection.clone(),
                        errors,
                        key_arity,
                        None,
                    );
                    to_collate.push((r#type, arrangement));
                }

                // Now we need to collate them together.
                let (oks, errs) = self.build_collation(
                    to_collate,
                    expr.aggregate_types,
                    &mut collection.scope(),
                    mfp_after,
                );
                errors.push(errs);
                oks
            }
        };
        arrangement
    }

    /// Build the dataflow to combine arrangements containing results of different
    /// aggregation types into a single arrangement.
    ///
    /// This computes the same thing as a join on the group key followed by shuffling
    /// the values into the correct order. This implementation assumes that all input
    /// arrangements present values in a way that respects the desired output order,
    /// so we can do a linear merge to form the output.
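    ///
    /// For intuition, an illustrative sketch: with
    /// `aggregate_types = [Accumulable, Basic, Accumulable]`, each key carries one
    /// densely packed row per reduction type, and output values are stitched
    /// together positionally:
    ///
    /// ```text
    /// key k: (Accumulable, [a0, a1]), (Basic, [b0])
    /// output for k: [a0, b0, a1]
    /// ```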
    fn build_collation<S>(
        &self,
        arrangements: Vec<(ReductionType, RowRowArrangement<S>)>,
        aggregate_types: Vec<ReductionType>,
        scope: &mut S,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // We must have more than one arrangement to collate.
        if arrangements.len() <= 1 {
            error_logger.soft_panic_or_log(
                "Incorrect number of arrangements in reduce collation",
                &format!("len={}", arrangements.len()),
            );
        }

        let mut to_concat = vec![];

        // First, let's collect all results into a single collection.
        for (reduction_type, arrangement) in arrangements.into_iter() {
            let collection = arrangement
                .as_collection(move |key, val| (key.to_row(), (reduction_type, val.to_row())));
            to_concat.push(collection);
        }

        // For each key above, we need to have exactly as many rows as there are distinct
        // reduction types required by `aggregate_types`. We thus prepare here a properly
        // deduplicated version of `aggregate_types` for validation during processing below.
        let mut distinct_aggregate_types = aggregate_types.clone();
        distinct_aggregate_types.sort_unstable();
        distinct_aggregate_types.dedup();
        let n_distinct_aggregate_types = distinct_aggregate_types.len();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let aggregate_types_err = aggregate_types.clone();
        let (oks, errs) = differential_dataflow::collection::concatenate(scope, to_concat)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>("Arrange ReduceCollation")
            .reduce_pair::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "ReduceCollation",
                "ReduceCollation Errors",
                {
                    move |key, input, output| {
                        // The inputs are pairs of a reduction type, and a row consisting of densely
                        // packed fused aggregate values.
                        //
                        // We need to reconstitute the final value by:
                        // 1. Extracting out the fused rows by type.
                        // 2. For each aggregate, figuring out what type it is, and grabbing the
                        //    relevant value from the corresponding fused row.
                        // 3. Stitching all the values together into one row.
                        let mut accumulable = DatumList::empty().iter();
                        let mut hierarchical = DatumList::empty().iter();
                        let mut basic = DatumList::empty().iter();

                        // Note that hierarchical and basic reductions guard against negative
                        // multiplicities, and if we only had accumulable aggregations, we would not
                        // have produced a collation plan, so we do not repeat the check here.
                        if input.len() != n_distinct_aggregate_types {
                            return;
                        }

                        for (item, _) in input.iter() {
                            let reduction_type = &item.0;
                            let row = &item.1;
                            match reduction_type {
                                ReductionType::Accumulable => accumulable = row.iter(),
                                ReductionType::Hierarchical => hierarchical = row.iter(),
                                ReductionType::Basic => basic = row.iter(),
                            }
                        }

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Merge results into the order they were asked for.
                        for typ in aggregate_types.iter() {
                            let datum = match typ {
                                ReductionType::Accumulable => accumulable.next(),
                                ReductionType::Hierarchical => hierarchical.next(),
                                ReductionType::Basic => basic.next(),
                            };
                            let Some(datum) = datum else { return };
                            datums_local.push(datum);
                        }

                        // If we did not have enough values to stitch together, then we do not
                        // generate an output row. Not outputting here corresponds to the semantics
                        // of an equi-join on the key, similarly to the proposal in PR materialize#17013.
                        //
                        // Note that we also do not want to have anything left over to stitch. If we
                        // do, then we also have an error, reported elsewhere, and would violate
                        // join semantics.
                        if (accumulable.next(), hierarchical.next(), basic.next())
                            == (None, None, None)
                        {
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                output.push((row, Diff::ONE));
                            }
                        }
                    }
                },
                move |key, input, output| {
                    if input.len() != n_distinct_aggregate_types {
                        // We expected to stitch together exactly as many aggregate types as requested
                        // by the collation. If we cannot, we log an error and produce no output for
                        // this key.
                        let message = "Mismatched aggregates for key in ReduceCollation";
                        error_logger.log(
                            message,
                            &format!(
                                "key={key:?}, n_aggregates_requested={requested}, \
                                 n_distinct_aggregate_types={n_distinct_aggregate_types}",
                                requested = input.len(),
                            ),
                        );
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }

                    let mut accumulable = DatumList::empty().iter();
                    let mut hierarchical = DatumList::empty().iter();
                    let mut basic = DatumList::empty().iter();
                    for (item, _) in input.iter() {
                        let reduction_type = &item.0;
                        let row = &item.1;
                        match reduction_type {
                            ReductionType::Accumulable => accumulable = row.iter(),
                            ReductionType::Hierarchical => hierarchical = row.iter(),
                            ReductionType::Basic => basic = row.iter(),
                        }
                    }

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    for typ in aggregate_types_err.iter() {
                        let datum = match typ {
                            ReductionType::Accumulable => accumulable.next(),
                            ReductionType::Hierarchical => hierarchical.next(),
                            ReductionType::Basic => basic.next(),
                        };
                        if let Some(datum) = datum {
                            datums_local.push(datum);
                        } else {
                            // We cannot properly reconstruct a row if aggregates are missing.
                            // This situation is not expected, so we log an error if it occurs.
                            let message = "Missing value for key in ReduceCollation";
                            error_logger.log(message, &format!("typ={typ:?}, key={key:?}"));
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                            return;
                        }
                    }

                    // Note that we also do not want to have anything left over to stitch.
                    // If we do, then we also have an error and would violate join semantics.
                    if (accumulable.next(), hierarchical.next(), basic.next()) != (None, None, None)
                    {
                        let message = "Rows too large for key in ReduceCollation";
                        error_logger.log(message, &format!("key={key:?}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    }

                    // Finally, if `mfp_after` can produce errors, then we should also report
                    // these here.
                    let Some(mfp) = &mfp_after2 else { return };
                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (oks, errs.as_collection(|_, v| v.clone()))
    }

    /// Build the dataflow to compute the set of distinct keys.
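    ///
    /// The output arrangement's values are empty rows: the distinct keys are
    /// carried entirely by the arrangement's key, and `mfp_after`, if present,
    /// effectively acts as a filter over those keys.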
    fn build_distinct<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        Collection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>("Arranged DistinctBy")
            .reduce_pair::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        // We're pushing a unit value here because the key is implicitly added by the
                        // arrangement, and the permutation logic takes care of using the key part of the
                        // output.
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }

    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
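    ///
    /// Schematically (an illustrative sketch):
    ///
    /// ```text
    /// input -> reduce aggr 0 \
    /// input -> reduce aggr 1  } concatenate -> ReduceFuseBasic -> (key, [v0, v1, ...])
    /// input -> reduce aggr 2 /
    /// ```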
    fn build_basic_aggregates<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        aggrs: Vec<(usize, AggregateExpr)>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true, we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs {
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceFuseBasic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }

    /// Build the dataflow to compute a single basic aggregation.
    ///
    /// This method also applies distinctness if required.
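    ///
    /// When the aggregate's `distinct` flag is set, the input is first reduced
    /// to distinct `(key, val)` pairs via `build_reduce_inaccumulable_distinct`,
    /// optionally validating that no negative multiplicities are observed.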
    fn build_basic_aggregate<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Extract the value we were asked to aggregate over.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
        if distinct {
            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<_, RowValBuilder<Result<(), String>, _, _>, RowValSpine<Result<(), String>, _, _>>(keyed, None)
                    .as_collection(|k, v| (k.to_row(), v.as_ref().map(|&()| ()).map_err(|m| m.as_str().into())))
                    .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>("Demux Errors", move |(key_val, result)| match result {
                        Ok(()) => Ok(pairer.split(&key_val)),
                        Err(m) => Err(EvalError::Internal(m).into()),
                    });
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // We respect the multiplicity here (unlike in hierarchical aggregation)
                    // because we don't know that the aggregation method is not sensitive
                    // to the number of records.
                    let iter = source.iter().flat_map(|(v, w)| {
                        // Note that in the non-positive case, this is wrong, but harmless because
                        // our other reduction will produce an error.
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        // Note that this is not necessarily a window aggregation, in which case
                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        target.push((row, Diff::ONE));
                    }
                }
            })
        } else {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // This part is the same as in the `!fused_unnest_list` branch above.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    // This is the part that is specific to the `fused_unnest_list` branch.
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
        };

        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
        // we then wouldn't be able to do this error check conditionally. See its documentation for the
        // rationale around using a second reduction here.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message = "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    target.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter` but we cannot as it needs to
                                // borrow `v` and only presents datums with that lifetime, not any longer.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
                assert!(!must_validate);
                // We couldn't have entered this branch due to `must_validate`, so it must be
                // because `mfp_after2.is_some()`.
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                // This would ideally use `to_datum_iter` but we cannot as it needs to
                                // borrow `v` and only presents datums with that lifetime, not any longer.
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                // We know that `mfp` can error (because of the `could_error` call
                                // above), so try to evaluate it here.
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(&errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }

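    /// Build the dataflow fragment that reduces a keyed collection to its
    /// distinct elements, as used for `DISTINCT` aggregates.
    ///
    /// The trace's owned value type `Tr::ValOwn` doubles as an error channel:
    /// when it is error-capable (see [`MaybeValidatingRow`]), non-positive
    /// multiplicities are reported as errors; otherwise, output values are unit.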
    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
        &self,
        input: Collection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
                Key<'a> = DatumSeq<'a>,
                KeyOwn = Row,
                Time = G::Timestamp,
                Diff = Diff,
                ValOwn: Data + MaybeValidatingRow<(), String>,
            > + 'static,
        Bu: Builder<
                Time = G::Timestamp,
                Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
                Output = Tr::Batch,
            >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }

    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on non-monotonic inputs.
    ///
    /// This function renders a single reduction tree that computes aggregations with
    /// a priority queue implemented with a series of reduce operators that partition
    /// the input into buckets, and compute the aggregation over very small buckets
    /// and feed the results up to larger buckets.
    ///
    /// Note that this implementation currently ignores the distinct bit because we
    /// currently only perform min / max hierarchically and the reduction tree
    /// efficiently suppresses non-distinct updates.
    ///
    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
    /// trickery here to limit the memory usage per layer by internally
    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains the `((key, bucket), passing value)` for this
    /// stage.
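    ///
    /// For intuition, an illustrative sketch (the actual moduli come from the
    /// plan): with `buckets = [4096, 256, 16]`, records are first grouped by
    /// `(hash % 4096, key)` and reduced; survivors are regrouped by
    /// `(hash % 256, key)`, then `(hash % 16, key)`, and a final reduction per
    /// `key` produces the output.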
    fn build_bucketed<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            skips,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<Collection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // The first mod to apply to the hash.
            let first_mod = buckets.get(0).copied().unwrap_or(1);

            // Gather the relevant keys with their hashes, along with values ordered by aggregation_index.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                let mut row_iter = row.iter();
                for skip in skips.iter() {
                    row_packer.push(row_iter.nth(*skip).unwrap());
                }
                let values = row_builder.clone();

                // Apply the initial mod here.
                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            // Repeatedly apply hierarchical reduction with a progressively coarser key.
            for (index, b) in buckets.into_iter().enumerate() {
                // Apply subsequent bucket mods for all but the first round.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // We only want the first stage to perform validation of whether invalid accumulations
                // were observed in the input. Subsequently, we will either produce an error in the error
                // stream or produce correct data in the output stream.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Discard the hash from the key and return to the format of the input data.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Allocations for the two closures.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            // Build a series of stages for the reduction and arrange the final result
            // into (key, Row).
            let error_logger = self.error_logger();
            // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
            // view mz_introspection.mz_expected_group_size_advice.
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
            // but we then wouldn't be able to do this error check conditionally. See its documentation
            // for the rationale around using a second reduction here.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // Negative counts would be surprising, but until we are 100% certain we won't
                            // see them, we should report when we do. We may want to bake even more info
                            // in here in the future.
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region();
                if let Some(e) = &err_output {
                    err_output = Some(e.concat(&errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter =
                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region()
        });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }

    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
    /// with the negation of what's produced by the reduction.
    ///
    /// `validating` indicates whether we want this stage to perform error detection
    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
    /// stages can skip validation.
1199    fn build_bucketed_stage<S>(
1200        &self,
1201        aggr_funcs: &Vec<AggregateFunc>,
1202        input: &Collection<S, (Row, Row), Diff>,
1203        validating: bool,
1204    ) -> (
1205        Collection<S, (Row, Row), Diff>,
1206        Option<Collection<S, DataflowError, Diff>>,
1207    )
1208    where
1209        S: Scope<Timestamp = G::Timestamp>,
1210    {
1211        let (input, negated_output, errs) = if validating {
1212            let (input, reduced) = self
1213                .build_bucketed_negated_output::<_, RowValBuilder<_,_,_>, RowValSpine<Result<Row, Row>, _, _>>(
1214                    input,
1215                    aggr_funcs.clone(),
1216                );
1217            let (oks, errs) = reduced
1218                .as_collection(|k, v| (k.to_row(), v.clone()))
1219                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1220                "Checked Invalid Accumulations",
1221                |(hash_key, result)| match result {
1222                    Err(hash_key) => {
1223                        let mut hash_key_iter = hash_key.iter();
1224                        let _hash = hash_key_iter.next();
1225                        let key = SharedRow::pack(hash_key_iter);
1226                        let message = format!(
1227                            "Invalid data in source, saw non-positive accumulation \
1228                                         for key {key:?} in hierarchical mins-maxes aggregate"
1229                        );
1230                        Err(EvalError::Internal(message.into()).into())
1231                    }
1232                    Ok(values) => Ok((hash_key, values)),
1233                },
1234            );
1235            (input, oks, Some(errs))
1236        } else {
1237            let (input, reduced) = self
1238                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1239                    input,
1240                    aggr_funcs.clone(),
1241                );
1242            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1243            // that we need to apply the mod on both input and oks.
1244            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1245            (input, oks, None)
1246        };
1247
1248        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1249        let oks = negated_output.concat(&input);
1250        (oks, errs)
1251    }
1252
1253    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1254    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1255    /// with all diffs in the reduction's output negated.
1256    fn build_bucketed_negated_output<S, Bu, Tr>(
1257        &self,
1258        input: &Collection<S, (Row, Row), Diff>,
1259        aggrs: Vec<AggregateFunc>,
1260    ) -> (
1261        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1262        Arranged<S, TraceAgent<Tr>>,
1263    )
1264    where
1265        S: Scope<Timestamp = G::Timestamp>,
1266        Tr: for<'a> Trace<
1267                Key<'a> = DatumSeq<'a>,
1268                KeyOwn = Row,
1269                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1270                Time = G::Timestamp,
1271                Diff = Diff,
1272            > + 'static,
1273        Bu: Builder<
1274                Time = G::Timestamp,
1275                Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1276                Output = Tr::Batch,
1277            >,
1278        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1279    {
1280        let error_logger = self.error_logger();
1281        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1282        // view mz_introspection.mz_expected_group_size_advice.
1283        let arranged_input = input
1284            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1285                "Arranged MinsMaxesHierarchical input",
1286            );
1287
1288        let reduced = arranged_input.mz_reduce_abelian::<_, Bu, Tr>(
1289            "Reduced Fallibly MinsMaxesHierarchical",
1290            move |key, source, target| {
1291                if let Some(err) = Tr::ValOwn::into_error() {
1292                    // Should negative accumulations reach us, we should loudly complain.
1293                    for (value, count) in source.iter() {
1294                        if count.is_positive() {
1295                            continue;
1296                        }
1297                        error_logger.log(
1298                            "Non-positive accumulation in MinsMaxesHierarchical",
1299                            &format!("key={key:?}, value={value:?}, count={count}"),
1300                        );
1301                        // After complaining, output an error here so that we can eventually
1302                        // report it in an error stream.
1303                        target.push((err(Tr::owned_key(key)), Diff::ONE));
1304                        return;
1305                    }
1306                }
1307
1308                let mut row_builder = SharedRow::get();
1309                let mut row_packer = row_builder.packer();
1310
1311                let mut source_iters = source
1312                    .iter()
1313                    .map(|(values, _cnt)| *values)
1314                    .collect::<Vec<_>>();
1315                for func in aggrs.iter() {
1316                    let column_iter =
1317                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1318                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1319                }
1320                // We only want to arrange the parts of the input that are not part of the output.
1321                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
1322                // gives us the intended value of this aggregate function. Also we assume that regardless
1323                // of the multiplicity of the final result in the input, we only want to have one copy
1324                // in the output.
1325                target.reserve(source.len().saturating_add(1));
1326                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1327                target.extend(source.iter().map(|(values, cnt)| {
1328                    let mut cnt = *cnt;
1329                    cnt.negate();
1330                    (Tr::ValOwn::ok(values.to_row()), cnt)
1331                }));
1332            },
1333        );
1334        (arranged_input, reduced)
1335    }
1336
1337    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1338    /// on monotonic inputs.
1339    fn build_monotonic<S>(
1340        &self,
1341        collection: Collection<S, (Row, Row), Diff>,
1342        MonotonicPlan {
1343            aggr_funcs,
1344            skips,
1345            must_consolidate,
1346        }: MonotonicPlan,
1347        mfp_after: Option<SafeMfpPlan>,
1348    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1349    where
1350        S: Scope<Timestamp = G::Timestamp>,
1351    {
1352        // Gather the relevant values into a vec of rows ordered by aggregation_index
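        // `skips` encodes the wanted datums as relative offsets: `Iterator::nth(skip)`
        // advances past `skip` datums and yields the next one. For example (a sketch),
        // to extract columns 2 and 4 of a row, `skips` would be `[2, 1]`: skip two
        // datums to reach index 2, then one more to reach index 4.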
1353        let collection = collection
1354            .map(move |(key, row)| {
1355                let mut row_builder = SharedRow::get();
1356                let mut values = Vec::with_capacity(skips.len());
1357                let mut row_iter = row.iter();
1358                for skip in skips.iter() {
1359                    values.push(
1360                        row_builder.pack_using(std::iter::once(row_iter.nth(*skip).unwrap())),
1361                    );
1362                }
1363
1364                (key, values)
1365            })
1366            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1367                must_consolidate,
1368                "Consolidated ReduceMonotonic input",
1369            );
1370
1371        // It should now be possible to ensure that we have a monotonic collection.
1372        let error_logger = self.error_logger();
1373        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1374            error_logger.log(
1375                "Non-monotonic input to ReduceMonotonic",
1376                &format!("data={data:?}, diff={diff}"),
1377            );
1378            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1379            (EvalError::Internal(m).into(), Diff::ONE)
1380        });
1381        // We can place our rows directly into the diff field, and
1382        // only keep the relevant one corresponding to evaluating our
1383        // aggregate, instead of having to do a hierarchical reduction.
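        //
        // A sketch of the encoding: with a single `min` aggregate, an update
        // `((key, [row(5)]), +1)` explodes into data `key` with diff `[Min(row(5))]`;
        // consolidation then combines diffs via the monoid's `plus_equals`, computing
        // the running minimum in place.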
1384        let partial = partial.explode_one(move |(key, values)| {
1385            let mut output = Vec::new();
1386            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1387                output.push(monoids::get_monoid(row, func).expect(
1388                    "hierarchical aggregations are expected to have monoid implementations",
1389                ));
1390            }
1391            (key, output)
1392        });
1393
1394        // Allocations for the two closures.
1395        let mut datums1 = DatumVec::new();
1396        let mut datums2 = DatumVec::new();
1397        let mfp_after1 = mfp_after.clone();
1398        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1399
1400        let partial: KeyCollection<_, _, _> = partial.into();
1401        let arranged = partial
1402            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1403                "ArrangeMonotonic [val: empty]",
1404            );
1405        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1406            "ReduceMonotonic",
1407            {
1408                move |key, input, output| {
1409                    let temp_storage = RowArena::new();
1410                    let datum_iter = key.to_datum_iter();
1411                    let mut datums_local = datums1.borrow();
1412                    datums_local.extend(datum_iter);
1413                    let key_len = datums_local.len();
1414                    let accum = &input[0].1;
1415                    for monoid in accum.iter() {
1416                        datums_local.extend(monoid.finalize().iter());
1417                    }
1418
1419                    if let Some(row) =
1420                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1421                    {
1422                        output.push((row, Diff::ONE));
1423                    }
1424                }
1425            },
1426        );
1427
1428        // If `mfp_after` can error, then we need to render a paired reduction
1429        // to scan for these potential errors. Note that we cannot directly use
1430        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1431        // conditionally render the second component of the reduction pair.
1432        if let Some(mfp) = mfp_after2 {
1433            let mfp_errs = arranged
1434                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1435                    "ReduceMonotonic Error Check",
1436                    move |key, input, output| {
1437                        let temp_storage = RowArena::new();
1438                        let datum_iter = key.to_datum_iter();
1439                        let mut datums_local = datums2.borrow();
1440                        datums_local.extend(datum_iter);
1441                        let accum = &input[0].1;
1442                        for monoid in accum.iter() {
1443                            datums_local.extend(monoid.finalize().iter());
1444                        }
1445                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1446                        {
1447                            output.push((e.into(), Diff::ONE));
1448                        }
1449                    },
1450                )
1451                .as_collection(|_k, v| v.clone());
1452            (output, validation_errs.concat(&mfp_errs))
1453        } else {
1454            (output, validation_errs)
1455        }
1456    }
1457
1458    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1459    ///
1460    /// The incoming values are moved to the update's "difference" field, at which point
1461    /// they can be accumulated in place. The `count` operator promotes the accumulated
1462    /// values to data, at which point a final map applies operator-specific logic to
1463    /// yield the final aggregate.
1464    fn build_accumulable<S>(
1465        &self,
1466        collection: Collection<S, (Row, Row), Diff>,
1467        AccumulablePlan {
1468            full_aggrs,
1469            simple_aggrs,
1470            distinct_aggrs,
1471        }: AccumulablePlan,
1472        key_arity: usize,
1473        mfp_after: Option<SafeMfpPlan>,
1474    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1475    where
1476        S: Scope<Timestamp = G::Timestamp>,
1477    {
1478        // we must have called this function with something to reduce
1479        if full_aggrs.is_empty() || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1480            self.error_logger().soft_panic_or_log(
1481                "Incorrect numbers of aggregates in accumulable reduction rendering",
1482                &format!(
1483                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1484                    full_aggrs.len(),
1485                    simple_aggrs.len(),
1486                    distinct_aggrs.len(),
1487                ),
1488            );
1489        }
1490
1491        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1492        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
1493        // Other aggregations can be directly moved into the `diff` field.
1494        //
1495        // In each case, the resulting collection should have `data` shaped as `(key, ())`
1496        // and a `diff` that is a pair of a vector of accumulators (one `Accum` per
1497        // aggregate) and an overall record count. Each accumulator generally carries a
1498        // count plus aggregation-specific values; the size could be reduced if we want
1499        // to specialize for the aggregations.
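        //
        // For example (a sketch): for a single `sum` over an integer column, an input
        // update `((key, row[7]), +1)` becomes
        // `((key, ()), ([Accum::SimpleNumber { accum: 7, non_nulls: 1 }], 1))`, and
        // consolidation adds accumulators and record counts across updates.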
1500        // Instantiate a default vector for diffs with the correct types at each
1501        // position.
1502        let zero_diffs: (Vec<_>, Diff) = (
1503            full_aggrs
1504                .iter()
1505                .map(|f| accumulable_zero(&f.func))
1506                .collect(),
1507            Diff::ZERO,
1508        );
1509
1510        let mut to_aggregate = Vec::new();
1511        if !simple_aggrs.is_empty() {
1512            // First, collect all non-distinct aggregations in one pass.
1513            let easy_cases = collection.explode_one({
1514                let zero_diffs = zero_diffs.clone();
1515                move |(key, row)| {
1516                    let mut diffs = zero_diffs.clone();
1517                    // Try to unpack only the datums we need. Unfortunately, since we
1518                    // can't randomly access a Row, we have to iterate through it one by one.
1519                    // TODO: Even though we don't have random access, we could still avoid unpacking
1520                    // everything that we don't care about, and it might be worth it to extend the
1521                    // Row API to do that.
1522                    let mut row_iter = row.iter().enumerate();
1523                    for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
1524                        let mut datum = row_iter.next().unwrap();
1525                        while datum_index != &datum.0 {
1526                            datum = row_iter.next().unwrap();
1527                        }
1528                        let datum = datum.1;
1529                        diffs.0[*accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1530                        diffs.1 = Diff::ONE;
1531                    }
1532                    ((key, ()), diffs)
1533                }
1534            });
1535            to_aggregate.push(easy_cases);
1536        }
1537
1538        // Next, collect all aggregations that require distinctness.
1539        for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
1540            let pairer = Pairer::new(key_arity);
1541            let collection = collection
1542                .map(move |(key, row)| {
1543                    let value = row.iter().nth(datum_index).unwrap();
1544                    (pairer.merge(&key, std::iter::once(value)), ())
1545                })
1546                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1547                    "Arranged Accumulable Distinct [val: empty]",
1548                )
1549                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1550                    "Reduced Accumulable Distinct [val: empty]",
1551                    move |_k, _s, t| t.push(((), Diff::ONE)),
1552                )
1553                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1554                .explode_one({
1555                    let zero_diffs = zero_diffs.clone();
1556                    move |(key, row)| {
1557                        let datum = row.iter().next().unwrap();
1558                        let mut diffs = zero_diffs.clone();
1559                        diffs.0[accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1560                        diffs.1 = Diff::ONE;
1561                        ((key, ()), diffs)
1562                    }
1563                });
1564            to_aggregate.push(collection);
1565        }
1566
1567        // now concatenate, if necessary, multiple aggregations
1568        let collection = if to_aggregate.len() == 1 {
1569            to_aggregate.remove(0)
1570        } else {
1571            differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
1572        };
1573
1574        // Allocations for the two closures.
1575        let mut datums1 = DatumVec::new();
1576        let mut datums2 = DatumVec::new();
1577        let mfp_after1 = mfp_after.clone();
1578        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1579        let full_aggrs2 = full_aggrs.clone();
1580
1581        let error_logger = self.error_logger();
1582        let err_full_aggrs = full_aggrs.clone();
1583        let (arranged_output, arranged_errs) = collection
1584            .mz_arrange::<RowBatcher<_,_>, RowBuilder<_,_>, RowSpine<_, (Vec<Accum>, Diff)>>("ArrangeAccumulable [val: empty]")
1585            .reduce_pair::<_, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
1586                "ReduceAccumulable",
1587                "AccumulableErrorCheck",
1588                {
1589                    move |key, input, output| {
1590                        let (ref accums, total) = input[0].1;
1591
1592                        let temp_storage = RowArena::new();
1593                        let datum_iter = key.to_datum_iter();
1594                        let mut datums_local = datums1.borrow();
1595                        datums_local.extend(datum_iter);
1596                        let key_len = datums_local.len();
1597                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1598                            datums_local.push(finalize_accum(&aggr.func, accum, total));
1599                        }
1600
1601                        if let Some(row) = evaluate_mfp_after(
1602                            &mfp_after1,
1603                            &mut datums_local,
1604                            &temp_storage,
1605                            key_len,
1606                        ) {
1607                            output.push((row, Diff::ONE));
1608                        }
1609                    }
1610                },
1611                move |key, input, output| {
1612                    let (ref accums, total) = input[0].1;
1613                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1614                        // We first test here whether a key has a net count of zero but a non-zero
1615                        // accumulation, producing an error to the logs and to the query output if so.
1616                        if total == Diff::ZERO && !accum.is_zero() {
1617                            error_logger.log(
1618                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1619                                &format!("aggr={aggr:?}, accum={accum:?}"),
1620                            );
1621                            let key = key.to_row();
1622                            let message = format!(
1623                                "Invalid data in source, saw net-zero records for key {key} \
1624                                 with non-zero accumulation in accumulable aggregate"
1625                            );
1626                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1627                        }
1628                        match (&aggr.func, &accum) {
1629                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1630                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1631                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1632                                if accum.is_negative() {
1633                                    error_logger.log(
1634                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1635                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1636                                );
1637                                    let key = key.to_row();
1638                                    let message = format!(
1639                                        "Invalid data in source, saw negative accumulation with \
1640                                         unsigned type for key {key}"
1641                                    );
1642                                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1643                                }
1644                            }
1645                            _ => (), // no more errors to check for at this point!
1646                        }
1647                    }
1648
1649                    // If `mfp_after` can error, then evaluate it here.
1650                    let Some(mfp) = &mfp_after2 else { return };
1651                    let temp_storage = RowArena::new();
1652                    let datum_iter = key.to_datum_iter();
1653                    let mut datums_local = datums2.borrow();
1654                    datums_local.extend(datum_iter);
1655                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1656                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1657                    }
1658
1659                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1660                        output.push((e.into(), Diff::ONE));
1661                    }
1662                },
1663            );
1664        (
1665            arranged_output,
1666            arranged_errs.as_collection(|_key, error| error.clone()),
1667        )
1668    }
1669}
1670
1671/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1672/// containing key and aggregate values, then returns a result `Row` or `None`
1673/// if the MFP filters the result out.
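///
/// For example (a sketch): with `key_len = 1`, `datums_local` holds
/// `[key, agg_0, agg_1, ...]`; a successful MFP evaluation packs a `Row` from the
/// resulting datums minus the first `key_len` (key) columns, while a filtered-out
/// or erroring evaluation yields `None`.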
1674fn evaluate_mfp_after<'a, 'b>(
1675    mfp_after: &'a Option<SafeMfpPlan>,
1676    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1677    temp_storage: &'a RowArena,
1678    key_len: usize,
1679) -> Option<Row> {
1680    let mut row_builder = SharedRow::get();
1681    // Apply MFP if it exists and pack a Row of
1682    // aggregate values from `datums_local`.
1683    if let Some(mfp) = mfp_after {
1684        // We must ignore errors here, but they are scanned
1685        // for elsewhere if the MFP can error.
1686        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1687            // The `mfp_after` must preserve the key columns,
1688            // so we can skip them to form aggregation results.
1689            Some(row_builder.pack_using(iter.skip(key_len)))
1690        } else {
1691            None
1692        }
1693    } else {
1694        Some(row_builder.pack_using(&datums_local[key_len..]))
1695    }
1696}
1697
1698fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1699    match aggr_func {
1700        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1701            trues: Diff::ZERO,
1702            falses: Diff::ZERO,
1703        },
1704        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1705            accum: AccumCount::ZERO,
1706            pos_infs: Diff::ZERO,
1707            neg_infs: Diff::ZERO,
1708            nans: Diff::ZERO,
1709            non_nulls: Diff::ZERO,
1710        },
1711        AggregateFunc::SumNumeric => Accum::Numeric {
1712            accum: OrderedDecimal(NumericAgg::zero()),
1713            pos_infs: Diff::ZERO,
1714            neg_infs: Diff::ZERO,
1715            nans: Diff::ZERO,
1716            non_nulls: Diff::ZERO,
1717        },
1718        _ => Accum::SimpleNumber {
1719            accum: AccumCount::ZERO,
1720            non_nulls: Diff::ZERO,
1721        },
1722    }
1723}
1724
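/// Scale factor (2^24) for mapping floats onto a fixed-point integer domain for
/// accumulation; see the documentation of [`Accum`] for details.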
1725static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1726
1727fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1728    match aggregate_func {
1729        AggregateFunc::Count => Accum::SimpleNumber {
1730            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
1731            non_nulls: if datum.is_null() {
1732                Diff::ZERO
1733            } else {
1734                Diff::ONE
1735            },
1736        },
1737        AggregateFunc::Any | AggregateFunc::All => match datum {
1738            Datum::True => Accum::Bool {
1739                trues: Diff::ONE,
1740                falses: Diff::ZERO,
1741            },
1742            Datum::Null => Accum::Bool {
1743                trues: Diff::ZERO,
1744                falses: Diff::ZERO,
1745            },
1746            Datum::False => Accum::Bool {
1747                trues: Diff::ZERO,
1748                falses: Diff::ONE,
1749            },
1750            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1751        },
1752        AggregateFunc::Dummy => match datum {
1753            Datum::Dummy => Accum::SimpleNumber {
1754                accum: AccumCount::ZERO,
1755                non_nulls: Diff::ZERO,
1756            },
1757            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1758        },
1759        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1760            let n = match datum {
1761                Datum::Float32(n) => f64::from(*n),
1762                Datum::Float64(n) => *n,
1763                Datum::Null => 0f64,
1764                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1765            };
1766
1767            let nans = Diff::from(n.is_nan());
1768            let pos_infs = Diff::from(n == f64::INFINITY);
1769            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1770            let non_nulls = Diff::from(datum != Datum::Null);
1771
1772            // Map the floating point value onto a fixed precision domain
1773            // All special values should map to zero, since they are tracked separately
1774            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1775                AccumCount::ZERO
1776            } else {
1777                // This operation will truncate to i128::MAX if out of range.
1778                // TODO(benesch): rewrite to avoid `as`.
1779                #[allow(clippy::as_conversions)]
1780                { (n * *FLOAT_SCALE) as i128 }.into()
1781            };
1782
1783            Accum::Float {
1784                accum,
1785                pos_infs,
1786                neg_infs,
1787                nans,
1788                non_nulls,
1789            }
1790        }
1791        AggregateFunc::SumNumeric => match datum {
1792            Datum::Numeric(n) => {
1793                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1794                    if n.0.is_negative() {
1795                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1796                    } else {
1797                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1798                    }
1799                } else if n.0.is_nan() {
1800                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1801                } else {
1802                    // Take a narrow decimal (datum) into a wide decimal
1803                    // (aggregator).
1804                    let mut cx_agg = numeric::cx_agg();
1805                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1806                };
1807
1808                Accum::Numeric {
1809                    accum: OrderedDecimal(accum),
1810                    pos_infs,
1811                    neg_infs,
1812                    nans,
1813                    non_nulls: Diff::ONE,
1814                }
1815            }
1816            Datum::Null => Accum::Numeric {
1817                accum: OrderedDecimal(NumericAgg::zero()),
1818                pos_infs: Diff::ZERO,
1819                neg_infs: Diff::ZERO,
1820                nans: Diff::ZERO,
1821                non_nulls: Diff::ZERO,
1822            },
1823            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1824        },
1825        _ => {
1826            // Other accumulations need to disentangle the accumulable
1827            // value from its NULL-ness, which is not quite as easily
1828            // accumulated.
1829            match datum {
1830                Datum::Int16(i) => Accum::SimpleNumber {
1831                    accum: i.into(),
1832                    non_nulls: Diff::ONE,
1833                },
1834                Datum::Int32(i) => Accum::SimpleNumber {
1835                    accum: i.into(),
1836                    non_nulls: Diff::ONE,
1837                },
1838                Datum::Int64(i) => Accum::SimpleNumber {
1839                    accum: i.into(),
1840                    non_nulls: Diff::ONE,
1841                },
1842                Datum::UInt16(u) => Accum::SimpleNumber {
1843                    accum: u.into(),
1844                    non_nulls: Diff::ONE,
1845                },
1846                Datum::UInt32(u) => Accum::SimpleNumber {
1847                    accum: u.into(),
1848                    non_nulls: Diff::ONE,
1849                },
1850                Datum::UInt64(u) => Accum::SimpleNumber {
1851                    accum: u.into(),
1852                    non_nulls: Diff::ONE,
1853                },
1854                Datum::MzTimestamp(t) => Accum::SimpleNumber {
1855                    accum: u64::from(t).into(),
1856                    non_nulls: Diff::ONE,
1857                },
1858                Datum::Null => Accum::SimpleNumber {
1859                    accum: AccumCount::ZERO,
1860                    non_nulls: Diff::ZERO,
1861                },
1862                x => panic!("Accumulating non-integer data: {x:?}"),
1863            }
1864        }
1865    }
1866}
1867
1868fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1869    // The finished value depends on the aggregation function in a variety of ways.
1870    // For all aggregates but count, if only null values were
1871    // accumulated, then the output is null.
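    // For example (a sketch): `sum(a)` over two NULL inputs leaves `total = 2` with a
    // zero accumulator, so the result is NULL, whereas `count(a)` over the same inputs
    // reports 0.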
1872    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1873        Datum::Null
1874    } else {
1875        match (&aggr_func, &accum) {
1876            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1877                Datum::Int64(non_nulls.into_inner())
1878            }
1879            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1880                // If any false, else if all true, else must be no false and some nulls.
1881                if falses.is_positive() {
1882                    Datum::False
1883                } else if *trues == total {
1884                    Datum::True
1885                } else {
1886                    Datum::Null
1887                }
1888            }
1889            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1890                // If any true, else if all false, else must be no true and some nulls.
1891                if trues.is_positive() {
1892                    Datum::True
1893                } else if *falses == total {
1894                    Datum::False
1895                } else {
1896                    Datum::Null
1897                }
1898            }
1899            (AggregateFunc::Dummy, _) => Datum::Dummy,
1900            // If any non-nulls, just report the aggregate.
1901            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1902            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
1903                // This conversion is safe, as long as we have fewer than 2^32
1904                // summands.
1905                // TODO(benesch): are we guaranteed to have fewer than 2^32 summands?
1906                // If so, rewrite to avoid `as`.
1907                #[allow(clippy::as_conversions)]
1908                Datum::Int64(accum.into_inner() as i64)
1909            }
1910            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1911            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1912            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1913                if !accum.is_negative() {
1914                    // Our semantics of overflow are not clearly articulated wrt.
1915                    // unsigned vs. signed types (database-issues#5172). We adopt an
1916                    // unsigned wrapping behavior to match what we do above for
1917                    // signed types.
1918                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
1919                    #[allow(clippy::as_conversions)]
1920                    Datum::UInt64(accum.into_inner() as u64)
1921                } else {
1922                    // Note that we return a value here, but an error in the other
1923                    // operator of the reduce_pair. Therefore, we expect that this
1924                    // value will never be exposed as an output.
1925                    Datum::Null
1926                }
1927            }
1928            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1929                if !accum.is_negative() {
1930                    Datum::from(*accum)
1931                } else {
1932                    // Note that we return a value here, but an error in the other
1933                    // operator of the reduce_pair. Therefore, we expect that this
1934                    // value will never be exposed as an output.
1935                    Datum::Null
1936                }
1937            }
1938            (
1939                AggregateFunc::SumFloat32,
1940                Accum::Float {
1941                    accum,
1942                    pos_infs,
1943                    neg_infs,
1944                    nans,
1945                    non_nulls: _,
1946                },
1947            ) => {
1948                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1949                    // The result is NaN if we saw any NaNs or a
1950                    // mixture of positive and negative infinities.
1951                    Datum::from(f32::NAN)
1952                } else if pos_infs.is_positive() {
1953                    Datum::from(f32::INFINITY)
1954                } else if neg_infs.is_positive() {
1955                    Datum::from(f32::NEG_INFINITY)
1956                } else {
1957                    // TODO(benesch): remove potentially dangerous usage of `as`.
1958                    #[allow(clippy::as_conversions)]
1959                    {
1960                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
1961                    }
1962                }
1963            }
1964            (
1965                AggregateFunc::SumFloat64,
1966                Accum::Float {
1967                    accum,
1968                    pos_infs,
1969                    neg_infs,
1970                    nans,
1971                    non_nulls: _,
1972                },
1973            ) => {
1974                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1975                    // The result is NaN if we saw any NaNs or a
1976                    // mixture of positive and negative infinities.
1977                    Datum::from(f64::NAN)
1978                } else if pos_infs.is_positive() {
1979                    Datum::from(f64::INFINITY)
1980                } else if neg_infs.is_positive() {
1981                    Datum::from(f64::NEG_INFINITY)
1982                } else {
1983                    // TODO(benesch): remove potentially dangerous usage of `as`.
1984                    #[allow(clippy::as_conversions)]
1985                    {
1986                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
1987                    }
1988                }
1989            }
1990            (
1991                AggregateFunc::SumNumeric,
1992                Accum::Numeric {
1993                    accum,
1994                    pos_infs,
1995                    neg_infs,
1996                    nans,
1997                    non_nulls: _,
1998                },
1999            ) => {
2000                let mut cx_datum = numeric::cx_datum();
2001                let d = cx_datum.to_width(accum.0);
2002                // Take a wide decimal (aggregator) into a
2003                // narrow decimal (datum). If this operation
2004                // overflows the datum, this new value will be
2005                // +/- infinity. However, the aggregator tracks
2006                // the amount of overflow, making it invertible.
2007                let inf_d = d.is_infinite();
2008                let neg_d = d.is_negative();
2009                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
2010                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
2011                if nans.is_positive() || (pos_inf && neg_inf) {
2012                    // The result is NaN if we saw any NaNs or a
2013                    // mixture of positive and negative infinities.
2014                    Datum::from(Numeric::nan())
2015                } else if pos_inf {
2016                    Datum::from(Numeric::infinity())
2017                } else if neg_inf {
2018                    let mut cx = numeric::cx_datum();
2019                    let mut d = Numeric::infinity();
2020                    cx.neg(&mut d);
2021                    Datum::from(d)
2022                } else {
2023                    Datum::from(d)
2024                }
2025            }
2026            _ => panic!(
2027                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
2028                aggr_func
2029            ),
2030        }
2031    }
2032}
2033
2034/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
2035type AccumCount = mz_ore::Overflowing<i128>;
2036
2037/// Accumulates values for the various types of accumulable aggregations.
2038///
2039/// We assume that there are no more than 2^32 elements for the aggregation.
2040/// Thus we can perform a summation over 32-bit values in a wider (here, 128-bit)
2041/// accumulator and not worry about exceeding its bounds.
2042///
2043/// The float accumulator performs accumulation in fixed-point arithmetic. The
2044/// fixed-point representation has less precision than a double. It is entirely
2045/// possible for the values of the accumulator to overflow, so we have to use
2046/// wrapping arithmetic to preserve group guarantees.
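///
/// For example (a sketch): with a scale factor of `2^24`, the float `0.5` accumulates
/// as the integer `8388608` and `-0.25` as `-4194304`; their sum `4194304` maps back
/// to `0.25` on finalization.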
2047#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
2048enum Accum {
2049    /// Accumulates boolean values.
2050    Bool {
2051        /// The number of `true` values observed.
2052        trues: Diff,
2053        /// The number of `false` values observed.
2054        falses: Diff,
2055    },
2056    /// Accumulates simple numeric values.
2057    SimpleNumber {
2058        /// The accumulation of all non-NULL values observed.
2059        accum: AccumCount,
2060        /// The number of non-NULL values observed.
2061        non_nulls: Diff,
2062    },
2063    /// Accumulates float values.
2064    Float {
2065        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
2066        /// preserve associativity and commutativity
2067        accum: AccumCount,
2068        /// Counts +inf
2069        pos_infs: Diff,
2070        /// Counts -inf
2071        neg_infs: Diff,
2072        /// Counts NaNs
2073        nans: Diff,
2074        /// Counts non-NULL values
2075        non_nulls: Diff,
2076    },
2077    /// Accumulates arbitrary precision decimals.
2078    Numeric {
2079        /// Accumulates non-special values
2080        accum: OrderedDecimal<NumericAgg>,
2081        /// Counts +inf
2082        pos_infs: Diff,
2083        /// Counts -inf
2084        neg_infs: Diff,
2085        /// Counts NaNs
2086        nans: Diff,
2087        /// Counts non-NULL values
2088        non_nulls: Diff,
2089    },
2090}
2091
2092impl IsZero for Accum {
2093    fn is_zero(&self) -> bool {
2094        match self {
2095            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2096            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2097            Accum::Float {
2098                accum,
2099                pos_infs,
2100                neg_infs,
2101                nans,
2102                non_nulls,
2103            } => {
2104                accum.is_zero()
2105                    && pos_infs.is_zero()
2106                    && neg_infs.is_zero()
2107                    && nans.is_zero()
2108                    && non_nulls.is_zero()
2109            }
2110            Accum::Numeric {
2111                accum,
2112                pos_infs,
2113                neg_infs,
2114                nans,
2115                non_nulls,
2116            } => {
2117                accum.0.is_zero()
2118                    && pos_infs.is_zero()
2119                    && neg_infs.is_zero()
2120                    && nans.is_zero()
2121                    && non_nulls.is_zero()
2122            }
2123        }
2124    }
2125}
2126
2127impl Semigroup for Accum {
2128    fn plus_equals(&mut self, other: &Accum) {
2129        match (&mut *self, other) {
2130            (
2131                Accum::Bool { trues, falses },
2132                Accum::Bool {
2133                    trues: other_trues,
2134                    falses: other_falses,
2135                },
2136            ) => {
2137                *trues += other_trues;
2138                *falses += other_falses;
2139            }
2140            (
2141                Accum::SimpleNumber { accum, non_nulls },
2142                Accum::SimpleNumber {
2143                    accum: other_accum,
2144                    non_nulls: other_non_nulls,
2145                },
2146            ) => {
2147                *accum += other_accum;
2148                *non_nulls += other_non_nulls;
2149            }
2150            (
2151                Accum::Float {
2152                    accum,
2153                    pos_infs,
2154                    neg_infs,
2155                    nans,
2156                    non_nulls,
2157                },
2158                Accum::Float {
2159                    accum: other_accum,
2160                    pos_infs: other_pos_infs,
2161                    neg_infs: other_neg_infs,
2162                    nans: other_nans,
2163                    non_nulls: other_non_nulls,
2164                },
2165            ) => {
2166                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2167                    warn!("Float accumulator overflow. Incorrect results possible");
2168                    accum.wrapping_add(*other_accum)
2169                });
2170                *pos_infs += other_pos_infs;
2171                *neg_infs += other_neg_infs;
2172                *nans += other_nans;
2173                *non_nulls += other_non_nulls;
2174            }
2175            (
2176                Accum::Numeric {
2177                    accum,
2178                    pos_infs,
2179                    neg_infs,
2180                    nans,
2181                    non_nulls,
2182                },
2183                Accum::Numeric {
2184                    accum: other_accum,
2185                    pos_infs: other_pos_infs,
2186                    neg_infs: other_neg_infs,
2187                    nans: other_nans,
2188                    non_nulls: other_non_nulls,
2189                },
2190            ) => {
2191                let mut cx_agg = numeric::cx_agg();
2192                cx_agg.add(&mut accum.0, &other_accum.0);
2193                // `rounded` signals we have exceeded the aggregator's max
2194                // precision, which means we've lost commutativity and
2195                // associativity; nothing to be done here, so panic. For more
2196                // context, see the DEC_Rounded definition at
2197                // http://speleotrove.com/decimal/dncont.html
2198                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2199                // Reduce to reclaim unused decimal precision. Note that this
2200                // reduction must happen somewhere to make the following
2201                // invertible:
2202                // ```
2203                // CREATE TABLE a (a numeric);
2204                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
2205                // INSERT INTO a VALUES ('9e39'), ('9e-39');
2206                // ```
2207                // This will now return infinity. However, we can retract the
2208                // value that blew up its precision:
2209                // ```
2210                // INSERT INTO a VALUES ('-9e-39');
2211                // ```
2212                // This leaves `t`'s aggregator with a value of 9e39. However,
2213                // without doing a reduction, `libdecnum` will store the value
2214                // as 9e39+0e-39, which still exceeds the narrower context's
2215                // precision. By doing the reduction, we can "reclaim" the 39
2216                // digits of precision.
2217                cx_agg.reduce(&mut accum.0);
2218                *pos_infs += other_pos_infs;
2219                *neg_infs += other_neg_infs;
2220                *nans += other_nans;
2221                *non_nulls += other_non_nulls;
2222            }
2223            (l, r) => unreachable!(
2224                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2225            ),
2226        }
2227    }
2228}
2229
2230impl Multiply<Diff> for Accum {
2231    type Output = Accum;
2232
2233    fn multiply(self, factor: &Diff) -> Accum {
2234        let factor = *factor;
2235        match self {
2236            Accum::Bool { trues, falses } => Accum::Bool {
2237                trues: trues * factor,
2238                falses: falses * factor,
2239            },
2240            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2241                accum: accum * AccumCount::from(factor),
2242                non_nulls: non_nulls * factor,
2243            },
2244            Accum::Float {
2245                accum,
2246                pos_infs,
2247                neg_infs,
2248                nans,
2249                non_nulls,
2250            } => Accum::Float {
2251                accum: accum
2252                    .checked_mul(AccumCount::from(factor))
2253                    .unwrap_or_else(|| {
2254                        warn!("Float accumulator overflow. Incorrect results possible");
2255                        accum.wrapping_mul(AccumCount::from(factor))
2256                    }),
2257                pos_infs: pos_infs * factor,
2258                neg_infs: neg_infs * factor,
2259                nans: nans * factor,
2260                non_nulls: non_nulls * factor,
2261            },
2262            Accum::Numeric {
2263                accum,
2264                pos_infs,
2265                neg_infs,
2266                nans,
2267                non_nulls,
2268            } => {
2269                let mut cx = numeric::cx_agg();
2270                let mut f = NumericAgg::from(factor.into_inner());
2271                // Unlike `plus_equals`, it is not necessary to reduce after this operation
2272                // because `f` will always be an integer, i.e. we are never increasing the
2273                // values' scale.
2274                cx.mul(&mut f, &accum.0);
2275                // `rounded` signals we have exceeded the aggregator's max
2276                // precision, which means we've lost commutativity and
2277                // associativity; nothing to be done here, so panic. For more
2278                // context, see the DEC_Rounded definition at
2279                // http://speleotrove.com/decimal/dncont.html
2280                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2281                Accum::Numeric {
2282                    accum: OrderedDecimal(f),
2283                    pos_infs: pos_infs * factor,
2284                    neg_infs: neg_infs * factor,
2285                    nans: nans * factor,
2286                    non_nulls: non_nulls * factor,
2287                }
2288            }
2289        }
2290    }
2291}
2292
2293impl Columnation for Accum {
2294    type InnerRegion = CopyRegion<Self>;
2295}
2296
2297/// Monoids for in-place compaction of monotonic streams.
2298mod monoids {
2299
2300    // We can improve the performance of some aggregations through the use of algebra.
2301    // In particular, we can move some of the aggregations in to the `diff` field of
2302    // updates, by changing `diff` from integers to a different algebraic structure.
2303    //
2304    // The one we use is called a "semigroup", and it means that the structure has an
2305    // associative addition operator. The trait we use also allows the semigroup elements
2306    // to present as "zero", meaning they always act as the identity under +. Here,
2307    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
2308    // known to DD by the `is_zero` method, see comment there. So, from the point of view
2309    // of DD, this Semigroup should _not_ have a zero.
2310    //
2311    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
2312    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
2313    // assumes this.
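    //
    // For example (a sketch): `Min(row[5]) + Min(row[3])` yields `Min(row[3])`, while
    // `Min(row[Null]) + Min(row[3])` also yields `Min(row[3])`, since `Datum::Null`
    // acts as the identity rather than as a small element.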
2314
2315    use differential_dataflow::containers::{Columnation, Region};
2316    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2317    use mz_expr::AggregateFunc;
2318    use mz_ore::soft_panic_or_log;
2319    use mz_repr::{Datum, Diff, Row};
2320    use serde::{Deserialize, Serialize};
2321
2322    /// A monoid containing a single-datum row.
2323    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
2324    pub enum ReductionMonoid {
2325        Min(Row),
2326        Max(Row),
2327    }
2328
2329    impl ReductionMonoid {
2330        pub fn finalize(&self) -> &Row {
2331            use ReductionMonoid::*;
2332            match self {
2333                Min(row) | Max(row) => row,
2334            }
2335        }
2336    }
2337
2338    impl Clone for ReductionMonoid {
2339        fn clone(&self) -> Self {
2340            use ReductionMonoid::*;
2341            match self {
2342                Min(row) => Min(row.clone()),
2343                Max(row) => Max(row.clone()),
2344            }
2345        }
2346
2347        fn clone_from(&mut self, source: &Self) {
2348            use ReductionMonoid::*;
2349
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent: min/max of a value
            // repeated `factor` times is the value itself. Since the monoid has no
            // zero value for us to return here, users must ascertain beforehand
            // that the factor is positive, i.e., that the input is monotonic
            // (typically via `ensure_monotonic`).
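            //
            // For example, an update whose diff is `Min(r)` appearing with
            // multiplicity 3 still contributes `Min(r)`: min and max are
            // insensitive to repetition.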
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a small element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a large element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
            // DD uses true results of this method to make stuff disappear. This makes sense when
            // diffs really are just diffs, but for `ReductionMonoid` the diffs hold reduction
            // results. We don't want funny stuff, like disappearing, to happen to reduction results
            // even when they are null. (This would confuse, e.g., `ReduceCollation` for null
            // inputs.)
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
    /// in the same backing region. Alternatively, it could store them in two separate regions, but
    /// we opt for a single one for simplicity.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
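    ///
    /// A minimal illustrative sketch (ignored doctest): a `Max` aggregate wraps the
    /// starting row in [`ReductionMonoid::Max`], so that DD's consolidation of diffs
    /// computes the maximum.
    ///
    /// ```ignore
    /// let row = Row::pack_slice(&[Datum::Int64(5)]);
    /// assert!(matches!(
    ///     get_monoid(row, &AggregateFunc::MaxInt64),
    ///     Some(ReductionMonoid::Max(_))
    /// ));
    /// ```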
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
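
    // A minimal test sketch of the identity semantics described in the module
    // comment above: `Datum::Null` acts as the identity for `plus_equals`, while
    // `is_zero` must still report false. Uses only APIs defined or imported in
    // this module.
    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn null_is_identity_for_min() {
            // Start from the identity element: a single-datum row holding Null.
            let mut m = ReductionMonoid::Min(Row::pack_slice(&[Datum::Null]));
            // Adding a concrete value replaces the identity...
            m.plus_equals(&ReductionMonoid::Min(Row::pack_slice(&[Datum::Int64(5)])));
            // ...and adding a larger value leaves the minimum unchanged.
            m.plus_equals(&ReductionMonoid::Min(Row::pack_slice(&[Datum::Int64(7)])));
            assert_eq!(m.finalize().unpack_first(), Datum::Int64(5));
            // Despite Null being the identity, `is_zero` stays false; see the
            // comment on the `IsZero` impl.
            assert!(!m.is_zero());
        }
    }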
}

mod window_agg_helpers {
    use crate::render::reduce::*;

    /// TODO: For performance, it would be better to do the branching that happens in this enum's
    /// methods at the call site of `eval_fast_window_agg`. Then we wouldn't need an enum here, and
    /// could parameterize `eval_fast_window_agg` directly with one of the implementations.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    pub struct AccumulableOneByOneAggr {
        aggr_func: AggregateFunc,
        accum: Accum,
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    pub struct HierarchicalOneByOneAggr {
        aggr_func: AggregateFunc,
        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
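
    // A minimal usage sketch of the one-by-one aggregation flow: feed datums one
    // at a time, then read back the current aggregate. `MinInt64` routes to
    // `HierarchicalOneByOneAggr`, whose behavior is fully determined by the code
    // above.
    #[cfg(test)]
    mod tests {
        use super::*;
        use mz_expr::OneByOneAggr;

        #[test]
        fn one_by_one_min() {
            let mut agg = OneByOneAggrImpls::new(&AggregateFunc::MinInt64, false);
            agg.give(&Datum::Int64(5));
            agg.give(&Datum::Int64(3));
            agg.give(&Datum::Int64(7));
            let temp_storage = RowArena::new();
            assert_eq!(agg.get_current_aggregate(&temp_storage), Datum::Int64(3));
        }
    }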
}