Skip to main content

mz_compute/render/
reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reduction dataflow construction.
11//!
12//! Consult [ReducePlan] documentation for details.
13
14use std::collections::BTreeMap;
15
16use columnation::{Columnation, CopyRegion};
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
22use differential_dataflow::hashable::Hashable;
23use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
24use differential_dataflow::trace::implementations::BatchContainer;
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Data, VecCollection};
27use itertools::Itertools;
28use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
29use mz_compute_types::plan::ArrangementStrategy;
30use mz_compute_types::plan::reduce::{
31    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_ore::cast::CastLossy;
38use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
39use mz_repr::fixed_length::ExtendDatums;
40use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
41use mz_timely_util::columnation::ColumnationChunker;
42use mz_timely_util::operator::CollectionExt;
43use num_traits::Float;
44use serde::{Deserialize, Serialize};
45use timely::Container;
46use timely::container::{CapacityContainerBuilder, PushInto};
47use tracing::warn;
48
49use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
50use crate::extensions::reduce::{ClearContainer, MzReduce};
51use crate::render::context::{CollectionBundle, Context};
52use crate::render::errors::DataflowErrorSer;
53use crate::render::errors::MaybeValidatingRow;
54use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
55use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
56use crate::typedefs::{
57    ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58    RowRowSpine, RowSpine, RowValSpine,
59};
60use mz_row_spine::{
61    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
62};
63
64impl<'scope, T: RenderTimestamp> Context<'scope, T> {
65    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
66    /// minimize worst-case incremental update times and memory footprint.
67    pub fn render_reduce(
68        &self,
69        input_key: Option<Vec<MirScalarExpr>>,
70        input: CollectionBundle<'scope, T>,
71        key_val_plan: KeyValPlan,
72        reduce_plan: ReducePlan,
73        mfp_after: Option<MapFilterProject>,
74        temporal_bucketing_strategy: ArrangementStrategy,
75    ) -> CollectionBundle<'scope, T>
76    where
77        T: crate::render::MaybeBucketByTime,
78    {
79        // Convert `mfp_after` to an actionable plan.
80        let mfp_after = mfp_after.map(|m| {
81            m.into_plan()
82                .expect("MFP planning must succeed")
83                .into_nontemporal()
84                .expect("Fused Reduce MFPs do not have temporal predicates")
85        });
86
87        input.scope().region_named("Reduce", |inner| {
88            let KeyValPlan {
89                mut key_plan,
90                mut val_plan,
91            } = key_val_plan;
92            let key_arity = key_plan.projection.len();
93            let mut datums = DatumVec::new();
94
95            // Determine the columns we'll need from the row.
96            let mut demand = Vec::new();
97            demand.extend(key_plan.demand());
98            demand.extend(val_plan.demand());
99            demand.sort();
100            demand.dedup();
101
102            // remap column references to the subset we use.
103            let mut demand_map = BTreeMap::new();
104            for column in demand.iter() {
105                demand_map.insert(*column, demand_map.len());
106            }
107            let demand_map_len = demand_map.len();
108            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
109            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
110            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
111            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
112
113            let (key_val_input, err) = input
114                .enter_region(inner)
115                .flat_map::<_, ConsolidatingContainerBuilder<Vec<((Row, Row), T, Diff)>>, _>(
116                    input_key.map(|k| (k, None)),
117                    max_demand,
118                    move |row_datums, time, diff, ok_session, err_session| {
119                        let mut row_builder = SharedRow::get();
120                        let temp_storage = RowArena::new();
121
122                        let mut row_iter = row_datums.drain(..);
123                        let mut datums_local = datums.borrow();
124                        // Unpack only the demanded columns.
125                        for skip in skips.iter() {
126                            datums_local.push(row_iter.nth(*skip).unwrap());
127                        }
128
129                        // Evaluate the key expressions.
130                        let key = key_plan.evaluate_into(
131                            &mut datums_local,
132                            &temp_storage,
133                            &mut row_builder,
134                        );
135                        let key = match key {
136                            Err(e) => {
137                                err_session.give((e.into(), time, diff));
138                                return 1;
139                            }
140                            Ok(Some(key)) => key.clone(),
141                            Ok(None) => panic!("Row expected as no predicate was used"),
142                        };
143
144                        // Evaluate the value expressions.
145                        // The prior evaluation may have left additional columns we should delete.
146                        datums_local.truncate(skips.len());
147                        let val = val_plan.evaluate_into(
148                            &mut datums_local,
149                            &temp_storage,
150                            &mut row_builder,
151                        );
152                        let val = match val {
153                            Err(e) => {
154                                err_session.give((e.into(), time, diff));
155                                return 1;
156                            }
157                            Ok(Some(val)) => val.clone(),
158                            Ok(None) => panic!("Row expected as no predicate was used"),
159                        };
160
161                        ok_session.give(((key, val), time, diff));
162                        1
163                    },
164                );
165
166            // Bucket the keyed `(key, val)` stream when lowering chose `TemporalBucketing`.
167            // `Reduce` builds its own arrangement via `KeyValPlan`, bypassing
168            // `ensure_collections`, so the strategy is plumbed through `PlanNode::Reduce`
169            // rather than inferred at the arrangement site. No-op for `Direct`.
170            let key_val_collection = key_val_input.as_collection();
171            let key_val_collection = if matches!(
172                temporal_bucketing_strategy,
173                ArrangementStrategy::TemporalBucketing
174            ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
175            {
176                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
177                    .get(&self.config_set)
178                    .try_into()
179                    .expect("must fit");
180                T::maybe_apply_temporal_bucketing(
181                    key_val_collection.inner,
182                    self.as_of_frontier.clone(),
183                    summary,
184                )
185            } else {
186                key_val_collection
187            };
188
189            // Render the reduce plan
190            self.render_reduce_plan(reduce_plan, key_val_collection, err, key_arity, mfp_after)
191                .leave_region(self.scope)
192        })
193    }
194
195    /// Render a dataflow based on the provided plan.
196    ///
197    /// The output will be an arrangements that looks the same as if
198    /// we just had a single reduce operator computing everything together, and
199    /// this arrangement can also be re-used.
200    fn render_reduce_plan<'s>(
201        &self,
202        plan: ReducePlan,
203        collection: VecCollection<'s, T, (Row, Row), Diff>,
204        err_input: VecCollection<'s, T, DataflowErrorSer, Diff>,
205        key_arity: usize,
206        mfp_after: Option<SafeMfpPlan>,
207    ) -> CollectionBundle<'s, T> {
208        let mut errors = Default::default();
209        let arrangement =
210            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
211        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
212        CollectionBundle::from_columns(
213            0..key_arity,
214            ArrangementFlavor::Local(
215                arrangement,
216                errs.mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
217                    "Arrange bundle err",
218                ),
219            ),
220        )
221    }
222
223    fn render_reduce_plan_inner<'s>(
224        &self,
225        plan: ReducePlan,
226        collection: VecCollection<'s, T, (Row, Row), Diff>,
227        errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
228        key_arity: usize,
229        mfp_after: Option<SafeMfpPlan>,
230    ) -> Arranged<'s, RowRowAgent<T, Diff>> {
231        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
232        // not only values (database-issues#6658).
233        let arrangement = match plan {
234            // If we have no aggregations or just a single type of reduction, we
235            // can go ahead and render them directly.
236            ReducePlan::Distinct => {
237                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
238                errors.push(errs);
239                arranged_output
240            }
241            ReducePlan::Accumulable(expr) => {
242                let (arranged_output, errs) =
243                    self.build_accumulable(collection, expr, key_arity, mfp_after);
244                errors.push(errs);
245                arranged_output
246            }
247            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
248                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
249                errors.push(errs);
250                output
251            }
252            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
253                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
254                errors.push(errs);
255                output
256            }
257            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
258                expr,
259                fused_unnest_list,
260            })) => {
261                // Note that we skip validating for negative diffs when we have a fused unnest list,
262                // because this is already a CPU-intensive situation due to the non-incrementalness
263                // of window functions.
264                let validating = !fused_unnest_list;
265                let (output, errs) = self.build_basic_aggregate(
266                    collection,
267                    0,
268                    &expr,
269                    validating,
270                    key_arity,
271                    mfp_after,
272                    fused_unnest_list,
273                );
274                if validating {
275                    errors.push(errs.expect("validation should have occurred as it was requested"));
276                }
277                output
278            }
279            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
280                let (output, errs) =
281                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
282                errors.push(errs);
283                output
284            }
285        };
286        arrangement
287    }
288
289    /// Build the dataflow to compute the set of distinct keys.
290    fn build_distinct<'s>(
291        &self,
292        collection: VecCollection<'s, T, (Row, Row), Diff>,
293        mfp_after: Option<SafeMfpPlan>,
294    ) -> (
295        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
296        VecCollection<'s, T, DataflowErrorSer, Diff>,
297    ) {
298        let error_logger = self.error_logger();
299
300        // Allocations for the two closures.
301        let mut datums1 = DatumVec::new();
302        let mut datums2 = DatumVec::new();
303        let mfp_after1 = mfp_after.clone();
304        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
305
306        let arranged = collection
307            .mz_arrange::<
308                ColumnationChunker<_>,
309                RowRowBatcher<_, _>,
310                RowRowBuilder<_, _>,
311                RowRowSpine<_, _>,
312            >(
313                "Arranged DistinctBy",
314            );
315        let output = arranged
316            .clone()
317            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
318                "DistinctBy",
319                move |key, _input, output| {
320                    let temp_storage = RowArena::new();
321                    let mut datums_local = datums1.borrow();
322                    key.extend_datums(&temp_storage, &mut datums_local, None);
323
324                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
325                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
326                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
327                    if mfp_after1
328                        .as_ref()
329                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
330                        .unwrap_or(Ok(true))
331                        == Ok(true)
332                    {
333                        // We're pushing a unit value here because the key is implicitly added by the
334                        // arrangement, and the permutation logic takes care of using the key part of the
335                        // output.
336                        output.push((Row::default(), Diff::ONE));
337                    }
338                },
339            );
340        let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
341            "DistinctByErrorCheck",
342            move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowErrorSer, _)>| {
343                for (_, count) in input.iter() {
344                    if count.is_positive() {
345                        continue;
346                    }
347                    let message = "Non-positive multiplicity in DistinctBy";
348                    error_logger.log(message, &format!("row={key:?}, count={count}"));
349                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
350                    return;
351                }
352                // If `mfp_after` can error, then evaluate it here.
353                let Some(mfp) = &mfp_after2 else { return };
354                let temp_storage = RowArena::new();
355                let mut datums_local = datums2.borrow();
356                key.extend_datums(&temp_storage, &mut datums_local, None);
357
358                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
359                    output.push((e.into(), Diff::ONE));
360                }
361            },
362        );
363        (output, errors.as_collection(|_k, v| v.clone()))
364    }
365
366    /// Build the dataflow to compute and arrange multiple non-accumulable,
367    /// non-hierarchical aggregations on `input`.
368    ///
369    /// This function assumes that we are explicitly rendering multiple basic aggregations.
370    /// For each aggregate, we render a different reduce operator, and then fuse
371    /// results together into a final arrangement that presents all the results
372    /// in the order specified by `aggrs`.
373    fn build_basic_aggregates<'s>(
374        &self,
375        input: VecCollection<'s, T, (Row, Row), Diff>,
376        aggrs: Vec<AggregateExpr>,
377        key_arity: usize,
378        mfp_after: Option<SafeMfpPlan>,
379    ) -> (
380        RowRowArrangement<'s, T>,
381        VecCollection<'s, T, DataflowErrorSer, Diff>,
382    ) {
383        // We are only using this function to render multiple basic aggregates and
384        // stitch them together. If that's not true we should complain.
385        if aggrs.len() <= 1 {
386            self.error_logger().soft_panic_or_log(
387                "Too few aggregations when building basic aggregates",
388                &format!("len={}", aggrs.len()),
389            )
390        }
391        let mut err_output = None;
392        let mut to_collect = Vec::new();
393        for (index, aggr) in aggrs.into_iter().enumerate() {
394            let (result, errs) = self.build_basic_aggregate(
395                input.clone(),
396                index,
397                &aggr,
398                err_output.is_none(),
399                key_arity,
400                None,
401                false,
402            );
403            if errs.is_some() {
404                err_output = errs
405            }
406            to_collect
407                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
408        }
409
410        // Allocations for the two closures.
411        let mut datums1 = DatumVec::new();
412        let mut datums2 = DatumVec::new();
413        let mfp_after1 = mfp_after.clone();
414        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
415
416        let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
417            .mz_arrange::<
418                ColumnationChunker<_>,
419                RowValBatcher<_, _, _>,
420                RowValBuilder<_, _, _>,
421                RowValSpine<_, _, _>,
422            >(
423            "Arranged ReduceFuseBasic input",
424        );
425
426        let output = arranged
427            .clone()
428            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
429                move |key, input, output| {
430                    let temp_storage = RowArena::new();
431                    let mut datums_local = datums1.borrow();
432                    key.extend_datums(&temp_storage, &mut datums_local, None);
433                    let key_len = datums_local.len();
434
435                    for ((_, row), _) in input.iter() {
436                        datums_local.push(row.unpack_first());
437                    }
438
439                    if let Some(row) =
440                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
441                    {
442                        output.push((row, Diff::ONE));
443                    }
444                }
445            });
446        // If `mfp_after` can error, then we need to render a paired reduction
447        // to scan for these potential errors. Note that we cannot directly use
448        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
449        // conditionally render the second component of the reduction pair.
450        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
451        if let Some(mfp) = mfp_after2 {
452            let mfp_errs = arranged
453                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
454                    "ReduceFuseBasic Error Check",
455                    move |key, input, output| {
456                        // Since negative accumulations are checked in at least one component
457                        // aggregate, we only need to look for MFP errors here.
458                        let temp_storage = RowArena::new();
459                        let mut datums_local = datums2.borrow();
460                        key.extend_datums(&temp_storage, &mut datums_local, None);
461
462                        for ((_, row), _) in input.iter() {
463                            datums_local.push(row.unpack_first());
464                        }
465
466                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
467                            output.push((e.into(), Diff::ONE));
468                        }
469                    },
470                )
471                .as_collection(|_, v| v.clone());
472            (output, validation_errs.concat(mfp_errs))
473        } else {
474            (output, validation_errs)
475        }
476    }
477
478    /// Build the dataflow to compute a single basic aggregation.
479    ///
480    /// This method also applies distinctness if required.
481    fn build_basic_aggregate<'s>(
482        &self,
483        input: VecCollection<'s, T, (Row, Row), Diff>,
484        index: usize,
485        aggr: &AggregateExpr,
486        validating: bool,
487        key_arity: usize,
488        mfp_after: Option<SafeMfpPlan>,
489        fused_unnest_list: bool,
490    ) -> (
491        RowRowArrangement<'s, T>,
492        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
493    ) {
494        let AggregateExpr {
495            func,
496            expr: _,
497            distinct,
498        } = aggr.clone();
499
500        // Extract the value we were asked to aggregate over.
501        let mut partial = input.map(move |(key, row)| {
502            let mut row_builder = SharedRow::get();
503            let value = row.iter().nth(index).unwrap();
504            row_builder.packer().push(value);
505            (key, row_builder.clone())
506        });
507
508        let mut err_output = None;
509
510        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
511        if distinct {
512            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
513            let pairer = Pairer::new(key_arity);
514            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
515            if validating {
516                let (oks, errs) = self
517                    .build_reduce_inaccumulable_distinct::<
518                        RowValBuilder<Result<(), String>, _, _>,
519                        RowValSpine<Result<(), String>, _, _>,
520                    >(keyed, None)
521                    .as_collection(|k, v| {
522                        (
523                            k.to_row(),
524                            v.as_ref()
525                                .map(|&()| ())
526                                .map_err(|m| m.as_str().into()),
527                        )
528                    })
529                    .map_fallible::<
530                        CapacityContainerBuilder<_>,
531                        CapacityContainerBuilder<_>,
532                        _,
533                        _,
534                        _,
535                    >(
536                        "Demux Errors",
537                        move |(key_val, result)| match result {
538                            Ok(()) => Ok(pairer.split(&key_val)),
539                            Err(m) => {
540                                Err(EvalError::Internal(m).into())
541                            }
542                        },
543                    );
544                err_output = Some(errs);
545                partial = oks;
546            } else {
547                partial = self
548                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
549                        keyed,
550                        Some(" [val: empty]"),
551                    )
552                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
553            }
554        }
555
556        // Allocations for the two closures.
557        let mut datums1 = DatumVec::new();
558        let mut datums2 = DatumVec::new();
559        let mut datums_key_1 = DatumVec::new();
560        let mut datums_key_2 = DatumVec::new();
561        // Scratch buffers for decoding each input value's (single) datum into the
562        // arena, so the aggregates iterate arena-resident datums rather than the
563        // packed value bytes — a prerequisite for compressed value representations.
564        let mut vals1 = DatumVec::new();
565        let mut vals2 = DatumVec::new();
566        let mut vals_key_1 = DatumVec::new();
567        let mut vals_key_2 = DatumVec::new();
568        let mfp_after1 = mfp_after.clone();
569        let func2 = func.clone();
570
571        let name = if !fused_unnest_list {
572            "ReduceInaccumulable"
573        } else {
574            "FusedReduceUnnestList"
575        };
576        let arranged = partial
577            .mz_arrange::<
578                ColumnationChunker<_>,
579                RowRowBatcher<_, _>,
580                RowRowBuilder<_, _>,
581                RowRowSpine<_, _>,
582            >(&format!(
583                "Arranged {name}"
584            ));
585        let oks = if !fused_unnest_list {
586            arranged
587                .clone()
588                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
589                    move |key, source, target| {
590                        let temp_storage = RowArena::new();
591                        // Decode each input value's single datum into the arena, reusing one
592                        // scratch buffer; the datum is `Copy` and is copied out before the
593                        // buffer is overwritten on the next row. We pass the multiplicity
594                        // through (unlike in hierarchical aggregation) because we don't know
595                        // that the aggregation method is not sensitive to the number of
596                        // records. The aggregate decides how to consume it.
597                        let mut val_scratch = vals1.borrow();
598                        let iter = source.iter().map(|(v, w)| {
599                            val_scratch.clear();
600                            v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
601                            (val_scratch[0], *w)
602                        });
603
604                        let mut datums_local = datums1.borrow();
605                        key.extend_datums(&temp_storage, &mut datums_local, None);
606                        let key_len = datums_local.len();
607                        datums_local.push(
608                        // Note that this is not necessarily a window aggregation, in which case
609                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
610                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
611                            iter,
612                            &temp_storage,
613                        ),
614                    );
615
616                        if let Some(row) = evaluate_mfp_after(
617                            &mfp_after1,
618                            &mut datums_local,
619                            &temp_storage,
620                            key_len,
621                        ) {
622                            target.push((row, Diff::ONE));
623                        }
624                    }
625                })
626        } else {
627            arranged
628                .clone()
629                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
630                    move |key, source, target| {
631                        // This part is the same as in the `!fused_unnest_list` if branch above.
632                        let temp_storage = RowArena::new();
633                        let mut val_scratch = vals_key_1.borrow();
634                        let iter = source.iter().map(|(v, w)| {
635                            val_scratch.clear();
636                            v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
637                            (val_scratch[0], *w)
638                        });
639
640                        // This is the part that is specific to the `fused_unnest_list` branch.
641                        let mut datums_local = datums_key_1.borrow();
642                        key.extend_datums(&temp_storage, &mut datums_local, None);
643                        let key_len = datums_local.len();
644                        for datum in func
645                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
646                                iter,
647                                &temp_storage,
648                            )
649                        {
650                            datums_local.truncate(key_len);
651                            datums_local.push(datum);
652                            if let Some(row) = evaluate_mfp_after(
653                                &mfp_after1,
654                                &mut datums_local,
655                                &temp_storage,
656                                key_len,
657                            ) {
658                                target.push((row, Diff::ONE));
659                            }
660                        }
661                    }
662                })
663        };
664
665        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
666        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
667        // rationale around using a second reduction here.
668        let must_validate = validating && err_output.is_none();
669        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
670        if must_validate || mfp_after2.is_some() {
671            let error_logger = self.error_logger();
672
673            let errs = if !fused_unnest_list {
674                arranged
675                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
676                        &format!("{name} Error Check"),
677                        move |key, source, target| {
678                            // Negative counts would be surprising, but until we are 100% certain we won't
679                            // see them, we should report when we do. We may want to bake even more info
680                            // in here in the future.
681                            if must_validate {
682                                for (value, count) in source.iter() {
683                                    if count.is_positive() {
684                                        continue;
685                                    }
686                                    let value = value.to_row();
687                                    let message =
688                                        "Non-positive accumulation in ReduceInaccumulable";
689                                    error_logger
690                                        .log(message, &format!("value={value:?}, count={count}"));
691                                    let err = EvalError::Internal(message.into());
692                                    target.push((err.into(), Diff::ONE));
693                                    return;
694                                }
695                            }
696
697                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
698                            let Some(mfp) = &mfp_after2 else { return };
699                            let temp_storage = RowArena::new();
700                            let mut val_scratch = vals2.borrow();
701                            let iter = source.iter().map(|(v, w)| {
702                                val_scratch.clear();
703                                v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
704                                (val_scratch[0], *w)
705                            });
706
707                            let mut datums_local = datums2.borrow();
708                            key.extend_datums(&temp_storage, &mut datums_local, None);
709                            datums_local.push(
710                                func2.eval_with_fast_window_agg::<
711                                    _,
712                                    window_agg_helpers::OneByOneAggrImpls,
713                                >(
714                                    iter, &temp_storage
715                                ),
716                            );
717                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
718                                target.push((e.into(), Diff::ONE));
719                            }
720                        },
721                    )
722                    .as_collection(|_, v| v.clone())
723            } else {
724                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
725                assert!(!must_validate);
726                // We couldn't have got into this if branch due to `must_validate`, so it must be
727                // because of the `mfp_after2.is_some()`.
728                let Some(mfp) = mfp_after2 else {
729                    unreachable!()
730                };
731                arranged
732                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
733                        &format!("{name} Error Check"),
734                        move |key, source, target| {
735                            let temp_storage = RowArena::new();
736                            let mut val_scratch = vals_key_2.borrow();
737                            let iter = source.iter().map(|(v, w)| {
738                                val_scratch.clear();
739                                v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
740                                (val_scratch[0], *w)
741                            });
742
743                            let mut datums_local = datums_key_2.borrow();
744                            key.extend_datums(&temp_storage, &mut datums_local, None);
745                            let key_len = datums_local.len();
746                            for datum in func2
747                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
748                                    iter,
749                                    &temp_storage,
750                                )
751                            {
752                                datums_local.truncate(key_len);
753                                datums_local.push(datum);
754                                // We know that `mfp` can error (because of the `could_error` call
755                                // above), so try to evaluate it here.
756                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
757                                {
758                                    target.push((e.into(), Diff::ONE));
759                                }
760                            }
761                        },
762                    )
763                    .as_collection(|_, v| v.clone())
764            };
765
766            if let Some(e) = err_output {
767                err_output = Some(e.concat(errs));
768            } else {
769                err_output = Some(errs);
770            }
771        }
772        (oks, err_output)
773    }
774
775    fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
776        &self,
777        input: VecCollection<'s, T, Row, Diff>,
778        name_tag: Option<&str>,
779    ) -> Arranged<'s, TraceAgent<Tr>>
780    where
781        Tr: for<'a> Trace<
782                Key<'a> = DatumSeq<'a>,
783                KeyContainer: BatchContainer<Owned = Row>,
784                Time = T,
785                Diff = Diff,
786                ValOwn: Data + MaybeValidatingRow<(), String>,
787            > + 'static,
788        Bu: Builder<
789                Time = T,
790                Input: Container
791                           + ClearContainer
792                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
793                Output = Tr::Batch,
794            >,
795        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
796    {
797        let error_logger = self.error_logger();
798
799        let output_name = format!(
800            "ReduceInaccumulable Distinct{}",
801            name_tag.unwrap_or_default()
802        );
803
804        let input: KeyCollection<_, _, _> = input.into();
805        input
806            .mz_arrange::<
807                ColumnationChunker<_>,
808                RowBatcher<_, _>,
809                RowBuilder<_, _>,
810                RowSpine<_, _>,
811            >(
812                "Arranged ReduceInaccumulable Distinct [val: empty]",
813            )
814            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
815                if let Some(err) = Tr::ValOwn::into_error() {
816                    for (value, count) in source.iter() {
817                        if count.is_positive() {
818                            continue;
819                        }
820
821                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
822                        error_logger.log(message, &format!("value={value:?}, count={count}"));
823                        t.push((err(message.to_string()), Diff::ONE));
824                        return;
825                    }
826                }
827                t.push((Tr::ValOwn::ok(()), Diff::ONE))
828            })
829    }
830
    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
    /// on non-monotonic inputs.
    ///
    /// This function renders a single reduction tree that computes aggregations with
    /// a priority queue implemented with a series of reduce operators that partition
    /// the input into buckets, compute the aggregation over very small buckets,
    /// and feed the results up to larger buckets.
    ///
    /// Note that this implementation currently ignores the distinct bit because we
    /// currently only perform min / max hierarchically and the reduction tree
    /// efficiently suppresses non-distinct updates.
    ///
    /// `buckets` holds one modulus per stage of the tree. The first stage keys each
    /// input record by `hash(values) % buckets[0]` followed by the original key columns;
    /// each later stage re-buckets by taking the previous bucket number modulo its own
    /// entry. We do some non-obvious trickery here to limit the memory usage per layer by
    /// internally holding only the elements that were rejected by this stage. However,
    /// the output collection of each stage maintains the `((bucket, key..), passing value)`
    /// pairs for that stage.
    ///
    /// `key_arity` is the number of key columns; it is used to strip the leading bucket
    /// column back off the key. `mfp_after`, if present, is evaluated (via
    /// `evaluate_mfp_after` / `evaluate_inner`) over the datums `key ++ aggregate results`.
    ///
    /// Returns the final arrangement of `(key, aggregate results)` rows together with the
    /// error collection, which carries invalid-accumulation errors from the validating
    /// stage and any errors raised by a fallible `mfp_after`.
    fn build_bucketed<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
        let outer_scope = input.scope();
        let arranged_output = outer_scope
            .clone()
            .region_named("ReduceHierarchical", |inner| {
                let input = input.enter(inner);

                // The first mod to apply to the hash. An empty `buckets` list falls back to a
                // single bucket (mod 1).
                let first_mod = buckets.get(0).copied().unwrap_or(1);
                let aggregations = aggr_funcs.len();

                // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
                // The resulting key row is `[bucket, key columns..]`.
                let mut stage = input.map(move |(key, row)| {
                    let mut row_builder = SharedRow::get();
                    let mut row_packer = row_builder.packer();
                    row_packer.extend(row.iter().take(aggregations));
                    let values = row_builder.clone();

                    // Apply the initial mod here.
                    let hash = values.hashed() % first_mod;
                    let hash_key =
                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                    (hash_key, values)
                });

                // Repeatedly apply hierarchical reduction with a progressively coarser key.
                for (index, b) in buckets.into_iter().enumerate() {
                    // Apply subsequent bucket mods for all but the first round (the first
                    // round's mod was already applied when building `stage` above).
                    let input = if index == 0 {
                        stage
                    } else {
                        stage.map(move |(hash_key, values)| {
                            let mut hash_key_iter = hash_key.iter();
                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                            // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
                            let hash_key = SharedRow::pack(
                                std::iter::once(Datum::from(hash))
                                    .chain(hash_key_iter.take(key_arity)),
                            );
                            (hash_key, values)
                        })
                    };

                    // We only want the first stage to perform validation of whether invalid accumulations
                    // were observed in the input. Subsequently, we will either produce an error in the error
                    // stream or produce correct data in the output stream.
                    let validating = err_output.is_none();

                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                    // Only a validating stage returns errors, so this assignment happens at most
                    // once (for the first stage).
                    if let Some(errs) = errs {
                        err_output = Some(errs.leave_region(outer_scope));
                    }
                    stage = oks
                }

                // Discard the hash from the key and return to the format of the input data.
                let partial = stage.map(move |(hash_key, values)| {
                    let mut hash_key_iter = hash_key.iter();
                    let _hash = hash_key_iter.next();
                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
                });

                // Allocations for the two closures.
                let mut datums1 = DatumVec::new();
                let mut datums2 = DatumVec::new();
                // Scratch buffers for decoding the input values (one column per aggregate)
                // into the arena, so the aggregates iterate arena-resident datums rather
                // than the packed value bytes.
                let mut vals1 = DatumVec::new();
                let mut vals2 = DatumVec::new();
                let mfp_after1 = mfp_after.clone();
                // Only keep the MFP for the error-check reduction if it can actually error.
                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
                let aggr_funcs2 = aggr_funcs.clone();

                // Arrange the final `(key, values)` collection and reduce it to one
                // `(key, aggregate results)` row per key.
                let error_logger = self.error_logger();
                // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
                // view mz_introspection.mz_expected_group_size_advice.
                let arranged = partial
                    .mz_arrange::<
                        ColumnationChunker<_>,
                        RowRowBatcher<_, _>,
                        RowRowBuilder<_, _>,
                        RowRowSpine<_, _>,
                    >(
                        "Arrange ReduceMinsMaxes",
                    );
                // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
                // but we then wouldn't be able to do this error check conditionally.  See its documentation
                // for the rationale around using a second reduction here.
                // `err_output` is still `None` only if no bucketed stage ran (empty `buckets`);
                // otherwise the first stage has already validated the input.
                let must_validate = err_output.is_none();
                if must_validate || mfp_after2.is_some() {
                    let errs = arranged
                        .clone()
                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                            "ReduceMinsMaxes Error Check",
                            move |key, source, target| {
                                // Negative counts would be surprising, but until we are 100% certain we won't
                                // see them, we should report when we do. We may want to bake even more info
                                // in here in the future.
                                if must_validate {
                                    for (val, count) in source.iter() {
                                        if count.is_positive() {
                                            continue;
                                        }
                                        let val = val.to_row();
                                        let message =
                                            "Non-positive accumulation in ReduceMinsMaxes";
                                        error_logger
                                            .log(message, &format!("val={val:?}, count={count}"));
                                        target.push((
                                            EvalError::Internal(message.into()).into(),
                                            Diff::ONE,
                                        ));
                                        return;
                                    }
                                }

                                // We know that `mfp_after` can error if it exists, so try to evaluate it here.
                                let Some(mfp) = &mfp_after2 else { return };
                                let temp_storage = RowArena::new();
                                let mut datums_local = datums2.borrow();
                                key.extend_datums(&temp_storage, &mut datums_local, None);

                                // Decode every value row's datums into the arena, one column
                                // per aggregate, then iterate them column-major below. Min/max
                                // hierarchical aggregates are multiplicity-insensitive, so each
                                // row contributes once (`Diff::ONE`) regardless of `_cnt`.
                                let arity = aggr_funcs2.len();
                                let mut decoded = vals2.borrow();
                                for (values, _cnt) in source.iter() {
                                    values.extend_datums(&temp_storage, &mut decoded, None);
                                }
                                assert_eq!(decoded.len(), source.len() * arity);
                                for (col, func) in aggr_funcs2.iter().enumerate() {
                                    let column_iter = (0..source.len())
                                        .map(|r| (decoded[r * arity + col], Diff::ONE));
                                    datums_local.push(func.eval(column_iter, &temp_storage));
                                }
                                if let Result::Err(e) =
                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            },
                        )
                        .as_collection(|_, v| v.clone())
                        .leave_region(outer_scope);
                    if let Some(e) = err_output.take() {
                        err_output = Some(e.concat(errs));
                    } else {
                        err_output = Some(errs);
                    }
                }
                arranged
                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        "ReduceMinsMaxes",
                        move |key, source, target| {
                            let temp_storage = RowArena::new();
                            let mut datums_local = datums1.borrow();
                            key.extend_datums(&temp_storage, &mut datums_local, None);
                            let key_len = datums_local.len();

                            // Decode every value row's datums into the arena, one column
                            // per aggregate, then iterate them column-major below. Min/max
                            // hierarchical aggregates are multiplicity-insensitive, so each
                            // row contributes once (`Diff::ONE`) regardless of `_cnt`.
                            let arity = aggr_funcs.len();
                            let mut decoded = vals1.borrow();
                            for (values, _cnt) in source.iter() {
                                values.extend_datums(&temp_storage, &mut decoded, None);
                            }
                            assert_eq!(decoded.len(), source.len() * arity);
                            for (col, func) in aggr_funcs.iter().enumerate() {
                                let column_iter = (0..source.len())
                                    .map(|r| (decoded[r * arity + col], Diff::ONE));
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }

                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        },
                    )
                    .leave_region(outer_scope)
            });
        (
            arranged_output,
            // Either the first bucketed stage or the final error check (when `buckets` is
            // empty) always populates `err_output`.
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
1058
1059    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
1060    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
1061    /// with the negation of what's produced by the reduction.
1062    /// `validating` indicates whether we want this stage to perform error detection
1063    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
1064    /// stages can skip validation.
1065    fn build_bucketed_stage<'s>(
1066        &self,
1067        aggr_funcs: &Vec<AggregateFunc>,
1068        input: VecCollection<'s, T, (Row, Row), Diff>,
1069        validating: bool,
1070    ) -> (
1071        VecCollection<'s, T, (Row, Row), Diff>,
1072        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
1073    ) {
1074        let (input, negated_output, errs) = if validating {
1075            let (input, reduced) = self
1076                .build_bucketed_negated_output::<
1077                    RowValBuilder<_, _, _>,
1078                    RowValSpine<Result<Row, Row>, _, _>,
1079                >(
1080                    input.clone(),
1081                    aggr_funcs.clone(),
1082                );
1083            let (oks, errs) = reduced
1084                .as_collection(|k, v| (k.to_row(), v.clone()))
1085                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1086                "Checked Invalid Accumulations",
1087                |(hash_key, result)| match result {
1088                    Err(hash_key) => {
1089                        let mut hash_key_iter = hash_key.iter();
1090                        let _hash = hash_key_iter.next();
1091                        let key = SharedRow::pack(hash_key_iter);
1092                        let message = format!(
1093                            "Invalid data in source, saw non-positive accumulation \
1094                                         for key {key:?} in hierarchical mins-maxes aggregate"
1095                        );
1096                        Err(EvalError::Internal(message.into()).into())
1097                    }
1098                    Ok(values) => Ok((hash_key, values)),
1099                },
1100            );
1101            (input, oks, Some(errs))
1102        } else {
1103            let (input, reduced) = self
1104                .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1105                    input,
1106                    aggr_funcs.clone(),
1107                );
1108            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1109            // that we need to apply the mod on both input and oks.
1110            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1111            (input, oks, None)
1112        };
1113
1114        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1115        let oks = negated_output.concat(input);
1116        (oks, errs)
1117    }
1118
1119    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1120    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1121    /// with all diffs in the reduction's output negated.
1122    fn build_bucketed_negated_output<'s, Bu, Tr>(
1123        &self,
1124        input: VecCollection<'s, T, (Row, Row), Diff>,
1125        aggrs: Vec<AggregateFunc>,
1126    ) -> (
1127        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1128        Arranged<'s, TraceAgent<Tr>>,
1129    )
1130    where
1131        Tr: for<'a> Trace<
1132                Key<'a> = DatumSeq<'a>,
1133                KeyContainer: BatchContainer<Owned = Row>,
1134                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1135                Time = T,
1136                Diff = Diff,
1137            > + 'static,
1138        Bu: Builder<
1139                Time = T,
1140                Input: Container
1141                           + ClearContainer
1142                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1143                Output = Tr::Batch,
1144            >,
1145        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1146    {
1147        let error_logger = self.error_logger();
1148        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1149        // view mz_introspection.mz_expected_group_size_advice.
1150        let arranged_input = input
1151            .mz_arrange::<
1152                ColumnationChunker<_>,
1153                RowRowBatcher<_, _>,
1154                RowRowBuilder<_, _>,
1155                RowRowSpine<_, _>,
1156            >(
1157                "Arranged MinsMaxesHierarchical input",
1158            );
1159
1160        // Scratch buffer for decoding the input values (one column per aggregate) into the
1161        // arena, so the aggregates iterate arena-resident datums rather than the packed bytes.
1162        let mut value_datums = DatumVec::new();
1163        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1164            "Reduced Fallibly MinsMaxesHierarchical",
1165            move |key, source, target| {
1166                if let Some(err) = Tr::ValOwn::into_error() {
1167                    // Should negative accumulations reach us, we should loudly complain.
1168                    for (value, count) in source.iter() {
1169                        if count.is_positive() {
1170                            continue;
1171                        }
1172                        error_logger.log(
1173                            "Non-positive accumulation in MinsMaxesHierarchical",
1174                            &format!("key={key:?}, value={value:?}, count={count}"),
1175                        );
1176                        // After complaining, output an error here so that we can eventually
1177                        // report it in an error stream.
1178                        target.push((
1179                            err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1180                            Diff::ONE,
1181                        ));
1182                        return;
1183                    }
1184                }
1185
1186                // Decode every value row's datums into the arena, one column per aggregate,
1187                // then iterate them column-major below.
1188                let temp_storage = RowArena::new();
1189                let arity = aggrs.len();
1190                let mut decoded = value_datums.borrow();
1191                for (values, _cnt) in source.iter() {
1192                    values.extend_datums(&temp_storage, &mut decoded, None);
1193                }
1194                assert_eq!(decoded.len(), source.len() * arity);
1195
1196                let mut row_builder = SharedRow::get();
1197                let mut row_packer = row_builder.packer();
1198                for (col, func) in aggrs.iter().enumerate() {
1199                    // Min/max hierarchical aggregates are multiplicity-insensitive, so each
1200                    // row contributes once (`Diff::ONE`) regardless of `_cnt`.
1201                    let column_iter =
1202                        (0..source.len()).map(|r| (decoded[r * arity + col], Diff::ONE));
1203                    row_packer.push(func.eval(column_iter, &temp_storage));
1204                }
1205                // We only want to arrange the parts of the input that are not part of the output.
1206                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
1207                // gives us the intended value of this aggregate function. Also we assume that regardless
1208                // of the multiplicity of the final result in the input, we only want to have one copy
1209                // in the output.
1210                target.reserve(source.len().saturating_add(1));
1211                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1212                target.extend(source.iter().map(|(values, cnt)| {
1213                    let mut cnt = *cnt;
1214                    cnt.negate();
1215                    (Tr::ValOwn::ok(values.to_row()), cnt)
1216                }));
1217            },
1218        );
1219        (arranged_input, reduced)
1220    }
1221
1222    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1223    /// on monotonic inputs.
1224    fn build_monotonic<'s>(
1225        &self,
1226        collection: VecCollection<'s, T, (Row, Row), Diff>,
1227        MonotonicPlan {
1228            aggr_funcs,
1229            must_consolidate,
1230        }: MonotonicPlan,
1231        mfp_after: Option<SafeMfpPlan>,
1232    ) -> (
1233        RowRowArrangement<'s, T>,
1234        VecCollection<'s, T, DataflowErrorSer, Diff>,
1235    ) {
1236        let aggregations = aggr_funcs.len();
1237        // Gather the relevant values into a vec of rows ordered by aggregation_index
1238        let collection = collection
1239            .map(move |(key, row)| {
1240                let mut row_builder = SharedRow::get();
1241                let mut values = Vec::with_capacity(aggregations);
1242                values.extend(
1243                    row.iter()
1244                        .take(aggregations)
1245                        .map(|v| row_builder.pack_using(std::iter::once(v))),
1246                );
1247
1248                (key, values)
1249            })
1250            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1251                must_consolidate,
1252                "Consolidated ReduceMonotonic input",
1253            );
1254
1255        // It should be now possible to ensure that we have a monotonic collection.
1256        let error_logger = self.error_logger();
1257        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1258            error_logger.log(
1259                "Non-monotonic input to ReduceMonotonic",
1260                &format!("data={data:?}, diff={diff}"),
1261            );
1262            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1263            (EvalError::Internal(m).into(), Diff::ONE)
1264        });
1265        // We can place our rows directly into the diff field, and
1266        // only keep the relevant one corresponding to evaluating our
1267        // aggregate, instead of having to do a hierarchical reduction.
1268        let partial = partial.explode_one(move |(key, values)| {
1269            let mut output = Vec::new();
1270            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1271                output.push(monoids::get_monoid(row, func).expect(
1272                    "hierarchical aggregations are expected to have monoid implementations",
1273                ));
1274            }
1275            (key, output)
1276        });
1277
1278        // Allocations for the two closures.
1279        let mut datums1 = DatumVec::new();
1280        let mut datums2 = DatumVec::new();
1281        let mfp_after1 = mfp_after.clone();
1282        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1283
1284        let partial: KeyCollection<_, _, _> = partial.into();
1285        let arranged = partial
1286            .mz_arrange::<
1287                ColumnationChunker<_>,
1288                RowBatcher<_, _>,
1289                RowBuilder<_, _>,
1290                RowSpine<_, Vec<ReductionMonoid>>,
1291            >(
1292                "ArrangeMonotonic [val: empty]",
1293            );
1294        let output = arranged
1295            .clone()
1296            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
1297                move |key, input, output| {
1298                    let temp_storage = RowArena::new();
1299                    let mut datums_local = datums1.borrow();
1300                    key.extend_datums(&temp_storage, &mut datums_local, None);
1301                    let key_len = datums_local.len();
1302                    let accum = &input[0].1;
1303                    for monoid in accum.iter() {
1304                        datums_local.extend(monoid.finalize().iter());
1305                    }
1306
1307                    if let Some(row) =
1308                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1309                    {
1310                        output.push((row, Diff::ONE));
1311                    }
1312                }
1313            });
1314
1315        // If `mfp_after` can error, then we need to render a paired reduction
1316        // to scan for these potential errors. Note that we cannot directly use
1317        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1318        // conditionally render the second component of the reduction pair.
1319        if let Some(mfp) = mfp_after2 {
1320            let mfp_errs = arranged
1321                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1322                    "ReduceMonotonic Error Check",
1323                    move |key, input, output| {
1324                        let temp_storage = RowArena::new();
1325                        let mut datums_local = datums2.borrow();
1326                        key.extend_datums(&temp_storage, &mut datums_local, None);
1327                        let accum = &input[0].1;
1328                        for monoid in accum.iter() {
1329                            datums_local.extend(monoid.finalize().iter());
1330                        }
1331                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1332                        {
1333                            output.push((e.into(), Diff::ONE));
1334                        }
1335                    },
1336                )
1337                .as_collection(|_k, v| v.clone());
1338            (output, validation_errs.concat(mfp_errs))
1339        } else {
1340            (output, validation_errs)
1341        }
1342    }
1343
    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
    ///
    /// Every input `(key, row)` becomes an update with data `(key, ())` whose
    /// difference field is a `(Vec<Accum>, Diff)` pair: one [`Accum`] per entry of
    /// `full_aggrs`, plus a count of contributing updates. Non-distinct aggregates
    /// are extracted together in a single pass; each distinct aggregate first
    /// deduplicates its `(key, value)` pairs. The updates are then arranged, which
    /// accumulates the difference field per key. `mz_reduce_abelian` reads that
    /// accumulated value (`input[0].1`), finalizes each accumulator with
    /// `finalize_accum`, and applies the optional `mfp_after` to form the output row.
    ///
    /// Returns the output arrangement (key -> aggregate values) together with an
    /// error collection. Errors are produced for keys whose accumulation looks
    /// invalid (net-zero count with a non-zero accumulator, or a negative unsigned
    /// sum) and, when `mfp_after` can error, for keys on which it fails.
    fn build_accumulable<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let collection_scope = collection.scope();

        // we must have called this function with something to reduce, and every
        // aggregate must be classified as either simple or distinct. A mismatch is
        // reported via `soft_panic_or_log`; the function does not return early.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // Some of the aggregations may have the `distinct` bit set, which means that they'll
        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
        // Other aggregations can be directly moved in to the `diff` field.
        //
        // In each case, the resulting collection should have `data` shaped as `(key, ())`
        // and a `diff` of type `(Vec<Accum>, Diff)`: the vector holds one accumulator per
        // entry of `full_aggrs` (entries an update does not contribute to stay at their
        // zero value), and the `Diff` counts the updates that contributed.

        // Instantiate a default vector for diffs with the correct types at each
        // position.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            // First, collect all non-distinct aggregations in one pass.
            let collection = collection.clone();
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    // Try to unpack only the datums we need. Unfortunately, since we
                    // can't random access into a Row, we have to iterate through one by one.
                    // TODO: Even though we don't have random access, we could still avoid unpacking
                    // everything that we don't care about, and it might be worth it to extend the
                    // Row API to do that.
                    //
                    // `datum_index` is both the column in `row` and the position in
                    // `full_aggrs`/`diffs.0`. The iterator only moves forward, so this
                    // assumes `simple_aggrs` is sorted by ascending `datum_index`
                    // (TODO confirm against the planner); otherwise `unwrap` panics.
                    let mut row_iter = row.iter().enumerate();
                    for (datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // Next, collect all aggregations that require distinctness.
        for (datum_index, aggr) in distinct_aggrs.into_iter() {
            // Pack `key` followed by the aggregated value into one row, so that the
            // reduce below deduplicates `(key, value)` pairs: it emits exactly one
            // `((), Diff::ONE)` per distinct pair, which `split` turns back into
            // `(key, value_row)`.
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .clone()
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<
                    ColumnationChunker<_>,
                    RowBatcher<_, _>,
                    RowBuilder<_, _>,
                    RowSpine<_, _>,
                >(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // now concatenate, if necessary, multiple aggregations
        //
        // NOTE(review): every path above sets the `Diff` half of the difference to
        // `Diff::ONE`, so after concatenation `total` counts input rows from the
        // simple path *plus* distinct values from each distinct path.
        // `finalize_accum` compares `trues`/`falses` against `total` for `Any`/`All`;
        // confirm the planner never routes those through `distinct_aggrs` when other
        // aggregates are present.
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
        };

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only keep the MFP for the error path if it can actually produce errors.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        // Arranging with the `(Vec<Accum>, Diff)` difference type accumulates all
        // updates for a key into a single difference value.
        let arranged = collection
            .mz_arrange::<
                ColumnationChunker<_>,
                RowBatcher<_, _>,
                RowBuilder<_, _>,
                RowSpine<_, (Vec<Accum>, Diff)>,
            >(
                "ArrangeAccumulable [val: empty]",
            );
        let arranged_output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
                move |key, input, output| {
                    // The value is `()`, so the single input entry carries the
                    // accumulated difference for this key.
                    let (ref accums, total) = input[0].1;

                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    key.extend_datums(&temp_storage, &mut datums_local, None);
                    let key_len = datums_local.len();
                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        let arranged_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "AccumulableErrorCheck",
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    // Note: these checks run per aggregate, so a single key can emit
                    // several errors (e.g. one per non-zero accumulator when `total`
                    // is zero).
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // We first test here if inputs without net-positive records are present,
                        // producing an error to the logs and to the query output if that is the case.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Unsigned sums must never accumulate to a negative value;
                        // `finalize_accum` emits NULL for them, and the error is raised here.
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
                                    &format!("aggr={aggr:?}, accum={accum:?}"),
                                );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    let err = EvalError::Internal(message.into());
                                    output.push((err.into(), Diff::ONE));
                                }
                            }
                            _ => (), // no more errors to check for at this point!
                        }
                    }

                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums2.borrow();
                    key.extend_datums(&temp_storage, &mut datums_local, None);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1569}
