Skip to main content

mz_compute/render/
reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reduction dataflow construction.
11//!
12//! Consult [ReducePlan] documentation for details.
13
14use std::collections::BTreeMap;
15
16use columnation::{Columnation, CopyRegion};
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
22use differential_dataflow::hashable::Hashable;
23use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
24use differential_dataflow::trace::implementations::BatchContainer;
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Data, VecCollection};
27use itertools::Itertools;
28use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
29use mz_compute_types::plan::ArrangementStrategy;
30use mz_compute_types::plan::reduce::{
31    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_ore::cast::CastLossy;
38use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
39use mz_repr::fixed_length::ToDatumIter;
40use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
41use mz_timely_util::columnation::ColumnationChunker;
42use mz_timely_util::operator::CollectionExt;
43use num_traits::Float;
44use serde::{Deserialize, Serialize};
45use timely::Container;
46use timely::container::{CapacityContainerBuilder, PushInto};
47use tracing::warn;
48
49use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
50use crate::extensions::reduce::{ClearContainer, MzReduce};
51use crate::render::context::{CollectionBundle, Context};
52use crate::render::errors::DataflowErrorSer;
53use crate::render::errors::MaybeValidatingRow;
54use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
55use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
56use crate::typedefs::{
57    ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58    RowRowSpine, RowSpine, RowValSpine,
59};
60use mz_row_spine::{
61    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
62};
63
64impl<'scope, T: RenderTimestamp> Context<'scope, T> {
65    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
66    /// minimize worst-case incremental update times and memory footprint.
67    pub fn render_reduce(
68        &self,
69        input_key: Option<Vec<MirScalarExpr>>,
70        input: CollectionBundle<'scope, T>,
71        key_val_plan: KeyValPlan,
72        reduce_plan: ReducePlan,
73        mfp_after: Option<MapFilterProject>,
74        temporal_bucketing_strategy: ArrangementStrategy,
75    ) -> CollectionBundle<'scope, T>
76    where
77        T: crate::render::MaybeBucketByTime,
78    {
79        // Convert `mfp_after` to an actionable plan.
80        let mfp_after = mfp_after.map(|m| {
81            m.into_plan()
82                .expect("MFP planning must succeed")
83                .into_nontemporal()
84                .expect("Fused Reduce MFPs do not have temporal predicates")
85        });
86
87        input.scope().region_named("Reduce", |inner| {
88            let KeyValPlan {
89                mut key_plan,
90                mut val_plan,
91            } = key_val_plan;
92            let key_arity = key_plan.projection.len();
93            let mut datums = DatumVec::new();
94
95            // Determine the columns we'll need from the row.
96            let mut demand = Vec::new();
97            demand.extend(key_plan.demand());
98            demand.extend(val_plan.demand());
99            demand.sort();
100            demand.dedup();
101
102            // remap column references to the subset we use.
103            let mut demand_map = BTreeMap::new();
104            for column in demand.iter() {
105                demand_map.insert(*column, demand_map.len());
106            }
107            let demand_map_len = demand_map.len();
108            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
109            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
110            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
111            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
112
113            let (key_val_input, err) = input
114                .enter_region(inner)
115                .flat_map::<_, ConsolidatingContainerBuilder<Vec<((Row, Row), T, Diff)>>, _>(
116                    input_key.map(|k| (k, None)),
117                    max_demand,
118                    move |row_datums, time, diff, ok_session, err_session| {
119                        let mut row_builder = SharedRow::get();
120                        let temp_storage = RowArena::new();
121
122                        let mut row_iter = row_datums.drain(..);
123                        let mut datums_local = datums.borrow();
124                        // Unpack only the demanded columns.
125                        for skip in skips.iter() {
126                            datums_local.push(row_iter.nth(*skip).unwrap());
127                        }
128
129                        // Evaluate the key expressions.
130                        let key = key_plan.evaluate_into(
131                            &mut datums_local,
132                            &temp_storage,
133                            &mut row_builder,
134                        );
135                        let key = match key {
136                            Err(e) => {
137                                err_session.give((e.into(), time, diff));
138                                return 1;
139                            }
140                            Ok(Some(key)) => key.clone(),
141                            Ok(None) => panic!("Row expected as no predicate was used"),
142                        };
143
144                        // Evaluate the value expressions.
145                        // The prior evaluation may have left additional columns we should delete.
146                        datums_local.truncate(skips.len());
147                        let val = val_plan.evaluate_into(
148                            &mut datums_local,
149                            &temp_storage,
150                            &mut row_builder,
151                        );
152                        let val = match val {
153                            Err(e) => {
154                                err_session.give((e.into(), time, diff));
155                                return 1;
156                            }
157                            Ok(Some(val)) => val.clone(),
158                            Ok(None) => panic!("Row expected as no predicate was used"),
159                        };
160
161                        ok_session.give(((key, val), time, diff));
162                        1
163                    },
164                );
165
166            // Bucket the keyed `(key, val)` stream when lowering chose `TemporalBucketing`.
167            // `Reduce` builds its own arrangement via `KeyValPlan`, bypassing
168            // `ensure_collections`, so the strategy is plumbed through `PlanNode::Reduce`
169            // rather than inferred at the arrangement site. No-op for `Direct`.
170            let key_val_collection = key_val_input.as_collection();
171            let key_val_collection = if matches!(
172                temporal_bucketing_strategy,
173                ArrangementStrategy::TemporalBucketing
174            ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
175            {
176                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
177                    .get(&self.config_set)
178                    .try_into()
179                    .expect("must fit");
180                T::maybe_apply_temporal_bucketing(
181                    key_val_collection.inner,
182                    self.as_of_frontier.clone(),
183                    summary,
184                )
185            } else {
186                key_val_collection
187            };
188
189            // Render the reduce plan
190            self.render_reduce_plan(reduce_plan, key_val_collection, err, key_arity, mfp_after)
191                .leave_region(self.scope)
192        })
193    }
194
195    /// Render a dataflow based on the provided plan.
196    ///
197    /// The output will be an arrangements that looks the same as if
198    /// we just had a single reduce operator computing everything together, and
199    /// this arrangement can also be re-used.
200    fn render_reduce_plan<'s>(
201        &self,
202        plan: ReducePlan,
203        collection: VecCollection<'s, T, (Row, Row), Diff>,
204        err_input: VecCollection<'s, T, DataflowErrorSer, Diff>,
205        key_arity: usize,
206        mfp_after: Option<SafeMfpPlan>,
207    ) -> CollectionBundle<'s, T> {
208        let mut errors = Default::default();
209        let arrangement =
210            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
211        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
212        CollectionBundle::from_columns(
213            0..key_arity,
214            ArrangementFlavor::Local(
215                arrangement,
216                errs.mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
217                    "Arrange bundle err",
218                ),
219            ),
220        )
221    }
222
223    fn render_reduce_plan_inner<'s>(
224        &self,
225        plan: ReducePlan,
226        collection: VecCollection<'s, T, (Row, Row), Diff>,
227        errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
228        key_arity: usize,
229        mfp_after: Option<SafeMfpPlan>,
230    ) -> Arranged<'s, RowRowAgent<T, Diff>> {
231        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
232        // not only values (database-issues#6658).
233        let arrangement = match plan {
234            // If we have no aggregations or just a single type of reduction, we
235            // can go ahead and render them directly.
236            ReducePlan::Distinct => {
237                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
238                errors.push(errs);
239                arranged_output
240            }
241            ReducePlan::Accumulable(expr) => {
242                let (arranged_output, errs) =
243                    self.build_accumulable(collection, expr, key_arity, mfp_after);
244                errors.push(errs);
245                arranged_output
246            }
247            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
248                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
249                errors.push(errs);
250                output
251            }
252            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
253                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
254                errors.push(errs);
255                output
256            }
257            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
258                expr,
259                fused_unnest_list,
260            })) => {
261                // Note that we skip validating for negative diffs when we have a fused unnest list,
262                // because this is already a CPU-intensive situation due to the non-incrementalness
263                // of window functions.
264                let validating = !fused_unnest_list;
265                let (output, errs) = self.build_basic_aggregate(
266                    collection,
267                    0,
268                    &expr,
269                    validating,
270                    key_arity,
271                    mfp_after,
272                    fused_unnest_list,
273                );
274                if validating {
275                    errors.push(errs.expect("validation should have occurred as it was requested"));
276                }
277                output
278            }
279            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
280                let (output, errs) =
281                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
282                errors.push(errs);
283                output
284            }
285        };
286        arrangement
287    }
288
289    /// Build the dataflow to compute the set of distinct keys.
290    fn build_distinct<'s>(
291        &self,
292        collection: VecCollection<'s, T, (Row, Row), Diff>,
293        mfp_after: Option<SafeMfpPlan>,
294    ) -> (
295        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
296        VecCollection<'s, T, DataflowErrorSer, Diff>,
297    ) {
298        let error_logger = self.error_logger();
299
300        // Allocations for the two closures.
301        let mut datums1 = DatumVec::new();
302        let mut datums2 = DatumVec::new();
303        let mfp_after1 = mfp_after.clone();
304        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
305
306        let arranged = collection
307            .mz_arrange::<
308                ColumnationChunker<_>,
309                RowRowBatcher<_, _>,
310                RowRowBuilder<_, _>,
311                RowRowSpine<_, _>,
312            >(
313                "Arranged DistinctBy",
314            );
315        let output = arranged
316            .clone()
317            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
318                "DistinctBy",
319                move |key, _input, output| {
320                    let temp_storage = RowArena::new();
321                    let mut datums_local = datums1.borrow();
322                    datums_local.extend(key.to_datum_iter());
323
324                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
325                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
326                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
327                    if mfp_after1
328                        .as_ref()
329                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
330                        .unwrap_or(Ok(true))
331                        == Ok(true)
332                    {
333                        // We're pushing a unit value here because the key is implicitly added by the
334                        // arrangement, and the permutation logic takes care of using the key part of the
335                        // output.
336                        output.push((Row::default(), Diff::ONE));
337                    }
338                },
339            );
340        let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
341            "DistinctByErrorCheck",
342            move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowErrorSer, _)>| {
343                for (_, count) in input.iter() {
344                    if count.is_positive() {
345                        continue;
346                    }
347                    let message = "Non-positive multiplicity in DistinctBy";
348                    error_logger.log(message, &format!("row={key:?}, count={count}"));
349                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
350                    return;
351                }
352                // If `mfp_after` can error, then evaluate it here.
353                let Some(mfp) = &mfp_after2 else { return };
354                let temp_storage = RowArena::new();
355                let datum_iter = key.to_datum_iter();
356                let mut datums_local = datums2.borrow();
357                datums_local.extend(datum_iter);
358
359                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
360                    output.push((e.into(), Diff::ONE));
361                }
362            },
363        );
364        (output, errors.as_collection(|_k, v| v.clone()))
365    }
366
367    /// Build the dataflow to compute and arrange multiple non-accumulable,
368    /// non-hierarchical aggregations on `input`.
369    ///
370    /// This function assumes that we are explicitly rendering multiple basic aggregations.
371    /// For each aggregate, we render a different reduce operator, and then fuse
372    /// results together into a final arrangement that presents all the results
373    /// in the order specified by `aggrs`.
374    fn build_basic_aggregates<'s>(
375        &self,
376        input: VecCollection<'s, T, (Row, Row), Diff>,
377        aggrs: Vec<AggregateExpr>,
378        key_arity: usize,
379        mfp_after: Option<SafeMfpPlan>,
380    ) -> (
381        RowRowArrangement<'s, T>,
382        VecCollection<'s, T, DataflowErrorSer, Diff>,
383    ) {
384        // We are only using this function to render multiple basic aggregates and
385        // stitch them together. If that's not true we should complain.
386        if aggrs.len() <= 1 {
387            self.error_logger().soft_panic_or_log(
388                "Too few aggregations when building basic aggregates",
389                &format!("len={}", aggrs.len()),
390            )
391        }
392        let mut err_output = None;
393        let mut to_collect = Vec::new();
394        for (index, aggr) in aggrs.into_iter().enumerate() {
395            let (result, errs) = self.build_basic_aggregate(
396                input.clone(),
397                index,
398                &aggr,
399                err_output.is_none(),
400                key_arity,
401                None,
402                false,
403            );
404            if errs.is_some() {
405                err_output = errs
406            }
407            to_collect
408                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
409        }
410
411        // Allocations for the two closures.
412        let mut datums1 = DatumVec::new();
413        let mut datums2 = DatumVec::new();
414        let mfp_after1 = mfp_after.clone();
415        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
416
417        let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
418            .mz_arrange::<
419                ColumnationChunker<_>,
420                RowValBatcher<_, _, _>,
421                RowValBuilder<_, _, _>,
422                RowValSpine<_, _, _>,
423            >(
424            "Arranged ReduceFuseBasic input",
425        );
426
427        let output = arranged
428            .clone()
429            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
430                move |key, input, output| {
431                    let temp_storage = RowArena::new();
432                    let datum_iter = key.to_datum_iter();
433                    let mut datums_local = datums1.borrow();
434                    datums_local.extend(datum_iter);
435                    let key_len = datums_local.len();
436
437                    for ((_, row), _) in input.iter() {
438                        datums_local.push(row.unpack_first());
439                    }
440
441                    if let Some(row) =
442                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
443                    {
444                        output.push((row, Diff::ONE));
445                    }
446                }
447            });
448        // If `mfp_after` can error, then we need to render a paired reduction
449        // to scan for these potential errors. Note that we cannot directly use
450        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
451        // conditionally render the second component of the reduction pair.
452        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
453        if let Some(mfp) = mfp_after2 {
454            let mfp_errs = arranged
455                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
456                    "ReduceFuseBasic Error Check",
457                    move |key, input, output| {
458                        // Since negative accumulations are checked in at least one component
459                        // aggregate, we only need to look for MFP errors here.
460                        let temp_storage = RowArena::new();
461                        let datum_iter = key.to_datum_iter();
462                        let mut datums_local = datums2.borrow();
463                        datums_local.extend(datum_iter);
464
465                        for ((_, row), _) in input.iter() {
466                            datums_local.push(row.unpack_first());
467                        }
468
469                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
470                            output.push((e.into(), Diff::ONE));
471                        }
472                    },
473                )
474                .as_collection(|_, v| v.clone());
475            (output, validation_errs.concat(mfp_errs))
476        } else {
477            (output, validation_errs)
478        }
479    }
480
481    /// Build the dataflow to compute a single basic aggregation.
482    ///
483    /// This method also applies distinctness if required.
484    fn build_basic_aggregate<'s>(
485        &self,
486        input: VecCollection<'s, T, (Row, Row), Diff>,
487        index: usize,
488        aggr: &AggregateExpr,
489        validating: bool,
490        key_arity: usize,
491        mfp_after: Option<SafeMfpPlan>,
492        fused_unnest_list: bool,
493    ) -> (
494        RowRowArrangement<'s, T>,
495        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
496    ) {
497        let AggregateExpr {
498            func,
499            expr: _,
500            distinct,
501        } = aggr.clone();
502
503        // Extract the value we were asked to aggregate over.
504        let mut partial = input.map(move |(key, row)| {
505            let mut row_builder = SharedRow::get();
506            let value = row.iter().nth(index).unwrap();
507            row_builder.packer().push(value);
508            (key, row_builder.clone())
509        });
510
511        let mut err_output = None;
512
513        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
514        if distinct {
515            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
516            let pairer = Pairer::new(key_arity);
517            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
518            if validating {
519                let (oks, errs) = self
520                    .build_reduce_inaccumulable_distinct::<
521                        RowValBuilder<Result<(), String>, _, _>,
522                        RowValSpine<Result<(), String>, _, _>,
523                    >(keyed, None)
524                    .as_collection(|k, v| {
525                        (
526                            k.to_row(),
527                            v.as_ref()
528                                .map(|&()| ())
529                                .map_err(|m| m.as_str().into()),
530                        )
531                    })
532                    .map_fallible::<
533                        CapacityContainerBuilder<_>,
534                        CapacityContainerBuilder<_>,
535                        _,
536                        _,
537                        _,
538                    >(
539                        "Demux Errors",
540                        move |(key_val, result)| match result {
541                            Ok(()) => Ok(pairer.split(&key_val)),
542                            Err(m) => {
543                                Err(EvalError::Internal(m).into())
544                            }
545                        },
546                    );
547                err_output = Some(errs);
548                partial = oks;
549            } else {
550                partial = self
551                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
552                        keyed,
553                        Some(" [val: empty]"),
554                    )
555                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
556            }
557        }
558
559        // Allocations for the two closures.
560        let mut datums1 = DatumVec::new();
561        let mut datums2 = DatumVec::new();
562        let mut datums_key_1 = DatumVec::new();
563        let mut datums_key_2 = DatumVec::new();
564        let mfp_after1 = mfp_after.clone();
565        let func2 = func.clone();
566
567        let name = if !fused_unnest_list {
568            "ReduceInaccumulable"
569        } else {
570            "FusedReduceUnnestList"
571        };
572        let arranged = partial
573            .mz_arrange::<
574                ColumnationChunker<_>,
575                RowRowBatcher<_, _>,
576                RowRowBuilder<_, _>,
577                RowRowSpine<_, _>,
578            >(&format!(
579                "Arranged {name}"
580            ));
581        let oks = if !fused_unnest_list {
582            arranged
583                .clone()
584                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
585                    move |key, source, target| {
586                        // We respect the multiplicity here (unlike in hierarchical aggregation)
587                        // because we don't know that the aggregation method is not sensitive
588                        // to the number of records.
589                        let iter = source.iter().flat_map(|(v, w)| {
590                            // Note that in the non-positive case, this is wrong, but harmless because
591                            // our other reduction will produce an error.
592                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
593                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
594                        });
595
596                        let temp_storage = RowArena::new();
597                        let datum_iter = key.to_datum_iter();
598                        let mut datums_local = datums1.borrow();
599                        datums_local.extend(datum_iter);
600                        let key_len = datums_local.len();
601                        datums_local.push(
602                        // Note that this is not necessarily a window aggregation, in which case
603                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
604                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
605                            iter,
606                            &temp_storage,
607                        ),
608                    );
609
610                        if let Some(row) = evaluate_mfp_after(
611                            &mfp_after1,
612                            &mut datums_local,
613                            &temp_storage,
614                            key_len,
615                        ) {
616                            target.push((row, Diff::ONE));
617                        }
618                    }
619                })
620        } else {
621            arranged
622                .clone()
623                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
624                    move |key, source, target| {
625                        // This part is the same as in the `!fused_unnest_list` if branch above.
626                        let iter = source.iter().flat_map(|(v, w)| {
627                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
628                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
629                        });
630
631                        // This is the part that is specific to the `fused_unnest_list` branch.
632                        let temp_storage = RowArena::new();
633                        let mut datums_local = datums_key_1.borrow();
634                        datums_local.extend(key.to_datum_iter());
635                        let key_len = datums_local.len();
636                        for datum in func
637                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
638                                iter,
639                                &temp_storage,
640                            )
641                        {
642                            datums_local.truncate(key_len);
643                            datums_local.push(datum);
644                            if let Some(row) = evaluate_mfp_after(
645                                &mfp_after1,
646                                &mut datums_local,
647                                &temp_storage,
648                                key_len,
649                            ) {
650                                target.push((row, Diff::ONE));
651                            }
652                        }
653                    }
654                })
655        };
656
657        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
658        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
659        // rationale around using a second reduction here.
660        let must_validate = validating && err_output.is_none();
661        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
662        if must_validate || mfp_after2.is_some() {
663            let error_logger = self.error_logger();
664
665            let errs = if !fused_unnest_list {
666                arranged
667                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
668                        &format!("{name} Error Check"),
669                        move |key, source, target| {
670                            // Negative counts would be surprising, but until we are 100% certain we won't
671                            // see them, we should report when we do. We may want to bake even more info
672                            // in here in the future.
673                            if must_validate {
674                                for (value, count) in source.iter() {
675                                    if count.is_positive() {
676                                        continue;
677                                    }
678                                    let value = value.to_row();
679                                    let message =
680                                        "Non-positive accumulation in ReduceInaccumulable";
681                                    error_logger
682                                        .log(message, &format!("value={value:?}, count={count}"));
683                                    let err = EvalError::Internal(message.into());
684                                    target.push((err.into(), Diff::ONE));
685                                    return;
686                                }
687                            }
688
689                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
690                            let Some(mfp) = &mfp_after2 else { return };
691                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
692                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
693                                // This would ideally use `to_datum_iter` but we cannot as it needs to
694                                // borrow `v` and only presents datums with that lifetime, not any longer.
695                                std::iter::repeat(v.next().unwrap()).take(count)
696                            });
697
698                            let temp_storage = RowArena::new();
699                            let datum_iter = key.to_datum_iter();
700                            let mut datums_local = datums2.borrow();
701                            datums_local.extend(datum_iter);
702                            datums_local.push(
703                                func2.eval_with_fast_window_agg::<
704                                    _,
705                                    window_agg_helpers::OneByOneAggrImpls,
706                                >(
707                                    iter, &temp_storage
708                                ),
709                            );
710                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
711                                target.push((e.into(), Diff::ONE));
712                            }
713                        },
714                    )
715                    .as_collection(|_, v| v.clone())
716            } else {
717                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
718                assert!(!must_validate);
719                // We couldn't have got into this if branch due to `must_validate`, so it must be
720                // because of the `mfp_after2.is_some()`.
721                let Some(mfp) = mfp_after2 else {
722                    unreachable!()
723                };
724                arranged
725                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
726                        &format!("{name} Error Check"),
727                        move |key, source, target| {
728                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
729                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
730                                // This would ideally use `to_datum_iter` but we cannot as it needs to
731                                // borrow `v` and only presents datums with that lifetime, not any longer.
732                                std::iter::repeat(v.next().unwrap()).take(count)
733                            });
734
735                            let temp_storage = RowArena::new();
736                            let mut datums_local = datums_key_2.borrow();
737                            datums_local.extend(key.to_datum_iter());
738                            let key_len = datums_local.len();
739                            for datum in func2
740                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
741                                    iter,
742                                    &temp_storage,
743                                )
744                            {
745                                datums_local.truncate(key_len);
746                                datums_local.push(datum);
747                                // We know that `mfp` can error (because of the `could_error` call
748                                // above), so try to evaluate it here.
749                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
750                                {
751                                    target.push((e.into(), Diff::ONE));
752                                }
753                            }
754                        },
755                    )
756                    .as_collection(|_, v| v.clone())
757            };
758
759            if let Some(e) = err_output {
760                err_output = Some(e.concat(errs));
761            } else {
762                err_output = Some(errs);
763            }
764        }
765        (oks, err_output)
766    }
767
768    fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
769        &self,
770        input: VecCollection<'s, T, Row, Diff>,
771        name_tag: Option<&str>,
772    ) -> Arranged<'s, TraceAgent<Tr>>
773    where
774        Tr: for<'a> Trace<
775                Key<'a> = DatumSeq<'a>,
776                KeyContainer: BatchContainer<Owned = Row>,
777                Time = T,
778                Diff = Diff,
779                ValOwn: Data + MaybeValidatingRow<(), String>,
780            > + 'static,
781        Bu: Builder<
782                Time = T,
783                Input: Container
784                           + ClearContainer
785                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
786                Output = Tr::Batch,
787            >,
788        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
789    {
790        let error_logger = self.error_logger();
791
792        let output_name = format!(
793            "ReduceInaccumulable Distinct{}",
794            name_tag.unwrap_or_default()
795        );
796
797        let input: KeyCollection<_, _, _> = input.into();
798        input
799            .mz_arrange::<
800                ColumnationChunker<_>,
801                RowBatcher<_, _>,
802                RowBuilder<_, _>,
803                RowSpine<_, _>,
804            >(
805                "Arranged ReduceInaccumulable Distinct [val: empty]",
806            )
807            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
808                if let Some(err) = Tr::ValOwn::into_error() {
809                    for (value, count) in source.iter() {
810                        if count.is_positive() {
811                            continue;
812                        }
813
814                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
815                        error_logger.log(message, &format!("value={value:?}, count={count}"));
816                        t.push((err(message.to_string()), Diff::ONE));
817                        return;
818                    }
819                }
820                t.push((Tr::ValOwn::ok(()), Diff::ONE))
821            })
822    }
823
824    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
825    /// on non-monotonic inputs.
826    ///
827    /// This function renders a single reduction tree that computes aggregations with
828    /// a priority queue implemented with a series of reduce operators that partition
829    /// the input into buckets, and compute the aggregation over very small buckets
830    /// and feed the results up to larger buckets.
831    ///
832    /// Note that this implementation currently ignores the distinct bit because we
833    /// currently only perform min / max hierarchically and the reduction tree
834    /// efficiently suppresses non-distinct updates.
835    ///
836    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
837    /// trickery here to limit the memory usage per layer by internally
838    /// holding only the elements that were rejected by this stage. However, the
839    /// output collection maintains the `((key, bucket), (passing value)` for this
840    /// stage.
841    fn build_bucketed<'s>(
842        &self,
843        input: VecCollection<'s, T, (Row, Row), Diff>,
844        BucketedPlan {
845            aggr_funcs,
846            buckets,
847        }: BucketedPlan,
848        key_arity: usize,
849        mfp_after: Option<SafeMfpPlan>,
850    ) -> (
851        RowRowArrangement<'s, T>,
852        VecCollection<'s, T, DataflowErrorSer, Diff>,
853    ) {
854        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
855        let outer_scope = input.scope();
856        let arranged_output = outer_scope
857            .clone()
858            .region_named("ReduceHierarchical", |inner| {
859                let input = input.enter(inner);
860
861                // The first mod to apply to the hash.
862                let first_mod = buckets.get(0).copied().unwrap_or(1);
863                let aggregations = aggr_funcs.len();
864
865                // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
866                let mut stage = input.map(move |(key, row)| {
867                    let mut row_builder = SharedRow::get();
868                    let mut row_packer = row_builder.packer();
869                    row_packer.extend(row.iter().take(aggregations));
870                    let values = row_builder.clone();
871
872                    // Apply the initial mod here.
873                    let hash = values.hashed() % first_mod;
874                    let hash_key =
875                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
876                    (hash_key, values)
877                });
878
879                // Repeatedly apply hierarchical reduction with a progressively coarser key.
880                for (index, b) in buckets.into_iter().enumerate() {
881                    // Apply subsequent bucket mods for all but the first round.
882                    let input = if index == 0 {
883                        stage
884                    } else {
885                        stage.map(move |(hash_key, values)| {
886                            let mut hash_key_iter = hash_key.iter();
887                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
888                            // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
889                            let hash_key = SharedRow::pack(
890                                std::iter::once(Datum::from(hash))
891                                    .chain(hash_key_iter.take(key_arity)),
892                            );
893                            (hash_key, values)
894                        })
895                    };
896
897                    // We only want the first stage to perform validation of whether invalid accumulations
898                    // were observed in the input. Subsequently, we will either produce an error in the error
899                    // stream or produce correct data in the output stream.
900                    let validating = err_output.is_none();
901
902                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
903                    if let Some(errs) = errs {
904                        err_output = Some(errs.leave_region(outer_scope));
905                    }
906                    stage = oks
907                }
908
909                // Discard the hash from the key and return to the format of the input data.
910                let partial = stage.map(move |(hash_key, values)| {
911                    let mut hash_key_iter = hash_key.iter();
912                    let _hash = hash_key_iter.next();
913                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
914                });
915
916                // Allocations for the two closures.
917                let mut datums1 = DatumVec::new();
918                let mut datums2 = DatumVec::new();
919                let mfp_after1 = mfp_after.clone();
920                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
921                let aggr_funcs2 = aggr_funcs.clone();
922
923                // Build a series of stages for the reduction
924                // Arrange the final result into (key, Row)
925                let error_logger = self.error_logger();
926                // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
927                // view mz_introspection.mz_expected_group_size_advice.
928                let arranged = partial
929                    .mz_arrange::<
930                        ColumnationChunker<_>,
931                        RowRowBatcher<_, _>,
932                        RowRowBuilder<_, _>,
933                        RowRowSpine<_, _>,
934                    >(
935                        "Arrange ReduceMinsMaxes",
936                    );
937                // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
938                // but we then wouldn't be able to do this error check conditionally.  See its documentation
939                // for the rationale around using a second reduction here.
940                let must_validate = err_output.is_none();
941                if must_validate || mfp_after2.is_some() {
942                    let errs = arranged
943                        .clone()
944                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
945                            "ReduceMinsMaxes Error Check",
946                            move |key, source, target| {
947                                // Negative counts would be surprising, but until we are 100% certain we wont
948                                // see them, we should report when we do. We may want to bake even more info
949                                // in here in the future.
950                                if must_validate {
951                                    for (val, count) in source.iter() {
952                                        if count.is_positive() {
953                                            continue;
954                                        }
955                                        let val = val.to_row();
956                                        let message =
957                                            "Non-positive accumulation in ReduceMinsMaxes";
958                                        error_logger
959                                            .log(message, &format!("val={val:?}, count={count}"));
960                                        target.push((
961                                            EvalError::Internal(message.into()).into(),
962                                            Diff::ONE,
963                                        ));
964                                        return;
965                                    }
966                                }
967
968                                // We know that `mfp_after` can error if it exists, so try to evaluate it here.
969                                let Some(mfp) = &mfp_after2 else { return };
970                                let temp_storage = RowArena::new();
971                                let datum_iter = key.to_datum_iter();
972                                let mut datums_local = datums2.borrow();
973                                datums_local.extend(datum_iter);
974
975                                let mut source_iters = source
976                                    .iter()
977                                    .map(|(values, _cnt)| *values)
978                                    .collect::<Vec<_>>();
979                                for func in aggr_funcs2.iter() {
980                                    let column_iter = (0..source_iters.len())
981                                        .map(|i| source_iters[i].next().unwrap());
982                                    datums_local.push(func.eval(column_iter, &temp_storage));
983                                }
984                                if let Result::Err(e) =
985                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
986                                {
987                                    target.push((e.into(), Diff::ONE));
988                                }
989                            },
990                        )
991                        .as_collection(|_, v| v.clone())
992                        .leave_region(outer_scope);
993                    if let Some(e) = err_output.take() {
994                        err_output = Some(e.concat(errs));
995                    } else {
996                        err_output = Some(errs);
997                    }
998                }
999                arranged
1000                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1001                        "ReduceMinsMaxes",
1002                        move |key, source, target| {
1003                            let temp_storage = RowArena::new();
1004                            let datum_iter = key.to_datum_iter();
1005                            let mut datums_local = datums1.borrow();
1006                            datums_local.extend(datum_iter);
1007                            let key_len = datums_local.len();
1008
1009                            let mut source_iters = source
1010                                .iter()
1011                                .map(|(values, _cnt)| *values)
1012                                .collect::<Vec<_>>();
1013                            for func in aggr_funcs.iter() {
1014                                let column_iter = (0..source_iters.len())
1015                                    .map(|i| source_iters[i].next().unwrap());
1016                                datums_local.push(func.eval(column_iter, &temp_storage));
1017                            }
1018
1019                            if let Some(row) = evaluate_mfp_after(
1020                                &mfp_after1,
1021                                &mut datums_local,
1022                                &temp_storage,
1023                                key_len,
1024                            ) {
1025                                target.push((row, Diff::ONE));
1026                            }
1027                        },
1028                    )
1029                    .leave_region(outer_scope)
1030            });
1031        (
1032            arranged_output,
1033            err_output.expect("expected to validate in one level of the hierarchy"),
1034        )
1035    }
1036
1037    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
1038    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
1039    /// with the negation of what's produced by the reduction.
1040    /// `validating` indicates whether we want this stage to perform error detection
1041    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
1042    /// stages can skip validation.
1043    fn build_bucketed_stage<'s>(
1044        &self,
1045        aggr_funcs: &Vec<AggregateFunc>,
1046        input: VecCollection<'s, T, (Row, Row), Diff>,
1047        validating: bool,
1048    ) -> (
1049        VecCollection<'s, T, (Row, Row), Diff>,
1050        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
1051    ) {
1052        let (input, negated_output, errs) = if validating {
1053            let (input, reduced) = self
1054                .build_bucketed_negated_output::<
1055                    RowValBuilder<_, _, _>,
1056                    RowValSpine<Result<Row, Row>, _, _>,
1057                >(
1058                    input.clone(),
1059                    aggr_funcs.clone(),
1060                );
1061            let (oks, errs) = reduced
1062                .as_collection(|k, v| (k.to_row(), v.clone()))
1063                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1064                "Checked Invalid Accumulations",
1065                |(hash_key, result)| match result {
1066                    Err(hash_key) => {
1067                        let mut hash_key_iter = hash_key.iter();
1068                        let _hash = hash_key_iter.next();
1069                        let key = SharedRow::pack(hash_key_iter);
1070                        let message = format!(
1071                            "Invalid data in source, saw non-positive accumulation \
1072                                         for key {key:?} in hierarchical mins-maxes aggregate"
1073                        );
1074                        Err(EvalError::Internal(message.into()).into())
1075                    }
1076                    Ok(values) => Ok((hash_key, values)),
1077                },
1078            );
1079            (input, oks, Some(errs))
1080        } else {
1081            let (input, reduced) = self
1082                .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1083                    input,
1084                    aggr_funcs.clone(),
1085                );
1086            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1087            // that we need to apply the mod on both input and oks.
1088            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1089            (input, oks, None)
1090        };
1091
1092        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1093        let oks = negated_output.concat(input);
1094        (oks, errs)
1095    }
1096
1097    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1098    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1099    /// with all diffs in the reduction's output negated.
1100    fn build_bucketed_negated_output<'s, Bu, Tr>(
1101        &self,
1102        input: VecCollection<'s, T, (Row, Row), Diff>,
1103        aggrs: Vec<AggregateFunc>,
1104    ) -> (
1105        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1106        Arranged<'s, TraceAgent<Tr>>,
1107    )
1108    where
1109        Tr: for<'a> Trace<
1110                Key<'a> = DatumSeq<'a>,
1111                KeyContainer: BatchContainer<Owned = Row>,
1112                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1113                Time = T,
1114                Diff = Diff,
1115            > + 'static,
1116        Bu: Builder<
1117                Time = T,
1118                Input: Container
1119                           + ClearContainer
1120                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1121                Output = Tr::Batch,
1122            >,
1123        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1124    {
1125        let error_logger = self.error_logger();
1126        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1127        // view mz_introspection.mz_expected_group_size_advice.
1128        let arranged_input = input
1129            .mz_arrange::<
1130                ColumnationChunker<_>,
1131                RowRowBatcher<_, _>,
1132                RowRowBuilder<_, _>,
1133                RowRowSpine<_, _>,
1134            >(
1135                "Arranged MinsMaxesHierarchical input",
1136            );
1137
1138        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1139            "Reduced Fallibly MinsMaxesHierarchical",
1140            move |key, source, target| {
1141                if let Some(err) = Tr::ValOwn::into_error() {
1142                    // Should negative accumulations reach us, we should loudly complain.
1143                    for (value, count) in source.iter() {
1144                        if count.is_positive() {
1145                            continue;
1146                        }
1147                        error_logger.log(
1148                            "Non-positive accumulation in MinsMaxesHierarchical",
1149                            &format!("key={key:?}, value={value:?}, count={count}"),
1150                        );
1151                        // After complaining, output an error here so that we can eventually
1152                        // report it in an error stream.
1153                        target.push((
1154                            err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1155                            Diff::ONE,
1156                        ));
1157                        return;
1158                    }
1159                }
1160
1161                let mut row_builder = SharedRow::get();
1162                let mut row_packer = row_builder.packer();
1163
1164                let mut source_iters = source
1165                    .iter()
1166                    .map(|(values, _cnt)| *values)
1167                    .collect::<Vec<_>>();
1168                for func in aggrs.iter() {
1169                    let column_iter =
1170                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1171                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1172                }
1173                // We only want to arrange the parts of the input that are not part of the output.
1174                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
1175                // gives us the intended value of this aggregate function. Also we assume that regardless
1176                // of the multiplicity of the final result in the input, we only want to have one copy
1177                // in the output.
1178                target.reserve(source.len().saturating_add(1));
1179                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1180                target.extend(source.iter().map(|(values, cnt)| {
1181                    let mut cnt = *cnt;
1182                    cnt.negate();
1183                    (Tr::ValOwn::ok(values.to_row()), cnt)
1184                }));
1185            },
1186        );
1187        (arranged_input, reduced)
1188    }
1189
1190    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1191    /// on monotonic inputs.
1192    fn build_monotonic<'s>(
1193        &self,
1194        collection: VecCollection<'s, T, (Row, Row), Diff>,
1195        MonotonicPlan {
1196            aggr_funcs,
1197            must_consolidate,
1198        }: MonotonicPlan,
1199        mfp_after: Option<SafeMfpPlan>,
1200    ) -> (
1201        RowRowArrangement<'s, T>,
1202        VecCollection<'s, T, DataflowErrorSer, Diff>,
1203    ) {
1204        let aggregations = aggr_funcs.len();
1205        // Gather the relevant values into a vec of rows ordered by aggregation_index
1206        let collection = collection
1207            .map(move |(key, row)| {
1208                let mut row_builder = SharedRow::get();
1209                let mut values = Vec::with_capacity(aggregations);
1210                values.extend(
1211                    row.iter()
1212                        .take(aggregations)
1213                        .map(|v| row_builder.pack_using(std::iter::once(v))),
1214                );
1215
1216                (key, values)
1217            })
1218            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1219                must_consolidate,
1220                "Consolidated ReduceMonotonic input",
1221            );
1222
1223        // It should be now possible to ensure that we have a monotonic collection.
1224        let error_logger = self.error_logger();
1225        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1226            error_logger.log(
1227                "Non-monotonic input to ReduceMonotonic",
1228                &format!("data={data:?}, diff={diff}"),
1229            );
1230            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1231            (EvalError::Internal(m).into(), Diff::ONE)
1232        });
1233        // We can place our rows directly into the diff field, and
1234        // only keep the relevant one corresponding to evaluating our
1235        // aggregate, instead of having to do a hierarchical reduction.
1236        let partial = partial.explode_one(move |(key, values)| {
1237            let mut output = Vec::new();
1238            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1239                output.push(monoids::get_monoid(row, func).expect(
1240                    "hierarchical aggregations are expected to have monoid implementations",
1241                ));
1242            }
1243            (key, output)
1244        });
1245
1246        // Allocations for the two closures.
1247        let mut datums1 = DatumVec::new();
1248        let mut datums2 = DatumVec::new();
1249        let mfp_after1 = mfp_after.clone();
1250        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1251
1252        let partial: KeyCollection<_, _, _> = partial.into();
1253        let arranged = partial
1254            .mz_arrange::<
1255                ColumnationChunker<_>,
1256                RowBatcher<_, _>,
1257                RowBuilder<_, _>,
1258                RowSpine<_, Vec<ReductionMonoid>>,
1259            >(
1260                "ArrangeMonotonic [val: empty]",
1261            );
1262        let output = arranged
1263            .clone()
1264            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
1265                move |key, input, output| {
1266                    let temp_storage = RowArena::new();
1267                    let datum_iter = key.to_datum_iter();
1268                    let mut datums_local = datums1.borrow();
1269                    datums_local.extend(datum_iter);
1270                    let key_len = datums_local.len();
1271                    let accum = &input[0].1;
1272                    for monoid in accum.iter() {
1273                        datums_local.extend(monoid.finalize().iter());
1274                    }
1275
1276                    if let Some(row) =
1277                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1278                    {
1279                        output.push((row, Diff::ONE));
1280                    }
1281                }
1282            });
1283
1284        // If `mfp_after` can error, then we need to render a paired reduction
1285        // to scan for these potential errors. Note that we cannot directly use
1286        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1287        // conditionally render the second component of the reduction pair.
1288        if let Some(mfp) = mfp_after2 {
1289            let mfp_errs = arranged
1290                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1291                    "ReduceMonotonic Error Check",
1292                    move |key, input, output| {
1293                        let temp_storage = RowArena::new();
1294                        let datum_iter = key.to_datum_iter();
1295                        let mut datums_local = datums2.borrow();
1296                        datums_local.extend(datum_iter);
1297                        let accum = &input[0].1;
1298                        for monoid in accum.iter() {
1299                            datums_local.extend(monoid.finalize().iter());
1300                        }
1301                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1302                        {
1303                            output.push((e.into(), Diff::ONE));
1304                        }
1305                    },
1306                )
1307                .as_collection(|_k, v| v.clone());
1308            (output, validation_errs.concat(mfp_errs))
1309        } else {
1310            (output, validation_errs)
1311        }
1312    }
1313
    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
    ///
    /// The incoming values are moved to the update's "difference" field, at which point
    /// they can be accumulated in place. A reduction over the arranged result then reads
    /// the accumulated difference back as data and applies operator-specific logic (see
    /// `finalize_accum`) to yield the final aggregates.
    ///
    /// # Parameters
    ///
    /// * `collection`: the input, as `(key, row)` pairs; `row` holds the values to
    ///   aggregate.
    /// * The destructured `AccumulablePlan`: `full_aggrs` lists every aggregate, while
    ///   `simple_aggrs` and `distinct_aggrs` hold `(datum_index, aggr)` pairs. The
    ///   `datum_index` is used both as the position of the input datum in `row` and as
    ///   the position of the aggregate's accumulator in the difference vector.
    /// * `key_arity`: handed to `Pairer::new`, which is used to merge a key with a
    ///   value and to split them apart again for distinct aggregations.
    /// * `mfp_after`: an optional MFP applied to the key columns followed by the
    ///   finalized aggregates.
    ///
    /// # Returns
    ///
    /// A pair of the arranged reduction output and a collection of errors. The errors
    /// comprise invalid accumulations detected in the input and, if `mfp_after` could
    /// error, the evaluation errors of `mfp_after`.
    fn build_accumulable<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let collection_scope = collection.scope();

        // we must have called this function with something to reduce
        // Every aggregate must also be classified as exactly one of simple or distinct.
        // A violation is reported through the error logger.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // Some of the aggregations may have the `distinct` bit set, which means that they'll
        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
        // Other aggregations can be directly moved in to the `diff` field.
        //
        // In each case, the resulting collection should have `data` shaped as `(key, ())`
        // and a `diff` of type `(Vec<Accum>, Diff)`: one `Accum` per entry of `full_aggrs`
        // (in the same order), followed by a count of the contributing records, which the
        // reductions below read as `total`.

        // Instantiate a default vector for diffs with the correct types at each
        // position.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            // First, collect all non-distinct aggregations in one pass.
            let collection = collection.clone();
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    // Try to unpack only the datums we need. Unfortunately, since we
                    // can't random access into a Row, we have to iterate through one by one.
                    // TODO: Even though we don't have random access, we could still avoid unpacking
                    // everything that we don't care about, and it might be worth it to extend the
                    // Row API to do that.
                    //
                    // `row_iter` only ever moves forward, and each aggregate consumes at
                    // least one datum, so this relies on `simple_aggrs` being sorted by
                    // strictly increasing `datum_index`; otherwise the `unwrap`s panic
                    // once the row is exhausted.
                    let mut row_iter = row.iter().enumerate();
                    for (datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                        // Each input record counts once towards the record total.
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // Next, collect all aggregations that require distinctness.
        for (datum_index, aggr) in distinct_aggrs.into_iter() {
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .clone()
                // Fold the aggregated value into the key, so that the reduction below
                // operates on `(key, value)` combinations.
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<
                    ColumnationChunker<_>,
                    RowBatcher<_, _>,
                    RowBuilder<_, _>,
                    RowSpine<_, _>,
                >(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                // Emit exactly one record per `(key, value)` combination that the
                // reduction is invoked for, independent of its multiplicity.
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                // Separate the merged row back into the original key and the value.
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        // After the split, `row` holds just the single aggregated value.
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // now concatenate, if necessary, multiple aggregations
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
        };

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        // The output reduction evaluates the MFP in any case; the error reduction only
        // needs it if it could produce errors.
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        // Arrange by key; the accumulators and the record total travel in the
        // difference field and are summed up there.
        let arranged = collection
            .mz_arrange::<
                ColumnationChunker<_>,
                RowBatcher<_, _>,
                RowBuilder<_, _>,
                RowSpine<_, (Vec<Accum>, Diff)>,
            >(
                "ArrangeAccumulable [val: empty]",
            );
        // Output reduction: turn the accumulated difference of each key into a row of
        // finalized aggregates, optionally passing it through `mfp_after`.
        let arranged_output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
                move |key, input, output| {
                    // The accumulated difference of the key's single `()` value.
                    let (ref accums, total) = input[0].1;

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    // Remember where the key columns end; only the columns after them
                    // are packed into the output row.
                    let key_len = datums_local.len();
                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // Error reduction: scan the same arrangement for invalid accumulations and,
        // if required, for errors in evaluating `mfp_after`.
        let arranged_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "AccumulableErrorCheck",
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // We first test here if inputs without net-positive records are present,
                        // producing an error to the logs and to the query output if that is the case.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Sums over unsigned types must never accumulate to a negative value.
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
                                    &format!("aggr={aggr:?}, accum={accum:?}"),
                                );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    let err = EvalError::Internal(message.into());
                                    output.push((err.into(), Diff::ONE));
                                }
                            }
                            _ => (), // no more errors to check for at this point!
                        }
                    }

                    // If `mfp_after` can error, then evaluate it here.
                    let Some(mfp) = &mfp_after2 else { return };
                    // Rebuild the same datums as in the output reduction: the key columns
                    // followed by the finalized aggregates.
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    // Only the error is of interest here; successful results are produced
                    // by the output reduction.
                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1541}