1570
1571/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1572/// containing key and aggregate values, then returns a result `Row` or `None`
1573/// if the MFP filters the result out.
1574fn evaluate_mfp_after<'a, 'b>(
1575    mfp_after: &'a Option<SafeMfpPlan>,
1576    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1577    temp_storage: &'a RowArena,
1578    key_len: usize,
1579) -> Option<Row> {
1580    let mut row_builder = SharedRow::get();
1581    // Apply MFP if it exists and pack a Row of
1582    // aggregate values from `datums_local`.
1583    if let Some(mfp) = mfp_after {
1584        // It must ignore errors here, but they are scanned
1585        // for elsewhere if the MFP can error.
1586        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1587            // The `mfp_after` must preserve the key columns,
1588            // so we can skip them to form aggregation results.
1589            Some(row_builder.pack_using(iter.skip(key_len)))
1590        } else {
1591            None
1592        }
1593    } else {
1594        Some(row_builder.pack_using(&datums_local[key_len..]))
1595    }
1596}
1597
1598fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1599    match aggr_func {
1600        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1601            trues: Diff::ZERO,
1602            falses: Diff::ZERO,
1603        },
1604        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1605            accum: AccumCount::ZERO,
1606            pos_infs: Diff::ZERO,
1607            neg_infs: Diff::ZERO,
1608            nans: Diff::ZERO,
1609            non_nulls: Diff::ZERO,
1610        },
1611        AggregateFunc::SumNumeric => Accum::Numeric {
1612            accum: OrderedDecimal(NumericAgg::zero()),
1613            pos_infs: Diff::ZERO,
1614            neg_infs: Diff::ZERO,
1615            nans: Diff::ZERO,
1616            non_nulls: Diff::ZERO,
1617        },
1618        _ => Accum::SimpleNumber {
1619            accum: AccumCount::ZERO,
1620            non_nulls: Diff::ZERO,
1621        },
1622    }
1623}
1624
/// Count of binary fractional digits preserved when float sums are carried in
/// fixed point. Multiplying by [`FLOAT_SCALE`] (`2^FLOAT_SCALE_EXP`) maps a
/// float onto that fixed-point domain.
const FLOAT_SCALE_EXP: u32 = 24;