1542
1543/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1544/// containing key and aggregate values, then returns a result `Row` or `None`
1545/// if the MFP filters the result out.
1546fn evaluate_mfp_after<'a, 'b>(
1547    mfp_after: &'a Option<SafeMfpPlan>,
1548    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1549    temp_storage: &'a RowArena,
1550    key_len: usize,
1551) -> Option<Row> {
1552    let mut row_builder = SharedRow::get();
1553    // Apply MFP if it exists and pack a Row of
1554    // aggregate values from `datums_local`.
1555    if let Some(mfp) = mfp_after {
1556        // It must ignore errors here, but they are scanned
1557        // for elsewhere if the MFP can error.
1558        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1559            // The `mfp_after` must preserve the key columns,
1560            // so we can skip them to form aggregation results.
1561            Some(row_builder.pack_using(iter.skip(key_len)))
1562        } else {
1563            None
1564        }
1565    } else {
1566        Some(row_builder.pack_using(&datums_local[key_len..]))
1567    }
1568}
1569
1570fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1571    match aggr_func {
1572        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1573            trues: Diff::ZERO,
1574            falses: Diff::ZERO,
1575        },
1576        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1577            accum: AccumCount::ZERO,
1578            pos_infs: Diff::ZERO,
1579            neg_infs: Diff::ZERO,
1580            nans: Diff::ZERO,
1581            non_nulls: Diff::ZERO,
1582        },
1583        AggregateFunc::SumNumeric => Accum::Numeric {
1584            accum: OrderedDecimal(NumericAgg::zero()),
1585            pos_infs: Diff::ZERO,
1586            neg_infs: Diff::ZERO,
1587            nans: Diff::ZERO,
1588            non_nulls: Diff::ZERO,
1589        },
1590        _ => Accum::SimpleNumber {
1591            accum: AccumCount::ZERO,
1592            non_nulls: Diff::ZERO,
1593        },
1594    }
1595}
1596
/// The number of fractional bits of binary precision retained by the
/// fixed-point representation used to accumulate float sums. The fixed-point
/// scale is `FLOAT_SCALE == 2^FLOAT_SCALE_EXP`.
///
/// With a value of 24, one unit of the fixed-point accumulator corresponds to
/// `2^-24` of the original float value.
const FLOAT_SCALE_EXP: u32 = 24;