/// Multiplier between a float and its fixed-point accumulator value, equal to
/// `2^FLOAT_SCALE_EXP`.
#[allow(clippy::as_conversions)] // No `From<u64> for f64`; a power of two converts exactly.
const FLOAT_SCALE: f64 = 2_u64.pow(FLOAT_SCALE_EXP) as f64;
1633
1634/// Maps a finite `f64` onto the fixed-point `i128` domain used to accumulate
1635/// float sums, i.e. computes `trunc(n * FLOAT_SCALE)` reduced modulo `2^128`.
1636///
1637/// Conceptually this multiplies `n` by `FLOAT_SCALE` and truncates towards zero,
1638/// but it does so using *wrapping* (modulo `2^128`) rather than *saturating*
1639/// semantics, and it never forms the intermediate product `n * FLOAT_SCALE` as
1640/// an `f64` (which could itself overflow to infinity for very large `n`).
1641///
1642/// Wrapping is what makes this conversion a group homomorphism into the additive
1643/// group of `i128` (mod `2^128`), matching the wrapping arithmetic used when
1644/// accumulators are combined and retracted. As a result, a set of large finite
1645/// values whose *sum* is representable produces the correct result even when the
1646/// individual values fall outside the representable fixed-point range. Saturating
1647/// instead breaks this: e.g. `1.1e31` and `-1.1e31` both overflow the domain and
1648/// would saturate to `i128::MAX` and `i128::MIN`, which sum to `-1` rather than
1649/// `0` (see database-issues#11265).
1650fn float_to_fixed_point(n: f64) -> i128 {
1651    debug_assert!(n.is_finite());
1652
1653    // Decompose `n` into integer parts such that `n == sign * mantissa *
1654    // 2^exponent`. Folding in the `* 2^FLOAT_SCALE_EXP` scaling then amounts to
1655    // shifting `mantissa` left by `exponent + FLOAT_SCALE_EXP` bits.
1656    let (mantissa, exponent, sign) = Float::integer_decode(n);
1657    let significand = u128::from(mantissa);
1658    let exp = i64::from(exponent) + i64::from(FLOAT_SCALE_EXP);
1659
1660    let magnitude: u128 = if exp >= 0 {
1661        // Left shifts of 128 or more bits leave nothing within the 128-bit
1662        // window; smaller shifts keep only the low 128 bits (i.e. mod `2^128`).
1663        match u32::try_from(exp) {
1664            Ok(shift) if shift < 128 => significand << shift,
1665            _ => 0,
1666        }
1667    } else {
1668        // Right shift truncates the fractional part towards zero. Subnormals
1669        // (and zero) shift entirely out of the window and become zero.
1670        match u32::try_from(-exp) {
1671            Ok(shift) if shift < 128 => significand >> shift,
1672            _ => 0,
1673        }
1674    };
1675
1676    // Reinterpret the magnitude as a signed `i128` (wrapping into the signed
1677    // domain) and apply the sign of `n`.
1678    let magnitude = magnitude.cast_signed();
1679    if sign < 0 {
1680        magnitude.wrapping_neg()
1681    } else {
1682        magnitude
1683    }
1684}
1685
1686fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1687    match aggregate_func {
1688        AggregateFunc::Count => Accum::SimpleNumber {
1689            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
1690            non_nulls: if datum.is_null() {
1691                Diff::ZERO
1692            } else {
1693                Diff::ONE
1694            },
1695        },
1696        AggregateFunc::Any | AggregateFunc::All => match datum {
1697            Datum::True => Accum::Bool {
1698                trues: Diff::ONE,
1699                falses: Diff::ZERO,
1700            },
1701            Datum::Null => Accum::Bool {
1702                trues: Diff::ZERO,
1703                falses: Diff::ZERO,
1704            },
1705            Datum::False => Accum::Bool {
1706                trues: Diff::ZERO,
1707                falses: Diff::ONE,
1708            },
1709            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1710        },
1711        AggregateFunc::Dummy => match datum {
1712            Datum::Dummy => Accum::SimpleNumber {
1713                accum: AccumCount::ZERO,
1714                non_nulls: Diff::ZERO,
1715            },
1716            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1717        },
1718        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1719            let n = match datum {
1720                Datum::Float32(n) => f64::from(*n),
1721                Datum::Float64(n) => *n,
1722                Datum::Null => 0f64,
1723                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1724            };
1725
1726            let nans = Diff::from(n.is_nan());
1727            let pos_infs = Diff::from(n == f64::INFINITY);
1728            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1729            let non_nulls = Diff::from(datum != Datum::Null);
1730
1731            // Map the floating point value onto a fixed precision domain
1732            // All special values should map to zero, since they are tracked separately
1733            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1734                AccumCount::ZERO
1735            } else {
1736                // Wrap (rather than saturate) on overflow, so that the mapping is
1737                // a group homomorphism and large finite values whose sum is in
1738                // range still produce correct results (database-issues#11265).
1739                float_to_fixed_point(n).into()
1740            };
1741
1742            Accum::Float {
1743                accum,
1744                pos_infs,
1745                neg_infs,
1746                nans,
1747                non_nulls,
1748            }
1749        }
1750        AggregateFunc::SumNumeric => match datum {
1751            Datum::Numeric(n) => {
1752                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1753                    if n.0.is_negative() {
1754                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1755                    } else {
1756                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1757                    }
1758                } else if n.0.is_nan() {
1759                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1760                } else {
1761                    // Take a narrow decimal (datum) into a wide decimal
1762                    // (aggregator).
1763                    let mut cx_agg = numeric::cx_agg();
1764                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1765                };
1766
1767                Accum::Numeric {
1768                    accum: OrderedDecimal(accum),
1769                    pos_infs,
1770                    neg_infs,
1771                    nans,
1772                    non_nulls: Diff::ONE,
1773                }
1774            }
1775            Datum::Null => Accum::Numeric {
1776                accum: OrderedDecimal(NumericAgg::zero()),
1777                pos_infs: Diff::ZERO,
1778                neg_infs: Diff::ZERO,
1779                nans: Diff::ZERO,
1780                non_nulls: Diff::ZERO,
1781            },
1782            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1783        },
1784        _ => {
1785            // Other accumulations need to disentangle the accumulable
1786            // value from its NULL-ness, which is not quite as easily
1787            // accumulated.
1788            match datum {
1789                Datum::Int16(i) => Accum::SimpleNumber {
1790                    accum: i.into(),
1791                    non_nulls: Diff::ONE,
1792                },
1793                Datum::Int32(i) => Accum::SimpleNumber {
1794                    accum: i.into(),
1795                    non_nulls: Diff::ONE,
1796                },
1797                Datum::Int64(i) => Accum::SimpleNumber {
1798                    accum: i.into(),
1799                    non_nulls: Diff::ONE,
1800                },
1801                Datum::UInt16(u) => Accum::SimpleNumber {
1802                    accum: u.into(),
1803                    non_nulls: Diff::ONE,
1804                },
1805                Datum::UInt32(u) => Accum::SimpleNumber {
1806                    accum: u.into(),
1807                    non_nulls: Diff::ONE,
1808                },
1809                Datum::UInt64(u) => Accum::SimpleNumber {
1810                    accum: u.into(),
1811                    non_nulls: Diff::ONE,
1812                },
1813                Datum::MzTimestamp(t) => Accum::SimpleNumber {
1814                    accum: u64::from(t).into(),
1815                    non_nulls: Diff::ONE,
1816                },
1817                Datum::Null => Accum::SimpleNumber {
1818                    accum: AccumCount::ZERO,
1819                    non_nulls: Diff::ZERO,
1820                },
1821                x => panic!("Accumulating non-integer data: {x:?}"),
1822            }
1823        }
1824    }
1825}
1826
/// Converts the accumulated [`Accum`] for `aggr_func` back into an output datum.
///
/// `total` is the accumulated count of updates that contributed to `accum` (the
/// `Diff` half of the accumulable difference). When `total` is positive and the
/// accumulator is entirely zero, every aggregate except `Count` yields `NULL`.
///
/// # Panics
///
/// Panics if `accum` is not a variant this function knows how to finalize for
/// `aggr_func` (see the catch-all arm at the end of the match).
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // The finished value depends on the aggregation function in a variety of ways.
    // For all aggregates but count, if only null values were
    // accumulated, then the output is null.
    //
    // NOTE(review): `datum_to_accumulator` always produces an all-zero accumulator
    // for `Dummy`, so with a positive `total` this branch returns `Datum::Null`
    // and the `(AggregateFunc::Dummy, _)` arm below is only reached when `total`
    // is not positive. Confirm this is intended.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // If any false, else if all true, else must be no false and some nulls.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // If any true, else if all false, else must be no true and some nulls.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // If any non-nulls, just report the aggregate.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have less than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have less than 2^32 summands?
                // If so, rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // Our semantics of overflow are not clearly articulated wrt.
                    // unsigned vs. signed types (database-issues#5172). We adopt an
                    // unsigned wrapping behavior to match what we do above for
                    // signed types.
                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied when accumulating.
                    let sum = f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE;
                    Datum::from(f32::cast_lossy(sum))
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied when accumulating.
                    Datum::from(f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE)
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                // Take a wide decimal (aggregator) into a
                // narrow decimal (datum). If this operation
                // overflows the datum, this new value will be
                // +/- infinity. However, the aggregator tracks
                // the amount of overflow, making it invertible.
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1985
1986/// The type for accumulator counting. Set to [`Overflowing<u128>`](mz_ore::Overflowing).
1987type AccumCount = mz_ore::Overflowing<i128>;
1988
1989/// Accumulates values for the various types of accumulable aggregations.
1990///
1991/// We assume that there are not more than 2^32 elements for the aggregation.
1992/// Thus we can perform a summation over i32 in an i64 accumulator
1993/// and not worry about exceeding its bounds.
1994///
1995/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
1996/// point representation has less precision than a double. It is entirely possible
1997/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
1998/// to preserve group guarantees.
1999#[derive(
2000    Debug,
2001    Clone,
2002    Copy,
2003    PartialEq,
2004    Eq,
2005    PartialOrd,
2006    Ord,
2007    Serialize,
2008    Deserialize
2009)]
2010enum Accum {
2011    /// Accumulates boolean values.
2012    Bool {
2013        /// The number of `true` values observed.
2014        trues: Diff,
2015        /// The number of `false` values observed.
2016        falses: Diff,
2017    },
2018    /// Accumulates simple numeric values.
2019    SimpleNumber {
2020        /// The accumulation of all non-NULL values observed.
2021        accum: AccumCount,
2022        /// The number of non-NULL values observed.
2023        non_nulls: Diff,
2024    },
2025    /// Accumulates float values.
2026    Float {
2027        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
2028        /// preserve associativity and commutativity
2029        accum: AccumCount,
2030        /// Counts +inf
2031        pos_infs: Diff,
2032        /// Counts -inf
2033        neg_infs: Diff,
2034        /// Counts NaNs
2035        nans: Diff,
2036        /// Counts non-NULL values
2037        non_nulls: Diff,
2038    },
2039    /// Accumulates arbitrary precision decimals.
2040    Numeric {
2041        /// Accumulates non-special values
2042        accum: OrderedDecimal<NumericAgg>,
2043        /// Counts +inf
2044        pos_infs: Diff,
2045        /// Counts -inf
2046        neg_infs: Diff,
2047        /// Counts NaNs
2048        nans: Diff,
2049        /// Counts non-NULL values
2050        non_nulls: Diff,
2051    },
2052}
2053
2054impl IsZero for Accum {
2055    fn is_zero(&self) -> bool {
2056        match self {
2057            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2058            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2059            Accum::Float {
2060                accum,
2061                pos_infs,
2062                neg_infs,
2063                nans,
2064                non_nulls,
2065            } => {
2066                accum.is_zero()
2067                    && pos_infs.is_zero()
2068                    && neg_infs.is_zero()
2069                    && nans.is_zero()
2070                    && non_nulls.is_zero()
2071            }
2072            Accum::Numeric {
2073                accum,
2074                pos_infs,
2075                neg_infs,
2076                nans,
2077                non_nulls,
2078            } => {
2079                accum.0.is_zero()
2080                    && pos_infs.is_zero()
2081                    && neg_infs.is_zero()
2082                    && nans.is_zero()
2083                    && non_nulls.is_zero()
2084            }
2085        }
2086    }
2087}
2088
2089impl Semigroup for Accum {
2090    fn plus_equals(&mut self, other: &Accum) {
2091        match (&mut *self, other) {
2092            (
2093                Accum::Bool { trues, falses },
2094                Accum::Bool {
2095                    trues: other_trues,
2096                    falses: other_falses,
2097                },
2098            ) => {
2099                *trues += other_trues;
2100                *falses += other_falses;
2101            }
2102            (
2103                Accum::SimpleNumber { accum, non_nulls },
2104                Accum::SimpleNumber {
2105                    accum: other_accum,
2106                    non_nulls: other_non_nulls,
2107                },
2108            ) => {
2109                *accum += other_accum;
2110                *non_nulls += other_non_nulls;
2111            }
2112            (
2113                Accum::Float {
2114                    accum,
2115                    pos_infs,
2116                    neg_infs,
2117                    nans,
2118                    non_nulls,
2119                },
2120                Accum::Float {
2121                    accum: other_accum,
2122                    pos_infs: other_pos_infs,
2123                    neg_infs: other_neg_infs,
2124                    nans: other_nans,
2125                    non_nulls: other_non_nulls,
2126                },
2127            ) => {
2128                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2129                    warn!("Float accumulator overflow. Incorrect results possible");
2130                    accum.wrapping_add(*other_accum)
2131                });
2132                *pos_infs += other_pos_infs;
2133                *neg_infs += other_neg_infs;
2134                *nans += other_nans;
2135                *non_nulls += other_non_nulls;
2136            }
2137            (
2138                Accum::Numeric {
2139                    accum,
2140                    pos_infs,
2141                    neg_infs,
2142                    nans,
2143                    non_nulls,
2144                },
2145                Accum::Numeric {
2146                    accum: other_accum,
2147                    pos_infs: other_pos_infs,
2148                    neg_infs: other_neg_infs,
2149                    nans: other_nans,
2150                    non_nulls: other_non_nulls,
2151                },
2152            ) => {
2153                let mut cx_agg = numeric::cx_agg();
2154                cx_agg.add(&mut accum.0, &other_accum.0);
2155                // `rounded` signals we have exceeded the aggregator's max
2156                // precision, which means we've lost commutativity and
2157                // associativity; nothing to be done here, so panic. For more
2158                // context, see the DEC_Rounded definition at
2159                // http://speleotrove.com/decimal/dncont.html
2160                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2161                // Reduce to reclaim unused decimal precision. Note that this
2162                // reduction must happen somewhere to make the following
2163                // invertible:
2164                // ```
2165                // CREATE TABLE a (a numeric);
2166                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
2167                // INSERT INTO a VALUES ('9e39'), ('9e-39');
2168                // ```
2169                // This will now return infinity. However, we can retract the
2170                // value that blew up its precision:
2171                // ```
2172                // INSERT INTO a VALUES ('-9e-39');
2173                // ```
2174                // This leaves `t`'s aggregator with a value of 9e39. However,
2175                // without doing a reduction, `libdecnum` will store the value
2176                // as 9e39+0e-39, which still exceeds the narrower context's
2177                // precision. By doing the reduction, we can "reclaim" the 39
2178                // digits of precision.
2179                cx_agg.reduce(&mut accum.0);
2180                *pos_infs += other_pos_infs;
2181                *neg_infs += other_neg_infs;
2182                *nans += other_nans;
2183                *non_nulls += other_non_nulls;
2184            }
2185            (l, r) => unreachable!(
2186                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2187            ),
2188        }
2189    }
2190}
2191
2192impl Multiply<Diff> for Accum {
2193    type Output = Accum;
2194
2195    fn multiply(self, factor: &Diff) -> Accum {
2196        let factor = *factor;
2197        match self {
2198            Accum::Bool { trues, falses } => Accum::Bool {
2199                trues: trues * factor,
2200                falses: falses * factor,
2201            },
2202            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2203                accum: accum * AccumCount::from(factor),
2204                non_nulls: non_nulls * factor,
2205            },
2206            Accum::Float {
2207                accum,
2208                pos_infs,
2209                neg_infs,
2210                nans,
2211                non_nulls,
2212            } => Accum::Float {
2213                accum: accum
2214                    .checked_mul(AccumCount::from(factor))
2215                    .unwrap_or_else(|| {
2216                        warn!("Float accumulator overflow. Incorrect results possible");
2217                        accum.wrapping_mul(AccumCount::from(factor))
2218                    }),
2219                pos_infs: pos_infs * factor,
2220                neg_infs: neg_infs * factor,
2221                nans: nans * factor,
2222                non_nulls: non_nulls * factor,
2223            },
2224            Accum::Numeric {
2225                accum,
2226                pos_infs,
2227                neg_infs,
2228                nans,
2229                non_nulls,
2230            } => {
2231                let mut cx = numeric::cx_agg();
2232                let mut f = NumericAgg::from(factor.into_inner());
2233                // Unlike `plus_equals`, not necessary to reduce after this operation because `f` will
2234                // always be an integer, i.e. we are never increasing the
2235                // values' scale.
2236                cx.mul(&mut f, &accum.0);
2237                // `rounded` signals we have exceeded the aggregator's max
2238                // precision, which means we've lost commutativity and
2239                // associativity; nothing to be done here, so panic. For more
2240                // context, see the DEC_Rounded definition at
2241                // http://speleotrove.com/decimal/dncont.html
2242                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2243                Accum::Numeric {
2244                    accum: OrderedDecimal(f),
2245                    pos_infs: pos_infs * factor,
2246                    neg_infs: neg_infs * factor,
2247                    nans: nans * factor,
2248                    non_nulls: non_nulls * factor,
2249                }
2250            }
2251        }
2252    }
2253}
2254
2255impl Columnation for Accum {
2256    type InnerRegion = CopyRegion<Self>;
2257}
2258
2259/// Monoids for in-place compaction of monotonic streams.
2260mod monoids {
2261
2262    // We can improve the performance of some aggregations through the use of algebra.
2263    // In particular, we can move some of the aggregations in to the `diff` field of
2264    // updates, by changing `diff` from integers to a different algebraic structure.
2265    //
2266    // The one we use is called a "semigroup", and it means that the structure has a
2267    // symmetric addition operator. The trait we use also allows the semigroup elements
2268    // to present as "zero", meaning they always act as the identity under +. Here,
2269    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
2270    // known to DD by the `is_zero` method, see comment there. So, from the point of view
2271    // of DD, this Semigroup should _not_ have a zero.
2272    //
2273    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
2274    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
2275    // assumes this.
2276
2277    use columnation::{Columnation, Region};
2278    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2279    use mz_expr::AggregateFunc;
2280    use mz_ore::soft_panic_or_log;
2281    use mz_repr::{Datum, Diff, Row};
2282    use serde::{Deserialize, Serialize};
2283
2284    /// A monoid containing a single-datum row.
2285    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
2286    pub enum ReductionMonoid {
2287        Min(Row),
2288        Max(Row),
2289    }
2290
2291    impl ReductionMonoid {
2292        pub fn finalize(&self) -> &Row {
2293            use ReductionMonoid::*;
2294            match self {
2295                Min(row) | Max(row) => row,
2296            }
2297        }
2298    }
2299
2300    impl Clone for ReductionMonoid {
2301        fn clone(&self) -> Self {
2302            use ReductionMonoid::*;
2303            match self {
2304                Min(row) => Min(row.clone()),
2305                Max(row) => Max(row.clone()),
2306            }
2307        }
2308
2309        fn clone_from(&mut self, source: &Self) {
2310            use ReductionMonoid::*;
2311
2312            let mut row = std::mem::take(match self {
2313                Min(row) | Max(row) => row,
2314            });
2315
2316            let source_row = match source {
2317                Min(row) | Max(row) => row,
2318            };
2319
2320            row.clone_from(source_row);
2321
2322            match source {
2323                Min(_) => *self = Min(row),
2324                Max(_) => *self = Max(row),
2325            }
2326        }
2327    }
2328
2329    impl Multiply<Diff> for ReductionMonoid {
2330        type Output = Self;
2331
2332        fn multiply(self, factor: &Diff) -> Self {
2333            // Multiplication in ReductionMonoid is idempotent, and
2334            // its users must ascertain its monotonicity beforehand
2335            // (typically with ensure_monotonic) since it has no zero
2336            // value for us to use here.
2337            assert!(factor.is_positive());
2338            self
2339        }
2340    }
2341
2342    impl Semigroup for ReductionMonoid {
2343        fn plus_equals(&mut self, rhs: &Self) {
2344            match (self, rhs) {
2345                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
2346                    let swap = {
2347                        let lhs_val = lhs.unpack_first();
2348                        let rhs_val = rhs.unpack_first();
2349                        // Datum::Null is the identity, not a small element.
2350                        match (lhs_val, rhs_val) {
2351                            (_, Datum::Null) => false,
2352                            (Datum::Null, _) => true,
2353                            (lhs, rhs) => rhs < lhs,
2354                        }
2355                    };
2356                    if swap {
2357                        lhs.clone_from(rhs);
2358                    }
2359                }
2360                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
2361                    let swap = {
2362                        let lhs_val = lhs.unpack_first();
2363                        let rhs_val = rhs.unpack_first();
2364                        // Datum::Null is the identity, not a large element.
2365                        match (lhs_val, rhs_val) {
2366                            (_, Datum::Null) => false,
2367                            (Datum::Null, _) => true,
2368                            (lhs, rhs) => rhs > lhs,
2369                        }
2370                    };
2371                    if swap {
2372                        lhs.clone_from(rhs);
2373                    }
2374                }
2375                (lhs, rhs) => {
2376                    soft_panic_or_log!(
2377                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
2378                    );
2379                }
2380            }
2381        }
2382    }
2383
2384    impl IsZero for ReductionMonoid {
2385        fn is_zero(&self) -> bool {
2386            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
2387            // DD uses true results of this method to make stuff disappear. This makes sense when
2388            // diffs mean really just diffs, but for `ReductionMonoid` diffs hold reduction results.
2389            // We don't want funny stuff, like disappearing, happening to reduction results even
2390            // when they are null. (This would confuse, e.g., `ReduceCollation` for null inputs.)
2391            false
2392        }
2393    }
2394
2395    impl Columnation for ReductionMonoid {
2396        type InnerRegion = ReductionMonoidRegion;
2397    }
2398
2399    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
2400    /// in the same backing region. Alternatively, it could store it in two regions, but we select
2401    /// the former for simplicity reasons.
2402    #[derive(Default)]
2403    pub struct ReductionMonoidRegion {
2404        inner: <Row as Columnation>::InnerRegion,
2405    }
2406
2407    impl Region for ReductionMonoidRegion {
2408        type Item = ReductionMonoid;
2409
2410        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
2411            use ReductionMonoid::*;
2412            match item {
2413                Min(row) => Min(unsafe { self.inner.copy(row) }),
2414                Max(row) => Max(unsafe { self.inner.copy(row) }),
2415            }
2416        }
2417
2418        fn clear(&mut self) {
2419            self.inner.clear();
2420        }
2421
2422        fn reserve_items<'a, I>(&mut self, items: I)
2423        where
2424            Self: 'a,
2425            I: Iterator<Item = &'a Self::Item> + Clone,
2426        {
2427            self.inner
2428                .reserve_items(items.map(ReductionMonoid::finalize));
2429        }
2430
2431        fn reserve_regions<'a, I>(&mut self, regions: I)
2432        where
2433            Self: 'a,
2434            I: Iterator<Item = &'a Self> + Clone,
2435        {
2436            self.inner.reserve_regions(regions.map(|r| &r.inner));
2437        }
2438
2439        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
2440            self.inner.heap_size(callback);
2441        }
2442    }
2443
2444    /// Get the correct monoid implementation for a given aggregation function. Note that
2445    /// all hierarchical aggregation functions need to supply a monoid implementation.
2446    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
2447        match func {
2448            AggregateFunc::MaxNumeric
2449            | AggregateFunc::MaxInt16
2450            | AggregateFunc::MaxInt32
2451            | AggregateFunc::MaxInt64
2452            | AggregateFunc::MaxUInt16
2453            | AggregateFunc::MaxUInt32
2454            | AggregateFunc::MaxUInt64
2455            | AggregateFunc::MaxMzTimestamp
2456            | AggregateFunc::MaxFloat32
2457            | AggregateFunc::MaxFloat64
2458            | AggregateFunc::MaxBool
2459            | AggregateFunc::MaxString
2460            | AggregateFunc::MaxDate
2461            | AggregateFunc::MaxTimestamp
2462            | AggregateFunc::MaxTimestampTz
2463            | AggregateFunc::MaxInterval
2464            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
2465            AggregateFunc::MinNumeric
2466            | AggregateFunc::MinInt16
2467            | AggregateFunc::MinInt32
2468            | AggregateFunc::MinInt64
2469            | AggregateFunc::MinUInt16
2470            | AggregateFunc::MinUInt32
2471            | AggregateFunc::MinUInt64
2472            | AggregateFunc::MinMzTimestamp
2473            | AggregateFunc::MinFloat32
2474            | AggregateFunc::MinFloat64
2475            | AggregateFunc::MinBool
2476            | AggregateFunc::MinString
2477            | AggregateFunc::MinDate
2478            | AggregateFunc::MinTimestamp
2479            | AggregateFunc::MinTimestampTz
2480            | AggregateFunc::MinInterval
2481            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
2482            AggregateFunc::SumInt16
2483            | AggregateFunc::SumInt32
2484            | AggregateFunc::SumInt64
2485            | AggregateFunc::SumUInt16
2486            | AggregateFunc::SumUInt32
2487            | AggregateFunc::SumUInt64
2488            | AggregateFunc::SumFloat32
2489            | AggregateFunc::SumFloat64
2490            | AggregateFunc::SumNumeric
2491            | AggregateFunc::Count
2492            | AggregateFunc::Any
2493            | AggregateFunc::All
2494            | AggregateFunc::Dummy
2495            | AggregateFunc::JsonbAgg { .. }
2496            | AggregateFunc::JsonbObjectAgg { .. }
2497            | AggregateFunc::MapAgg { .. }
2498            | AggregateFunc::ArrayConcat { .. }
2499            | AggregateFunc::ListConcat { .. }
2500            | AggregateFunc::StringAgg { .. }
2501            | AggregateFunc::RowNumber { .. }
2502            | AggregateFunc::Rank { .. }
2503            | AggregateFunc::DenseRank { .. }
2504            | AggregateFunc::LagLead { .. }
2505            | AggregateFunc::FirstValue { .. }
2506            | AggregateFunc::LastValue { .. }
2507            | AggregateFunc::WindowAggregate { .. }
2508            | AggregateFunc::FusedValueWindowFunc { .. }
2509            | AggregateFunc::FusedWindowAggregate { .. } => None,
2510        }
2511    }
2512}
2513
2514mod window_agg_helpers {
2515    use crate::render::reduce::*;
2516
2517    /// TODO: It would be better for performance to do the branching that is in the methods of this
2518    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
2519    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
2520    /// directly.
2521    pub enum OneByOneAggrImpls {
2522        Accumulable(AccumulableOneByOneAggr),
2523        Hierarchical(HierarchicalOneByOneAggr),
2524        Basic(mz_expr::NaiveOneByOneAggr),
2525    }
2526
2527    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2528        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2529            match reduction_type(agg) {
2530                ReductionType::Basic => {
2531                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2532                }
2533                ReductionType::Accumulable => {
2534                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2535                }
2536                ReductionType::Hierarchical => {
2537                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2538                }
2539            }
2540        }
2541
2542        fn give(&mut self, d: &Datum) {
2543            match self {
2544                OneByOneAggrImpls::Basic(i) => i.give(d),
2545                OneByOneAggrImpls::Accumulable(i) => i.give(d),
2546                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2547            }
2548        }
2549
2550        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2551            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
2552            match self {
2553                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2554                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2555                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2556            }
2557        }
2558    }
2559
2560    pub struct AccumulableOneByOneAggr {
2561        aggr_func: AggregateFunc,
2562        accum: Accum,
2563        total: Diff,
2564    }
2565
2566    impl AccumulableOneByOneAggr {
2567        fn new(aggr_func: &AggregateFunc) -> Self {
2568            AccumulableOneByOneAggr {
2569                aggr_func: aggr_func.clone(),
2570                accum: accumulable_zero(aggr_func),
2571                total: Diff::ZERO,
2572            }
2573        }
2574
2575        fn give(&mut self, d: &Datum) {
2576            self.accum
2577                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2578            self.total += Diff::ONE;
2579        }
2580
2581        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2582            temp_storage.make_datum(|packer| {
2583                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2584            })
2585        }
2586    }
2587
2588    pub struct HierarchicalOneByOneAggr {
2589        aggr_func: AggregateFunc,
2590        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
2591        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
2592        monoid: ReductionMonoid,
2593    }
2594
2595    impl HierarchicalOneByOneAggr {
2596        fn new(aggr_func: &AggregateFunc) -> Self {
2597            let mut row_buf = Row::default();
2598            row_buf.packer().push(Datum::Null);
2599            HierarchicalOneByOneAggr {
2600                aggr_func: aggr_func.clone(),
2601                monoid: get_monoid(row_buf, aggr_func)
2602                    .expect("aggr_func should be a hierarchical aggregation function"),
2603            }
2604        }
2605
2606        fn give(&mut self, d: &Datum) {
2607            let mut row_buf = Row::default();
2608            row_buf.packer().push(d);
2609            let m = get_monoid(row_buf, &self.aggr_func)
2610                .expect("aggr_func should be a hierarchical aggregation function");
2611            self.monoid.plus_equals(&m);
2612        }
2613
2614        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2615            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2616        }
2617    }
2618}
2619
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference implementation: the saturating `as` cast that `float_to_fixed_point`
    /// superseded. Only trustworthy for inputs whose scaled magnitude fits in an `i128`.
    /// The tests use it to check that the new wrapping conversion agrees with it there.
    #[allow(clippy::as_conversions)]
    fn saturating_convert(n: f64) -> i128 {
        let scaled = n * FLOAT_SCALE;
        scaled as i128
    }

    #[mz_ore::test]
    fn float_to_fixed_point_matches_saturating_in_range() {
        // For these magnitudes, and their negations, the scaled value comfortably fits in
        // an `i128`. The wrapping conversion must therefore agree exactly with the old
        // saturating cast. The largest, 5e30, is big, but its scaled magnitude still fits.
        const MAGNITUDES: [f64; 9] = [0.0, 1.0, 0.1, 0.5, 3.25, 123456.789, 1e10, 1e20, 5e30];
        for n in MAGNITUDES.into_iter().flat_map(|m| [m, -m]) {
            assert_eq!(
                float_to_fixed_point(n),
                saturating_convert(n),
                "mismatch for n = {n}"
            );
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_truncates_toward_zero() {
        // 1.75 * 2^24 == 29_360_128 is exactly representable, in either sign.
        for (input, expected) in [(1.75, 29_360_128), (-1.75, -29_360_128)] {
            assert_eq!(float_to_fixed_point(input), expected);
        }

        // Inexact results truncate toward zero, like the old cast, symmetrically in sign.
        let frac = 0.123_456_7_f64;
        let pos = float_to_fixed_point(frac);
        let neg = float_to_fixed_point(-frac);
        assert_eq!(pos, saturating_convert(frac));
        assert_eq!(neg, saturating_convert(-frac));
        assert_eq!(neg, -pos);
    }

    #[mz_ore::test]
    fn float_to_fixed_point_subnormals_round_to_zero() {
        // Signed zeros, a subnormal, and the smallest subnormal (5e-324).
        let tiny = [0.0, -0.0, f64::MIN_POSITIVE / 2.0, 5e-324];
        for n in tiny {
            assert_eq!(float_to_fixed_point(n), 0);
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_cancels_large_finite_values() {
        // Regression test for database-issues#11265: large finite values that
        // individually overflow the fixed-point domain must still sum to the
        // correct result when their mathematical sum is representable. The
        // previous saturating conversion produced `i128::MAX + i128::MIN == -1`.
        for n in [1.1e31_f64, 1e32, 5e33, 1e284] {
            let sum = float_to_fixed_point(n).wrapping_add(float_to_fixed_point(-n));
            assert_eq!(sum, 0, "n = {n} did not cancel with -n");
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_sum_via_accumulator() {
        // Drive the reported case through the whole accumulate-then-finalize path.
        let func = AggregateFunc::SumFloat64;
        let mut acc = accumulable_zero(&func);
        for value in [1.1e31_f64, -1.1e31_f64] {
            acc.plus_equals(&datum_to_accumulator(&func, Datum::from(value)));
        }
        let datum = finalize_accum(&func, &acc, Diff::from(2_i64));
        assert_eq!(datum, Datum::from(0.0_f64));
    }
}