/// The fixed-point scale applied to float sums, i.e. `2^FLOAT_SCALE_EXP`.
///
/// Accumulated fixed-point sums are divided by this factor when they are
/// converted back to floats.
#[allow(clippy::as_conversions)] // Integer-to-float cast, exact and const-evaluable.
const FLOAT_SCALE: f64 = (1_u64 << FLOAT_SCALE_EXP) as f64;
1605
1606/// Maps a finite `f64` onto the fixed-point `i128` domain used to accumulate
1607/// float sums, i.e. computes `trunc(n * FLOAT_SCALE)` reduced modulo `2^128`.
1608///
1609/// Conceptually this multiplies `n` by `FLOAT_SCALE` and truncates towards zero,
1610/// but it does so using *wrapping* (modulo `2^128`) rather than *saturating*
1611/// semantics, and it never forms the intermediate product `n * FLOAT_SCALE` as
1612/// an `f64` (which could itself overflow to infinity for very large `n`).
1613///
1614/// Wrapping is what makes this conversion a group homomorphism into the additive
1615/// group of `i128` (mod `2^128`), matching the wrapping arithmetic used when
1616/// accumulators are combined and retracted. As a result, a set of large finite
1617/// values whose *sum* is representable produces the correct result even when the
1618/// individual values fall outside the representable fixed-point range. Saturating
1619/// instead breaks this: e.g. `1.1e31` and `-1.1e31` both overflow the domain and
1620/// would saturate to `i128::MAX` and `i128::MIN`, which sum to `-1` rather than
1621/// `0` (see database-issues#11265).
1622fn float_to_fixed_point(n: f64) -> i128 {
1623    debug_assert!(n.is_finite());
1624
1625    // Decompose `n` into integer parts such that `n == sign * mantissa *
1626    // 2^exponent`. Folding in the `* 2^FLOAT_SCALE_EXP` scaling then amounts to
1627    // shifting `mantissa` left by `exponent + FLOAT_SCALE_EXP` bits.
1628    let (mantissa, exponent, sign) = Float::integer_decode(n);
1629    let significand = u128::from(mantissa);
1630    let exp = i64::from(exponent) + i64::from(FLOAT_SCALE_EXP);
1631
1632    let magnitude: u128 = if exp >= 0 {
1633        // Left shifts of 128 or more bits leave nothing within the 128-bit
1634        // window; smaller shifts keep only the low 128 bits (i.e. mod `2^128`).
1635        match u32::try_from(exp) {
1636            Ok(shift) if shift < 128 => significand << shift,
1637            _ => 0,
1638        }
1639    } else {
1640        // Right shift truncates the fractional part towards zero. Subnormals
1641        // (and zero) shift entirely out of the window and become zero.
1642        match u32::try_from(-exp) {
1643            Ok(shift) if shift < 128 => significand >> shift,
1644            _ => 0,
1645        }
1646    };
1647
1648    // Reinterpret the magnitude as a signed `i128` (wrapping into the signed
1649    // domain) and apply the sign of `n`.
1650    let magnitude = magnitude.cast_signed();
1651    if sign < 0 {
1652        magnitude.wrapping_neg()
1653    } else {
1654        magnitude
1655    }
1656}
1657
1658fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1659    match aggregate_func {
1660        AggregateFunc::Count => Accum::SimpleNumber {
1661            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
1662            non_nulls: if datum.is_null() {
1663                Diff::ZERO
1664            } else {
1665                Diff::ONE
1666            },
1667        },
1668        AggregateFunc::Any | AggregateFunc::All => match datum {
1669            Datum::True => Accum::Bool {
1670                trues: Diff::ONE,
1671                falses: Diff::ZERO,
1672            },
1673            Datum::Null => Accum::Bool {
1674                trues: Diff::ZERO,
1675                falses: Diff::ZERO,
1676            },
1677            Datum::False => Accum::Bool {
1678                trues: Diff::ZERO,
1679                falses: Diff::ONE,
1680            },
1681            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1682        },
1683        AggregateFunc::Dummy => match datum {
1684            Datum::Dummy => Accum::SimpleNumber {
1685                accum: AccumCount::ZERO,
1686                non_nulls: Diff::ZERO,
1687            },
1688            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1689        },
1690        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1691            let n = match datum {
1692                Datum::Float32(n) => f64::from(*n),
1693                Datum::Float64(n) => *n,
1694                Datum::Null => 0f64,
1695                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1696            };
1697
1698            let nans = Diff::from(n.is_nan());
1699            let pos_infs = Diff::from(n == f64::INFINITY);
1700            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1701            let non_nulls = Diff::from(datum != Datum::Null);
1702
1703            // Map the floating point value onto a fixed precision domain
1704            // All special values should map to zero, since they are tracked separately
1705            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1706                AccumCount::ZERO
1707            } else {
1708                // Wrap (rather than saturate) on overflow, so that the mapping is
1709                // a group homomorphism and large finite values whose sum is in
1710                // range still produce correct results (database-issues#11265).
1711                float_to_fixed_point(n).into()
1712            };
1713
1714            Accum::Float {
1715                accum,
1716                pos_infs,
1717                neg_infs,
1718                nans,
1719                non_nulls,
1720            }
1721        }
1722        AggregateFunc::SumNumeric => match datum {
1723            Datum::Numeric(n) => {
1724                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1725                    if n.0.is_negative() {
1726                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1727                    } else {
1728                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1729                    }
1730                } else if n.0.is_nan() {
1731                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1732                } else {
1733                    // Take a narrow decimal (datum) into a wide decimal
1734                    // (aggregator).
1735                    let mut cx_agg = numeric::cx_agg();
1736                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1737                };
1738
1739                Accum::Numeric {
1740                    accum: OrderedDecimal(accum),
1741                    pos_infs,
1742                    neg_infs,
1743                    nans,
1744                    non_nulls: Diff::ONE,
1745                }
1746            }
1747            Datum::Null => Accum::Numeric {
1748                accum: OrderedDecimal(NumericAgg::zero()),
1749                pos_infs: Diff::ZERO,
1750                neg_infs: Diff::ZERO,
1751                nans: Diff::ZERO,
1752                non_nulls: Diff::ZERO,
1753            },
1754            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1755        },
1756        _ => {
1757            // Other accumulations need to disentangle the accumulable
1758            // value from its NULL-ness, which is not quite as easily
1759            // accumulated.
1760            match datum {
1761                Datum::Int16(i) => Accum::SimpleNumber {
1762                    accum: i.into(),
1763                    non_nulls: Diff::ONE,
1764                },
1765                Datum::Int32(i) => Accum::SimpleNumber {
1766                    accum: i.into(),
1767                    non_nulls: Diff::ONE,
1768                },
1769                Datum::Int64(i) => Accum::SimpleNumber {
1770                    accum: i.into(),
1771                    non_nulls: Diff::ONE,
1772                },
1773                Datum::UInt16(u) => Accum::SimpleNumber {
1774                    accum: u.into(),
1775                    non_nulls: Diff::ONE,
1776                },
1777                Datum::UInt32(u) => Accum::SimpleNumber {
1778                    accum: u.into(),
1779                    non_nulls: Diff::ONE,
1780                },
1781                Datum::UInt64(u) => Accum::SimpleNumber {
1782                    accum: u.into(),
1783                    non_nulls: Diff::ONE,
1784                },
1785                Datum::MzTimestamp(t) => Accum::SimpleNumber {
1786                    accum: u64::from(t).into(),
1787                    non_nulls: Diff::ONE,
1788                },
1789                Datum::Null => Accum::SimpleNumber {
1790                    accum: AccumCount::ZERO,
1791                    non_nulls: Diff::ZERO,
1792                },
1793                x => panic!("Accumulating non-integer data: {x:?}"),
1794            }
1795        }
1796    }
1797}
1798
1799fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1800    // The finished value depends on the aggregation function in a variety of ways.
1801    // For all aggregates but count, if only null values were
1802    // accumulated, then the output is null.
1803    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1804        Datum::Null
1805    } else {
1806        match (&aggr_func, &accum) {
1807            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1808                Datum::Int64(non_nulls.into_inner())
1809            }
1810            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1811                // If any false, else if all true, else must be no false and some nulls.
1812                if falses.is_positive() {
1813                    Datum::False
1814                } else if *trues == total {
1815                    Datum::True
1816                } else {
1817                    Datum::Null
1818                }
1819            }
1820            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1821                // If any true, else if all false, else must be no true and some nulls.
1822                if trues.is_positive() {
1823                    Datum::True
1824                } else if *falses == total {
1825                    Datum::False
1826                } else {
1827                    Datum::Null
1828                }
1829            }
1830            (AggregateFunc::Dummy, _) => Datum::Dummy,
1831            // If any non-nulls, just report the aggregate.
1832            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1833            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
1834                // This conversion is safe, as long as we have less than 2^32
1835                // summands.
1836                // TODO(benesch): are we guaranteed to have less than 2^32 summands?
1837                // If so, rewrite to avoid `as`.
1838                #[allow(clippy::as_conversions)]
1839                Datum::Int64(accum.into_inner() as i64)
1840            }
1841            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1842            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1843            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1844                if !accum.is_negative() {
1845                    // Our semantics of overflow are not clearly articulated wrt.
1846                    // unsigned vs. signed types (database-issues#5172). We adopt an
1847                    // unsigned wrapping behavior to match what we do above for
1848                    // signed types.
1849                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
1850                    #[allow(clippy::as_conversions)]
1851                    Datum::UInt64(accum.into_inner() as u64)
1852                } else {
1853                    // Note that we return a value here, but an error in the other
1854                    // operator of the reduce_pair. Therefore, we expect that this
1855                    // value will never be exposed as an output.
1856                    Datum::Null
1857                }
1858            }
1859            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1860                if !accum.is_negative() {
1861                    Datum::from(*accum)
1862                } else {
1863                    // Note that we return a value here, but an error in the other
1864                    // operator of the reduce_pair. Therefore, we expect that this
1865                    // value will never be exposed as an output.
1866                    Datum::Null
1867                }
1868            }
1869            (
1870                AggregateFunc::SumFloat32,
1871                Accum::Float {
1872                    accum,
1873                    pos_infs,
1874                    neg_infs,
1875                    nans,
1876                    non_nulls: _,
1877                },
1878            ) => {
1879                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1880                    // NaNs are NaNs and cases where we've seen a
1881                    // mixture of positive and negative infinities.
1882                    Datum::from(f32::NAN)
1883                } else if pos_infs.is_positive() {
1884                    Datum::from(f32::INFINITY)
1885                } else if neg_infs.is_positive() {
1886                    Datum::from(f32::NEG_INFINITY)
1887                } else {
1888                    let sum = f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE;
1889                    Datum::from(f32::cast_lossy(sum))
1890                }
1891            }
1892            (
1893                AggregateFunc::SumFloat64,
1894                Accum::Float {
1895                    accum,
1896                    pos_infs,
1897                    neg_infs,
1898                    nans,
1899                    non_nulls: _,
1900                },
1901            ) => {
1902                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1903                    // NaNs are NaNs and cases where we've seen a
1904                    // mixture of positive and negative infinities.
1905                    Datum::from(f64::NAN)
1906                } else if pos_infs.is_positive() {
1907                    Datum::from(f64::INFINITY)
1908                } else if neg_infs.is_positive() {
1909                    Datum::from(f64::NEG_INFINITY)
1910                } else {
1911                    Datum::from(f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE)
1912                }
1913            }
1914            (
1915                AggregateFunc::SumNumeric,
1916                Accum::Numeric {
1917                    accum,
1918                    pos_infs,
1919                    neg_infs,
1920                    nans,
1921                    non_nulls: _,
1922                },
1923            ) => {
1924                let mut cx_datum = numeric::cx_datum();
1925                let d = cx_datum.to_width(accum.0);
1926                // Take a wide decimal (aggregator) into a
1927                // narrow decimal (datum). If this operation
1928                // overflows the datum, this new value will be
1929                // +/- infinity. However, the aggregator tracks
1930                // the amount of overflow, making it invertible.
1931                let inf_d = d.is_infinite();
1932                let neg_d = d.is_negative();
1933                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
1934                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
1935                if nans.is_positive() || (pos_inf && neg_inf) {
1936                    // NaNs are NaNs and cases where we've seen a
1937                    // mixture of positive and negative infinities.
1938                    Datum::from(Numeric::nan())
1939                } else if pos_inf {
1940                    Datum::from(Numeric::infinity())
1941                } else if neg_inf {
1942                    let mut cx = numeric::cx_datum();
1943                    let mut d = Numeric::infinity();
1944                    cx.neg(&mut d);
1945                    Datum::from(d)
1946                } else {
1947                    Datum::from(d)
1948                }
1949            }
1950            _ => panic!(
1951                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
1952                aggr_func
1953            ),
1954        }
1955    }
1956}
1957
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
type AccumCount = mz_ore::Overflowing<i128>;
1960
/// Accumulates values for the various types of accumulable aggregations.
///
/// Integer values are accumulated in an [`AccumCount`], which is backed by an `i128`.
/// We assume that there are not more than 2^32 elements for the aggregation.
/// Thus a summation over 16-bit or 32-bit integers stays within the bounds of an
/// `i64`, which is the width such sums are narrowed to when they are finalized.
///
/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
/// point representation has less precision than a double. It is entirely possible
/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
/// to preserve group guarantees.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// Accumulates boolean values.
    Bool {
        /// The number of `true` values observed.
        trues: Diff,
        /// The number of `false` values observed.
        falses: Diff,
    },
    /// Accumulates simple numeric values.
    SimpleNumber {
        /// The accumulation of all non-NULL values observed.
        accum: AccumCount,
        /// The number of non-NULL values observed.
        non_nulls: Diff,
    },
    /// Accumulates float values.
    Float {
        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
        /// preserve associativity and commutativity
        accum: AccumCount,
        /// Counts +inf
        pos_infs: Diff,
        /// Counts -inf
        neg_infs: Diff,
        /// Counts NaNs
        nans: Diff,
        /// Counts non-NULL values
        non_nulls: Diff,
    },
    /// Accumulates arbitrary precision decimals.
    Numeric {
        /// Accumulates non-special values
        accum: OrderedDecimal<NumericAgg>,
        /// Counts +inf
        pos_infs: Diff,
        /// Counts -inf
        neg_infs: Diff,
        /// Counts NaNs
        nans: Diff,
        /// Counts non-NULL values
        non_nulls: Diff,
    },
}
2025
2026impl IsZero for Accum {
2027    fn is_zero(&self) -> bool {
2028        match self {
2029            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2030            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2031            Accum::Float {
2032                accum,
2033                pos_infs,
2034                neg_infs,
2035                nans,
2036                non_nulls,
2037            } => {
2038                accum.is_zero()
2039                    && pos_infs.is_zero()
2040                    && neg_infs.is_zero()
2041                    && nans.is_zero()
2042                    && non_nulls.is_zero()
2043            }
2044            Accum::Numeric {
2045                accum,
2046                pos_infs,
2047                neg_infs,
2048                nans,
2049                non_nulls,
2050            } => {
2051                accum.0.is_zero()
2052                    && pos_infs.is_zero()
2053                    && neg_infs.is_zero()
2054                    && nans.is_zero()
2055                    && non_nulls.is_zero()
2056            }
2057        }
2058    }
2059}
2060
2061impl Semigroup for Accum {
2062    fn plus_equals(&mut self, other: &Accum) {
2063        match (&mut *self, other) {
2064            (
2065                Accum::Bool { trues, falses },
2066                Accum::Bool {
2067                    trues: other_trues,
2068                    falses: other_falses,
2069                },
2070            ) => {
2071                *trues += other_trues;
2072                *falses += other_falses;
2073            }
2074            (
2075                Accum::SimpleNumber { accum, non_nulls },
2076                Accum::SimpleNumber {
2077                    accum: other_accum,
2078                    non_nulls: other_non_nulls,
2079                },
2080            ) => {
2081                *accum += other_accum;
2082                *non_nulls += other_non_nulls;
2083            }
2084            (
2085                Accum::Float {
2086                    accum,
2087                    pos_infs,
2088                    neg_infs,
2089                    nans,
2090                    non_nulls,
2091                },
2092                Accum::Float {
2093                    accum: other_accum,
2094                    pos_infs: other_pos_infs,
2095                    neg_infs: other_neg_infs,
2096                    nans: other_nans,
2097                    non_nulls: other_non_nulls,
2098                },
2099            ) => {
2100                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2101                    warn!("Float accumulator overflow. Incorrect results possible");
2102                    accum.wrapping_add(*other_accum)
2103                });
2104                *pos_infs += other_pos_infs;
2105                *neg_infs += other_neg_infs;
2106                *nans += other_nans;
2107                *non_nulls += other_non_nulls;
2108            }
2109            (
2110                Accum::Numeric {
2111                    accum,
2112                    pos_infs,
2113                    neg_infs,
2114                    nans,
2115                    non_nulls,
2116                },
2117                Accum::Numeric {
2118                    accum: other_accum,
2119                    pos_infs: other_pos_infs,
2120                    neg_infs: other_neg_infs,
2121                    nans: other_nans,
2122                    non_nulls: other_non_nulls,
2123                },
2124            ) => {
2125                let mut cx_agg = numeric::cx_agg();
2126                cx_agg.add(&mut accum.0, &other_accum.0);
2127                // `rounded` signals we have exceeded the aggregator's max
2128                // precision, which means we've lost commutativity and
2129                // associativity; nothing to be done here, so panic. For more
2130                // context, see the DEC_Rounded definition at
2131                // http://speleotrove.com/decimal/dncont.html
2132                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2133                // Reduce to reclaim unused decimal precision. Note that this
2134                // reduction must happen somewhere to make the following
2135                // invertible:
2136                // ```
2137                // CREATE TABLE a (a numeric);
2138                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
2139                // INSERT INTO a VALUES ('9e39'), ('9e-39');
2140                // ```
2141                // This will now return infinity. However, we can retract the
2142                // value that blew up its precision:
2143                // ```
2144                // INSERT INTO a VALUES ('-9e-39');
2145                // ```
2146                // This leaves `t`'s aggregator with a value of 9e39. However,
2147                // without doing a reduction, `libdecnum` will store the value
2148                // as 9e39+0e-39, which still exceeds the narrower context's
2149                // precision. By doing the reduction, we can "reclaim" the 39
2150                // digits of precision.
2151                cx_agg.reduce(&mut accum.0);
2152                *pos_infs += other_pos_infs;
2153                *neg_infs += other_neg_infs;
2154                *nans += other_nans;
2155                *non_nulls += other_non_nulls;
2156            }
2157            (l, r) => unreachable!(
2158                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2159            ),
2160        }
2161    }
2162}
2163
2164impl Multiply<Diff> for Accum {
2165    type Output = Accum;
2166
2167    fn multiply(self, factor: &Diff) -> Accum {
2168        let factor = *factor;
2169        match self {
2170            Accum::Bool { trues, falses } => Accum::Bool {
2171                trues: trues * factor,
2172                falses: falses * factor,
2173            },
2174            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2175                accum: accum * AccumCount::from(factor),
2176                non_nulls: non_nulls * factor,
2177            },
2178            Accum::Float {
2179                accum,
2180                pos_infs,
2181                neg_infs,
2182                nans,
2183                non_nulls,
2184            } => Accum::Float {
2185                accum: accum
2186                    .checked_mul(AccumCount::from(factor))
2187                    .unwrap_or_else(|| {
2188                        warn!("Float accumulator overflow. Incorrect results possible");
2189                        accum.wrapping_mul(AccumCount::from(factor))
2190                    }),
2191                pos_infs: pos_infs * factor,
2192                neg_infs: neg_infs * factor,
2193                nans: nans * factor,
2194                non_nulls: non_nulls * factor,
2195            },
2196            Accum::Numeric {
2197                accum,
2198                pos_infs,
2199                neg_infs,
2200                nans,
2201                non_nulls,
2202            } => {
2203                let mut cx = numeric::cx_agg();
2204                let mut f = NumericAgg::from(factor.into_inner());
2205                // Unlike `plus_equals`, not necessary to reduce after this operation because `f` will
2206                // always be an integer, i.e. we are never increasing the
2207                // values' scale.
2208                cx.mul(&mut f, &accum.0);
2209                // `rounded` signals we have exceeded the aggregator's max
2210                // precision, which means we've lost commutativity and
2211                // associativity; nothing to be done here, so panic. For more
2212                // context, see the DEC_Rounded definition at
2213                // http://speleotrove.com/decimal/dncont.html
2214                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2215                Accum::Numeric {
2216                    accum: OrderedDecimal(f),
2217                    pos_infs: pos_infs * factor,
2218                    neg_infs: neg_infs * factor,
2219                    nans: nans * factor,
2220                    non_nulls: non_nulls * factor,
2221                }
2222            }
2223        }
2224    }
2225}
2226
// `Accum` derives `Copy`, so its columnar storage is delegated to columnation's
// `CopyRegion` rather than to a hand-written region.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2230
2231/// Monoids for in-place compaction of monotonic streams.
2232mod monoids {
2233
2234    // We can improve the performance of some aggregations through the use of algebra.
2235    // In particular, we can move some of the aggregations in to the `diff` field of
2236    // updates, by changing `diff` from integers to a different algebraic structure.
2237    //
2238    // The one we use is called a "semigroup", and it means that the structure has a
2239    // symmetric addition operator. The trait we use also allows the semigroup elements
2240    // to present as "zero", meaning they always act as the identity under +. Here,
2241    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
2242    // known to DD by the `is_zero` method, see comment there. So, from the point of view
2243    // of DD, this Semigroup should _not_ have a zero.
2244    //
2245    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
2246    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
2247    // assumes this.
2248
2249    use columnation::{Columnation, Region};
2250    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2251    use mz_expr::AggregateFunc;
2252    use mz_ore::soft_panic_or_log;
2253    use mz_repr::{Datum, Diff, Row};
2254    use serde::{Deserialize, Serialize};
2255
2256    /// A monoid containing a single-datum row.
2257    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
2258    pub enum ReductionMonoid {
2259        Min(Row),
2260        Max(Row),
2261    }
2262
2263    impl ReductionMonoid {
2264        pub fn finalize(&self) -> &Row {
2265            use ReductionMonoid::*;
2266            match self {
2267                Min(row) | Max(row) => row,
2268            }
2269        }
2270    }
2271
2272    impl Clone for ReductionMonoid {
2273        fn clone(&self) -> Self {
2274            use ReductionMonoid::*;
2275            match self {
2276                Min(row) => Min(row.clone()),
2277                Max(row) => Max(row.clone()),
2278            }
2279        }
2280
2281        fn clone_from(&mut self, source: &Self) {
2282            use ReductionMonoid::*;
2283
2284            let mut row = std::mem::take(match self {
2285                Min(row) | Max(row) => row,
2286            });
2287
2288            let source_row = match source {
2289                Min(row) | Max(row) => row,
2290            };
2291
2292            row.clone_from(source_row);
2293
2294            match source {
2295                Min(_) => *self = Min(row),
2296                Max(_) => *self = Max(row),
2297            }
2298        }
2299    }
2300
2301    impl Multiply<Diff> for ReductionMonoid {
2302        type Output = Self;
2303
2304        fn multiply(self, factor: &Diff) -> Self {
2305            // Multiplication in ReductionMonoid is idempotent, and
2306            // its users must ascertain its monotonicity beforehand
2307            // (typically with ensure_monotonic) since it has no zero
2308            // value for us to use here.
2309            assert!(factor.is_positive());
2310            self
2311        }
2312    }
2313
2314    impl Semigroup for ReductionMonoid {
2315        fn plus_equals(&mut self, rhs: &Self) {
2316            match (self, rhs) {
2317                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
2318                    let swap = {
2319                        let lhs_val = lhs.unpack_first();
2320                        let rhs_val = rhs.unpack_first();
2321                        // Datum::Null is the identity, not a small element.
2322                        match (lhs_val, rhs_val) {
2323                            (_, Datum::Null) => false,
2324                            (Datum::Null, _) => true,
2325                            (lhs, rhs) => rhs < lhs,
2326                        }
2327                    };
2328                    if swap {
2329                        lhs.clone_from(rhs);
2330                    }
2331                }
2332                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
2333                    let swap = {
2334                        let lhs_val = lhs.unpack_first();
2335                        let rhs_val = rhs.unpack_first();
2336                        // Datum::Null is the identity, not a large element.
2337                        match (lhs_val, rhs_val) {
2338                            (_, Datum::Null) => false,
2339                            (Datum::Null, _) => true,
2340                            (lhs, rhs) => rhs > lhs,
2341                        }
2342                    };
2343                    if swap {
2344                        lhs.clone_from(rhs);
2345                    }
2346                }
2347                (lhs, rhs) => {
2348                    soft_panic_or_log!(
2349                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
2350                    );
2351                }
2352            }
2353        }
2354    }
2355
2356    impl IsZero for ReductionMonoid {
2357        fn is_zero(&self) -> bool {
2358            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
2359            // DD uses true results of this method to make stuff disappear. This makes sense when
2360            // diffs mean really just diffs, but for `ReductionMonoid` diffs hold reduction results.
2361            // We don't want funny stuff, like disappearing, happening to reduction results even
2362            // when they are null. (This would confuse, e.g., `ReduceCollation` for null inputs.)
2363            false
2364        }
2365    }
2366
2367    impl Columnation for ReductionMonoid {
2368        type InnerRegion = ReductionMonoidRegion;
2369    }
2370
2371    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
2372    /// in the same backing region. Alternatively, it could store it in two regions, but we select
2373    /// the former for simplicity reasons.
2374    #[derive(Default)]
2375    pub struct ReductionMonoidRegion {
2376        inner: <Row as Columnation>::InnerRegion,
2377    }
2378
2379    impl Region for ReductionMonoidRegion {
2380        type Item = ReductionMonoid;
2381
2382        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
2383            use ReductionMonoid::*;
2384            match item {
2385                Min(row) => Min(unsafe { self.inner.copy(row) }),
2386                Max(row) => Max(unsafe { self.inner.copy(row) }),
2387            }
2388        }
2389
2390        fn clear(&mut self) {
2391            self.inner.clear();
2392        }
2393
2394        fn reserve_items<'a, I>(&mut self, items: I)
2395        where
2396            Self: 'a,
2397            I: Iterator<Item = &'a Self::Item> + Clone,
2398        {
2399            self.inner
2400                .reserve_items(items.map(ReductionMonoid::finalize));
2401        }
2402
2403        fn reserve_regions<'a, I>(&mut self, regions: I)
2404        where
2405            Self: 'a,
2406            I: Iterator<Item = &'a Self> + Clone,
2407        {
2408            self.inner.reserve_regions(regions.map(|r| &r.inner));
2409        }
2410
2411        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
2412            self.inner.heap_size(callback);
2413        }
2414    }
2415
2416    /// Get the correct monoid implementation for a given aggregation function. Note that
2417    /// all hierarchical aggregation functions need to supply a monoid implementation.
2418    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
2419        match func {
2420            AggregateFunc::MaxNumeric
2421            | AggregateFunc::MaxInt16
2422            | AggregateFunc::MaxInt32
2423            | AggregateFunc::MaxInt64
2424            | AggregateFunc::MaxUInt16
2425            | AggregateFunc::MaxUInt32
2426            | AggregateFunc::MaxUInt64
2427            | AggregateFunc::MaxMzTimestamp
2428            | AggregateFunc::MaxFloat32
2429            | AggregateFunc::MaxFloat64
2430            | AggregateFunc::MaxBool
2431            | AggregateFunc::MaxString
2432            | AggregateFunc::MaxDate
2433            | AggregateFunc::MaxTimestamp
2434            | AggregateFunc::MaxTimestampTz
2435            | AggregateFunc::MaxInterval
2436            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
2437            AggregateFunc::MinNumeric
2438            | AggregateFunc::MinInt16
2439            | AggregateFunc::MinInt32
2440            | AggregateFunc::MinInt64
2441            | AggregateFunc::MinUInt16
2442            | AggregateFunc::MinUInt32
2443            | AggregateFunc::MinUInt64
2444            | AggregateFunc::MinMzTimestamp
2445            | AggregateFunc::MinFloat32
2446            | AggregateFunc::MinFloat64
2447            | AggregateFunc::MinBool
2448            | AggregateFunc::MinString
2449            | AggregateFunc::MinDate
2450            | AggregateFunc::MinTimestamp
2451            | AggregateFunc::MinTimestampTz
2452            | AggregateFunc::MinInterval
2453            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
2454            AggregateFunc::SumInt16
2455            | AggregateFunc::SumInt32
2456            | AggregateFunc::SumInt64
2457            | AggregateFunc::SumUInt16
2458            | AggregateFunc::SumUInt32
2459            | AggregateFunc::SumUInt64
2460            | AggregateFunc::SumFloat32
2461            | AggregateFunc::SumFloat64
2462            | AggregateFunc::SumNumeric
2463            | AggregateFunc::Count
2464            | AggregateFunc::Any
2465            | AggregateFunc::All
2466            | AggregateFunc::Dummy
2467            | AggregateFunc::JsonbAgg { .. }
2468            | AggregateFunc::JsonbObjectAgg { .. }
2469            | AggregateFunc::MapAgg { .. }
2470            | AggregateFunc::ArrayConcat { .. }
2471            | AggregateFunc::ListConcat { .. }
2472            | AggregateFunc::StringAgg { .. }
2473            | AggregateFunc::RowNumber { .. }
2474            | AggregateFunc::Rank { .. }
2475            | AggregateFunc::DenseRank { .. }
2476            | AggregateFunc::LagLead { .. }
2477            | AggregateFunc::FirstValue { .. }
2478            | AggregateFunc::LastValue { .. }
2479            | AggregateFunc::WindowAggregate { .. }
2480            | AggregateFunc::FusedValueWindowFunc { .. }
2481            | AggregateFunc::FusedWindowAggregate { .. } => None,
2482        }
2483    }
2484}
2485
2486mod window_agg_helpers {
2487    use crate::render::reduce::*;
2488
2489    /// TODO: It would be better for performance to do the branching that is in the methods of this
2490    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
2491    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
2492    /// directly.
2493    pub enum OneByOneAggrImpls {
2494        Accumulable(AccumulableOneByOneAggr),
2495        Hierarchical(HierarchicalOneByOneAggr),
2496        Basic(mz_expr::NaiveOneByOneAggr),
2497    }
2498
2499    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2500        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2501            match reduction_type(agg) {
2502                ReductionType::Basic => {
2503                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2504                }
2505                ReductionType::Accumulable => {
2506                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2507                }
2508                ReductionType::Hierarchical => {
2509                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2510                }
2511            }
2512        }
2513
2514        fn give(&mut self, d: &Datum) {
2515            match self {
2516                OneByOneAggrImpls::Basic(i) => i.give(d),
2517                OneByOneAggrImpls::Accumulable(i) => i.give(d),
2518                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2519            }
2520        }
2521
2522        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2523            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
2524            match self {
2525                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2526                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2527                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2528            }
2529        }
2530    }
2531
2532    pub struct AccumulableOneByOneAggr {
2533        aggr_func: AggregateFunc,
2534        accum: Accum,
2535        total: Diff,
2536    }
2537
2538    impl AccumulableOneByOneAggr {
2539        fn new(aggr_func: &AggregateFunc) -> Self {
2540            AccumulableOneByOneAggr {
2541                aggr_func: aggr_func.clone(),
2542                accum: accumulable_zero(aggr_func),
2543                total: Diff::ZERO,
2544            }
2545        }
2546
2547        fn give(&mut self, d: &Datum) {
2548            self.accum
2549                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2550            self.total += Diff::ONE;
2551        }
2552
2553        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2554            temp_storage.make_datum(|packer| {
2555                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2556            })
2557        }
2558    }
2559
2560    pub struct HierarchicalOneByOneAggr {
2561        aggr_func: AggregateFunc,
2562        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
2563        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
2564        monoid: ReductionMonoid,
2565    }
2566
2567    impl HierarchicalOneByOneAggr {
2568        fn new(aggr_func: &AggregateFunc) -> Self {
2569            let mut row_buf = Row::default();
2570            row_buf.packer().push(Datum::Null);
2571            HierarchicalOneByOneAggr {
2572                aggr_func: aggr_func.clone(),
2573                monoid: get_monoid(row_buf, aggr_func)
2574                    .expect("aggr_func should be a hierarchical aggregation function"),
2575            }
2576        }
2577
2578        fn give(&mut self, d: &Datum) {
2579            let mut row_buf = Row::default();
2580            row_buf.packer().push(d);
2581            let m = get_monoid(row_buf, &self.aggr_func)
2582                .expect("aggr_func should be a hierarchical aggregation function");
2583            self.monoid.plus_equals(&m);
2584        }
2585
2586        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2587            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2588        }
2589    }
2590}
2591
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference implementation: the saturating conversion that `float_to_fixed_point`
    /// replaces. The tests use it to check that the wrapping conversion agrees on the
    /// in-range values for which the old conversion was already correct.
    #[allow(clippy::as_conversions)]
    fn saturating_convert(n: f64) -> i128 {
        let scaled = n * FLOAT_SCALE;
        scaled as i128
    }

    #[mz_ore::test]
    fn float_to_fixed_point_matches_saturating_in_range() {
        // As long as the scaled magnitude comfortably fits in an `i128`, the wrapping
        // conversion has to yield exactly what the previous saturating cast did. Every
        // magnitude is checked with both signs.
        let magnitudes = [
            0.0,
            1.0,
            0.1,
            0.5,
            3.25,
            123456.789,
            1e10,
            1e20,
            5e30, // large, but scaled magnitude still fits comfortably in i128
        ];
        for magnitude in magnitudes {
            for n in [magnitude, -magnitude] {
                assert_eq!(
                    float_to_fixed_point(n),
                    saturating_convert(n),
                    "mismatch for n = {n}"
                );
            }
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_truncates_toward_zero() {
        // 1.75 * 2^24 = 29360128, exactly representable.
        assert_eq!(float_to_fixed_point(1.75), 29_360_128);
        assert_eq!(float_to_fixed_point(-1.75), -29_360_128);

        // Fractional results truncate toward zero, matching the previous cast.
        let frac = 0.123_456_7_f64;
        let positive = float_to_fixed_point(frac);
        let negative = float_to_fixed_point(-frac);
        assert_eq!(positive, saturating_convert(frac));
        assert_eq!(negative, saturating_convert(-frac));
        assert_eq!(negative, -positive);
    }

    #[mz_ore::test]
    fn float_to_fixed_point_subnormals_round_to_zero() {
        let tiny_inputs = [
            0.0,
            -0.0,
            f64::MIN_POSITIVE / 2.0,
            5e-324, // smallest subnormal
        ];
        for tiny in tiny_inputs {
            assert_eq!(float_to_fixed_point(tiny), 0);
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_cancels_large_finite_values() {
        // Regression test for database-issues#11265: large finite values that
        // individually overflow the fixed-point domain must still sum to the
        // correct result when their mathematical sum is representable. The
        // previous saturating conversion produced `i128::MAX + i128::MIN == -1`.
        for n in [1.1e31_f64, 1e32, 5e33, 1e284] {
            let positive = float_to_fixed_point(n);
            let negative = float_to_fixed_point(-n);
            assert_eq!(
                positive.wrapping_add(negative),
                0,
                "n = {n} did not cancel with -n"
            );
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_sum_via_accumulator() {
        // Exercise the full accumulate-then-finalize path for the reported case.
        let func = AggregateFunc::SumFloat64;
        let mut acc = accumulable_zero(&func);
        for value in [1.1e31_f64, -1.1e31_f64] {
            acc.plus_equals(&datum_to_accumulator(&func, Datum::from(value)));
        }
        let total = Diff::from(2_i64);
        assert_eq!(finalize_accum(&func, &acc, total), Datum::from(0.0_f64));
    }
}