// File: mz_compute/render/reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reduction dataflow construction.
11//!
12//! Consult [ReducePlan] documentation for details.
13
14use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use columnation::{Columnation, CopyRegion};
18use dec::OrderedDecimal;
19use differential_dataflow::Diff as _;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::BatchContainer;
26use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
27use differential_dataflow::trace::{Builder, Trace};
28use differential_dataflow::{Data, VecCollection};
29use itertools::Itertools;
30use mz_compute_types::plan::reduce::{
31    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
38use mz_repr::fixed_length::ToDatumIter;
39use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
40use mz_storage_types::errors::DataflowError;
41use mz_timely_util::operator::CollectionExt;
42use serde::{Deserialize, Serialize};
43use timely::Container;
44use timely::container::{CapacityContainerBuilder, PushInto};
45use tracing::warn;
46
47use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
48use crate::extensions::reduce::{ClearContainer, MzReduce};
49use crate::render::context::{CollectionBundle, Context};
50use crate::render::errors::MaybeValidatingRow;
51use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
52use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
53use crate::row_spine::{
54    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
55};
56use crate::typedefs::{
57    ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58    RowRowSpine, RowSpine, RowValSpine,
59};
60
61impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
    ///
    /// * `input_key` - key expressions of an existing arrangement of `input`, if any,
    ///   forwarded to `flat_map` so it can read from that arrangement.
    /// * `input` - the bundle of collections/arrangements to reduce.
    /// * `key_val_plan` - expressions producing the grouping key and the values to
    ///   aggregate for each input row.
    /// * `reduce_plan` - the reduction strategy to render (distinct, accumulable,
    ///   hierarchical, basic, or a combination).
    /// * `mfp_after` - an optional map/filter/project to fuse after the reduction;
    ///   must be non-temporal.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<'scope, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<'scope, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        // Build the dataflow inside a named region so all reduce operators are
        // grouped together in introspection/visualization.
        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            // Scratch space reused across closure invocations to avoid per-row allocation.
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // One past the greatest demanded column; 0 when nothing is demanded.
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // `skips[i]` is how many columns to jump over to reach the i-th demanded
            // column, enabling the `nth(*skip)` extraction below.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Unpack only the demanded columns.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    // Evaluation errors become error-stream updates; a `None` row is
                    // impossible since the key plan has no filtering predicate.
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluate the value expressions.
                    // The prior evaluation may have left additional columns we should delete.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux out the potential errors from key and value selector evaluation.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(err_input);

            // Render the reduce plan
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region(self.scope)
        })
    }
161
162    /// Render a dataflow based on the provided plan.
163    ///
164    /// The output will be an arrangements that looks the same as if
165    /// we just had a single reduce operator computing everything together, and
166    /// this arrangement can also be re-used.
167    fn render_reduce_plan<'s>(
168        &self,
169        plan: ReducePlan,
170        collection: VecCollection<'s, T, (Row, Row), Diff>,
171        err_input: VecCollection<'s, T, DataflowError, Diff>,
172        key_arity: usize,
173        mfp_after: Option<SafeMfpPlan>,
174    ) -> CollectionBundle<'s, T> {
175        let mut errors = Default::default();
176        let arrangement =
177            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
178        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
179        CollectionBundle::from_columns(
180            0..key_arity,
181            ArrangementFlavor::Local(
182                arrangement,
183                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
184            ),
185        )
186    }
187
188    fn render_reduce_plan_inner<'s>(
189        &self,
190        plan: ReducePlan,
191        collection: VecCollection<'s, T, (Row, Row), Diff>,
192        errors: &mut Vec<VecCollection<'s, T, DataflowError, Diff>>,
193        key_arity: usize,
194        mfp_after: Option<SafeMfpPlan>,
195    ) -> Arranged<'s, RowRowAgent<T, Diff>> {
196        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
197        // not only values (database-issues#6658).
198        let arrangement = match plan {
199            // If we have no aggregations or just a single type of reduction, we
200            // can go ahead and render them directly.
201            ReducePlan::Distinct => {
202                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
203                errors.push(errs);
204                arranged_output
205            }
206            ReducePlan::Accumulable(expr) => {
207                let (arranged_output, errs) =
208                    self.build_accumulable(collection, expr, key_arity, mfp_after);
209                errors.push(errs);
210                arranged_output
211            }
212            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
213                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
214                errors.push(errs);
215                output
216            }
217            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
218                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
219                errors.push(errs);
220                output
221            }
222            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
223                expr,
224                fused_unnest_list,
225            })) => {
226                // Note that we skip validating for negative diffs when we have a fused unnest list,
227                // because this is already a CPU-intensive situation due to the non-incrementalness
228                // of window functions.
229                let validating = !fused_unnest_list;
230                let (output, errs) = self.build_basic_aggregate(
231                    collection,
232                    0,
233                    &expr,
234                    validating,
235                    key_arity,
236                    mfp_after,
237                    fused_unnest_list,
238                );
239                if validating {
240                    errors.push(errs.expect("validation should have occurred as it was requested"));
241                }
242                output
243            }
244            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
245                let (output, errs) =
246                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
247                errors.push(errs);
248                output
249            }
250        };
251        arrangement
252    }
253
254    /// Build the dataflow to compute the set of distinct keys.
255    fn build_distinct<'s>(
256        &self,
257        collection: VecCollection<'s, T, (Row, Row), Diff>,
258        mfp_after: Option<SafeMfpPlan>,
259    ) -> (
260        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
261        VecCollection<'s, T, DataflowError, Diff>,
262    ) {
263        let error_logger = self.error_logger();
264
265        // Allocations for the two closures.
266        let mut datums1 = DatumVec::new();
267        let mut datums2 = DatumVec::new();
268        let mfp_after1 = mfp_after.clone();
269        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
270
271        let arranged = collection
272            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
273                "Arranged DistinctBy",
274            );
275        let output = arranged
276            .clone()
277            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
278                "DistinctBy",
279                move |key, _input, output| {
280                    let temp_storage = RowArena::new();
281                    let mut datums_local = datums1.borrow();
282                    datums_local.extend(key.to_datum_iter());
283
284                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
285                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
286                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
287                    if mfp_after1
288                        .as_ref()
289                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
290                        .unwrap_or(Ok(true))
291                        == Ok(true)
292                    {
293                        // We're pushing a unit value here because the key is implicitly added by the
294                        // arrangement, and the permutation logic takes care of using the key part of the
295                        // output.
296                        output.push((Row::default(), Diff::ONE));
297                    }
298                },
299            );
300        let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
301            "DistinctByErrorCheck",
302            move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
303                for (_, count) in input.iter() {
304                    if count.is_positive() {
305                        continue;
306                    }
307                    let message = "Non-positive multiplicity in DistinctBy";
308                    error_logger.log(message, &format!("row={key:?}, count={count}"));
309                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
310                    return;
311                }
312                // If `mfp_after` can error, then evaluate it here.
313                let Some(mfp) = &mfp_after2 else { return };
314                let temp_storage = RowArena::new();
315                let datum_iter = key.to_datum_iter();
316                let mut datums_local = datums2.borrow();
317                datums_local.extend(datum_iter);
318
319                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
320                    output.push((e.into(), Diff::ONE));
321                }
322            },
323        );
324        (output, errors.as_collection(|_k, v| v.clone()))
325    }
326
    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
    ///
    /// Returns the fused arrangement together with a collection of any errors from
    /// negative-accumulation validation and (when applicable) `mfp_after` evaluation.
    fn build_basic_aggregates<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Request validation (`err_output.is_none()`) only for the first aggregate
            // that yields an error stream; one validating aggregate suffices.
            // `mfp_after` is applied once on the fused result, not per aggregate.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each result with its aggregate index so the fused reduce can emit
            // values in the order specified by `aggrs`.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
            "Arranged ReduceFuseBasic input",
        );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Values arrive ordered by their `(index, ...)` tag, so appending the
                    // first datum of each restores the order specified by `aggrs`.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
435
436    /// Build the dataflow to compute a single basic aggregation.
437    ///
438    /// This method also applies distinctness if required.
439    fn build_basic_aggregate<'s>(
440        &self,
441        input: VecCollection<'s, T, (Row, Row), Diff>,
442        index: usize,
443        aggr: &AggregateExpr,
444        validating: bool,
445        key_arity: usize,
446        mfp_after: Option<SafeMfpPlan>,
447        fused_unnest_list: bool,
448    ) -> (
449        RowRowArrangement<'s, T>,
450        Option<VecCollection<'s, T, DataflowError, Diff>>,
451    ) {
452        let AggregateExpr {
453            func,
454            expr: _,
455            distinct,
456        } = aggr.clone();
457
458        // Extract the value we were asked to aggregate over.
459        let mut partial = input.map(move |(key, row)| {
460            let mut row_builder = SharedRow::get();
461            let value = row.iter().nth(index).unwrap();
462            row_builder.packer().push(value);
463            (key, row_builder.clone())
464        });
465
466        let mut err_output = None;
467
468        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
469        if distinct {
470            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
471            let pairer = Pairer::new(key_arity);
472            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
473            if validating {
474                let (oks, errs) = self
475                    .build_reduce_inaccumulable_distinct::<
476                        RowValBuilder<Result<(), String>, _, _>,
477                        RowValSpine<Result<(), String>, _, _>,
478                    >(keyed, None)
479                    .as_collection(|k, v| {
480                        (
481                            k.to_row(),
482                            v.as_ref()
483                                .map(|&()| ())
484                                .map_err(|m| m.as_str().into()),
485                        )
486                    })
487                    .map_fallible::<
488                        CapacityContainerBuilder<_>,
489                        CapacityContainerBuilder<_>,
490                        _,
491                        _,
492                        _,
493                    >(
494                        "Demux Errors",
495                        move |(key_val, result)| match result {
496                            Ok(()) => Ok(pairer.split(&key_val)),
497                            Err(m) => {
498                                Err(EvalError::Internal(m).into())
499                            }
500                        },
501                    );
502                err_output = Some(errs);
503                partial = oks;
504            } else {
505                partial = self
506                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
507                        keyed,
508                        Some(" [val: empty]"),
509                    )
510                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
511            }
512        }
513
514        // Allocations for the two closures.
515        let mut datums1 = DatumVec::new();
516        let mut datums2 = DatumVec::new();
517        let mut datums_key_1 = DatumVec::new();
518        let mut datums_key_2 = DatumVec::new();
519        let mfp_after1 = mfp_after.clone();
520        let func2 = func.clone();
521
522        let name = if !fused_unnest_list {
523            "ReduceInaccumulable"
524        } else {
525            "FusedReduceUnnestList"
526        };
527        let arranged = partial
528            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
529                "Arranged {name}"
530            ));
531        let oks = if !fused_unnest_list {
532            arranged
533                .clone()
534                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
535                    move |key, source, target| {
536                        // We respect the multiplicity here (unlike in hierarchical aggregation)
537                        // because we don't know that the aggregation method is not sensitive
538                        // to the number of records.
539                        let iter = source.iter().flat_map(|(v, w)| {
540                            // Note that in the non-positive case, this is wrong, but harmless because
541                            // our other reduction will produce an error.
542                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
543                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
544                        });
545
546                        let temp_storage = RowArena::new();
547                        let datum_iter = key.to_datum_iter();
548                        let mut datums_local = datums1.borrow();
549                        datums_local.extend(datum_iter);
550                        let key_len = datums_local.len();
551                        datums_local.push(
552                        // Note that this is not necessarily a window aggregation, in which case
553                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
554                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
555                            iter,
556                            &temp_storage,
557                        ),
558                    );
559
560                        if let Some(row) = evaluate_mfp_after(
561                            &mfp_after1,
562                            &mut datums_local,
563                            &temp_storage,
564                            key_len,
565                        ) {
566                            target.push((row, Diff::ONE));
567                        }
568                    }
569                })
570        } else {
571            arranged
572                .clone()
573                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
574                    move |key, source, target| {
575                        // This part is the same as in the `!fused_unnest_list` if branch above.
576                        let iter = source.iter().flat_map(|(v, w)| {
577                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
578                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
579                        });
580
581                        // This is the part that is specific to the `fused_unnest_list` branch.
582                        let temp_storage = RowArena::new();
583                        let mut datums_local = datums_key_1.borrow();
584                        datums_local.extend(key.to_datum_iter());
585                        let key_len = datums_local.len();
586                        for datum in func
587                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
588                                iter,
589                                &temp_storage,
590                            )
591                        {
592                            datums_local.truncate(key_len);
593                            datums_local.push(datum);
594                            if let Some(row) = evaluate_mfp_after(
595                                &mfp_after1,
596                                &mut datums_local,
597                                &temp_storage,
598                                key_len,
599                            ) {
600                                target.push((row, Diff::ONE));
601                            }
602                        }
603                    }
604                })
605        };
606
607        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
608        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
609        // rationale around using a second reduction here.
610        let must_validate = validating && err_output.is_none();
611        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
612        if must_validate || mfp_after2.is_some() {
613            let error_logger = self.error_logger();
614
615            let errs = if !fused_unnest_list {
616                arranged
617                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
618                        &format!("{name} Error Check"),
619                        move |key, source, target| {
620                            // Negative counts would be surprising, but until we are 100% certain we won't
621                            // see them, we should report when we do. We may want to bake even more info
622                            // in here in the future.
623                            if must_validate {
624                                for (value, count) in source.iter() {
625                                    if count.is_positive() {
626                                        continue;
627                                    }
628                                    let value = value.to_row();
629                                    let message =
630                                        "Non-positive accumulation in ReduceInaccumulable";
631                                    error_logger
632                                        .log(message, &format!("value={value:?}, count={count}"));
633                                    let err = EvalError::Internal(message.into());
634                                    target.push((err.into(), Diff::ONE));
635                                    return;
636                                }
637                            }
638
639                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
640                            let Some(mfp) = &mfp_after2 else { return };
641                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
642                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
643                                // This would ideally use `to_datum_iter` but we cannot as it needs to
644                                // borrow `v` and only presents datums with that lifetime, not any longer.
645                                std::iter::repeat(v.next().unwrap()).take(count)
646                            });
647
648                            let temp_storage = RowArena::new();
649                            let datum_iter = key.to_datum_iter();
650                            let mut datums_local = datums2.borrow();
651                            datums_local.extend(datum_iter);
652                            datums_local.push(
653                                func2.eval_with_fast_window_agg::<
654                                    _,
655                                    window_agg_helpers::OneByOneAggrImpls,
656                                >(
657                                    iter, &temp_storage
658                                ),
659                            );
660                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
661                                target.push((e.into(), Diff::ONE));
662                            }
663                        },
664                    )
665                    .as_collection(|_, v| v.clone())
666            } else {
667                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
668                assert!(!must_validate);
669                // We couldn't have got into this if branch due to `must_validate`, so it must be
670                // because of the `mfp_after2.is_some()`.
671                let Some(mfp) = mfp_after2 else {
672                    unreachable!()
673                };
674                arranged
675                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
676                        &format!("{name} Error Check"),
677                        move |key, source, target| {
678                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
679                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
680                                // This would ideally use `to_datum_iter` but we cannot as it needs to
681                                // borrow `v` and only presents datums with that lifetime, not any longer.
682                                std::iter::repeat(v.next().unwrap()).take(count)
683                            });
684
685                            let temp_storage = RowArena::new();
686                            let mut datums_local = datums_key_2.borrow();
687                            datums_local.extend(key.to_datum_iter());
688                            let key_len = datums_local.len();
689                            for datum in func2
690                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
691                                    iter,
692                                    &temp_storage,
693                                )
694                            {
695                                datums_local.truncate(key_len);
696                                datums_local.push(datum);
697                                // We know that `mfp` can error (because of the `could_error` call
698                                // above), so try to evaluate it here.
699                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
700                                {
701                                    target.push((e.into(), Diff::ONE));
702                                }
703                            }
704                        },
705                    )
706                    .as_collection(|_, v| v.clone())
707            };
708
709            if let Some(e) = err_output {
710                err_output = Some(e.concat(errs));
711            } else {
712                err_output = Some(errs);
713            }
714        }
715        (oks, err_output)
716    }
717
718    fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
719        &self,
720        input: VecCollection<'s, T, Row, Diff>,
721        name_tag: Option<&str>,
722    ) -> Arranged<'s, TraceAgent<Tr>>
723    where
724        Tr: for<'a> Trace<
725                Key<'a> = DatumSeq<'a>,
726                KeyContainer: BatchContainer<Owned = Row>,
727                Time = T,
728                Diff = Diff,
729                ValOwn: Data + MaybeValidatingRow<(), String>,
730            > + 'static,
731        Bu: Builder<
732                Time = T,
733                Input: Container
734                           + InternalMerge
735                           + ClearContainer
736                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
737                Output = Tr::Batch,
738            >,
739        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
740    {
741        let error_logger = self.error_logger();
742
743        let output_name = format!(
744            "ReduceInaccumulable Distinct{}",
745            name_tag.unwrap_or_default()
746        );
747
748        let input: KeyCollection<_, _, _> = input.into();
749        input
750            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
751                "Arranged ReduceInaccumulable Distinct [val: empty]",
752            )
753            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
754                if let Some(err) = Tr::ValOwn::into_error() {
755                    for (value, count) in source.iter() {
756                        if count.is_positive() {
757                            continue;
758                        }
759
760                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
761                        error_logger.log(message, &format!("value={value:?}, count={count}"));
762                        t.push((err(message.to_string()), Diff::ONE));
763                        return;
764                    }
765                }
766                t.push((Tr::ValOwn::ok(()), Diff::ONE))
767            })
768    }
769
770    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
771    /// on non-monotonic inputs.
772    ///
773    /// This function renders a single reduction tree that computes aggregations with
774    /// a priority queue implemented with a series of reduce operators that partition
775    /// the input into buckets, and compute the aggregation over very small buckets
776    /// and feed the results up to larger buckets.
777    ///
778    /// Note that this implementation currently ignores the distinct bit because we
779    /// currently only perform min / max hierarchically and the reduction tree
780    /// efficiently suppresses non-distinct updates.
781    ///
782    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
783    /// trickery here to limit the memory usage per layer by internally
784    /// holding only the elements that were rejected by this stage. However, the
    /// output collection maintains the `((key, bucket), passing value)` pairs for this
786    /// stage.
787    fn build_bucketed<'s>(
788        &self,
789        input: VecCollection<'s, T, (Row, Row), Diff>,
790        BucketedPlan {
791            aggr_funcs,
792            buckets,
793        }: BucketedPlan,
794        key_arity: usize,
795        mfp_after: Option<SafeMfpPlan>,
796    ) -> (
797        RowRowArrangement<'s, T>,
798        VecCollection<'s, T, DataflowError, Diff>,
799    ) {
800        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
801        let outer_scope = input.scope();
802        let arranged_output = outer_scope
803            .clone()
804            .region_named("ReduceHierarchical", |inner| {
805                let input = input.enter(inner);
806
807                // The first mod to apply to the hash.
808                let first_mod = buckets.get(0).copied().unwrap_or(1);
809                let aggregations = aggr_funcs.len();
810
811                // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
812                let mut stage = input.map(move |(key, row)| {
813                    let mut row_builder = SharedRow::get();
814                    let mut row_packer = row_builder.packer();
815                    row_packer.extend(row.iter().take(aggregations));
816                    let values = row_builder.clone();
817
818                    // Apply the initial mod here.
819                    let hash = values.hashed() % first_mod;
820                    let hash_key =
821                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
822                    (hash_key, values)
823                });
824
825                // Repeatedly apply hierarchical reduction with a progressively coarser key.
826                for (index, b) in buckets.into_iter().enumerate() {
827                    // Apply subsequent bucket mods for all but the first round.
828                    let input = if index == 0 {
829                        stage
830                    } else {
831                        stage.map(move |(hash_key, values)| {
832                            let mut hash_key_iter = hash_key.iter();
833                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
834                            // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
835                            let hash_key = SharedRow::pack(
836                                std::iter::once(Datum::from(hash))
837                                    .chain(hash_key_iter.take(key_arity)),
838                            );
839                            (hash_key, values)
840                        })
841                    };
842
843                    // We only want the first stage to perform validation of whether invalid accumulations
844                    // were observed in the input. Subsequently, we will either produce an error in the error
845                    // stream or produce correct data in the output stream.
846                    let validating = err_output.is_none();
847
848                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
849                    if let Some(errs) = errs {
850                        err_output = Some(errs.leave_region(outer_scope));
851                    }
852                    stage = oks
853                }
854
855                // Discard the hash from the key and return to the format of the input data.
856                let partial = stage.map(move |(hash_key, values)| {
857                    let mut hash_key_iter = hash_key.iter();
858                    let _hash = hash_key_iter.next();
859                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
860                });
861
862                // Allocations for the two closures.
863                let mut datums1 = DatumVec::new();
864                let mut datums2 = DatumVec::new();
865                let mfp_after1 = mfp_after.clone();
866                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
867                let aggr_funcs2 = aggr_funcs.clone();
868
869                // Build a series of stages for the reduction
870                // Arrange the final result into (key, Row)
871                let error_logger = self.error_logger();
872                // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
873                // view mz_introspection.mz_expected_group_size_advice.
874                let arranged = partial
875                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
876                        "Arrange ReduceMinsMaxes",
877                    );
878                // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
879                // but we then wouldn't be able to do this error check conditionally.  See its documentation
880                // for the rationale around using a second reduction here.
881                let must_validate = err_output.is_none();
882                if must_validate || mfp_after2.is_some() {
883                    let errs = arranged
884                        .clone()
885                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
886                            "ReduceMinsMaxes Error Check",
887                            move |key, source, target| {
888                                // Negative counts would be surprising, but until we are 100% certain we wont
889                                // see them, we should report when we do. We may want to bake even more info
890                                // in here in the future.
891                                if must_validate {
892                                    for (val, count) in source.iter() {
893                                        if count.is_positive() {
894                                            continue;
895                                        }
896                                        let val = val.to_row();
897                                        let message =
898                                            "Non-positive accumulation in ReduceMinsMaxes";
899                                        error_logger
900                                            .log(message, &format!("val={val:?}, count={count}"));
901                                        target.push((
902                                            EvalError::Internal(message.into()).into(),
903                                            Diff::ONE,
904                                        ));
905                                        return;
906                                    }
907                                }
908
909                                // We know that `mfp_after` can error if it exists, so try to evaluate it here.
910                                let Some(mfp) = &mfp_after2 else { return };
911                                let temp_storage = RowArena::new();
912                                let datum_iter = key.to_datum_iter();
913                                let mut datums_local = datums2.borrow();
914                                datums_local.extend(datum_iter);
915
916                                let mut source_iters = source
917                                    .iter()
918                                    .map(|(values, _cnt)| *values)
919                                    .collect::<Vec<_>>();
920                                for func in aggr_funcs2.iter() {
921                                    let column_iter = (0..source_iters.len())
922                                        .map(|i| source_iters[i].next().unwrap());
923                                    datums_local.push(func.eval(column_iter, &temp_storage));
924                                }
925                                if let Result::Err(e) =
926                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
927                                {
928                                    target.push((e.into(), Diff::ONE));
929                                }
930                            },
931                        )
932                        .as_collection(|_, v| v.clone())
933                        .leave_region(outer_scope);
934                    if let Some(e) = err_output.take() {
935                        err_output = Some(e.concat(errs));
936                    } else {
937                        err_output = Some(errs);
938                    }
939                }
940                arranged
941                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
942                        "ReduceMinsMaxes",
943                        move |key, source, target| {
944                            let temp_storage = RowArena::new();
945                            let datum_iter = key.to_datum_iter();
946                            let mut datums_local = datums1.borrow();
947                            datums_local.extend(datum_iter);
948                            let key_len = datums_local.len();
949
950                            let mut source_iters = source
951                                .iter()
952                                .map(|(values, _cnt)| *values)
953                                .collect::<Vec<_>>();
954                            for func in aggr_funcs.iter() {
955                                let column_iter = (0..source_iters.len())
956                                    .map(|i| source_iters[i].next().unwrap());
957                                datums_local.push(func.eval(column_iter, &temp_storage));
958                            }
959
960                            if let Some(row) = evaluate_mfp_after(
961                                &mfp_after1,
962                                &mut datums_local,
963                                &temp_storage,
964                                key_len,
965                            ) {
966                                target.push((row, Diff::ONE));
967                            }
968                        },
969                    )
970                    .leave_region(outer_scope)
971            });
972        (
973            arranged_output,
974            err_output.expect("expected to validate in one level of the hierarchy"),
975        )
976    }
977
978    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
979    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
980    /// with the negation of what's produced by the reduction.
981    /// `validating` indicates whether we want this stage to perform error detection
982    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
983    /// stages can skip validation.
984    fn build_bucketed_stage<'s>(
985        &self,
986        aggr_funcs: &Vec<AggregateFunc>,
987        input: VecCollection<'s, T, (Row, Row), Diff>,
988        validating: bool,
989    ) -> (
990        VecCollection<'s, T, (Row, Row), Diff>,
991        Option<VecCollection<'s, T, DataflowError, Diff>>,
992    ) {
993        let (input, negated_output, errs) = if validating {
994            let (input, reduced) = self
995                .build_bucketed_negated_output::<
996                    RowValBuilder<_, _, _>,
997                    RowValSpine<Result<Row, Row>, _, _>,
998                >(
999                    input.clone(),
1000                    aggr_funcs.clone(),
1001                );
1002            let (oks, errs) = reduced
1003                .as_collection(|k, v| (k.to_row(), v.clone()))
1004                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1005                "Checked Invalid Accumulations",
1006                |(hash_key, result)| match result {
1007                    Err(hash_key) => {
1008                        let mut hash_key_iter = hash_key.iter();
1009                        let _hash = hash_key_iter.next();
1010                        let key = SharedRow::pack(hash_key_iter);
1011                        let message = format!(
1012                            "Invalid data in source, saw non-positive accumulation \
1013                                         for key {key:?} in hierarchical mins-maxes aggregate"
1014                        );
1015                        Err(EvalError::Internal(message.into()).into())
1016                    }
1017                    Ok(values) => Ok((hash_key, values)),
1018                },
1019            );
1020            (input, oks, Some(errs))
1021        } else {
1022            let (input, reduced) = self
1023                .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1024                    input,
1025                    aggr_funcs.clone(),
1026                );
1027            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1028            // that we need to apply the mod on both input and oks.
1029            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1030            (input, oks, None)
1031        };
1032
1033        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1034        let oks = negated_output.concat(input);
1035        (oks, errs)
1036    }
1037
1038    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1039    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1040    /// with all diffs in the reduction's output negated.
1041    fn build_bucketed_negated_output<'s, Bu, Tr>(
1042        &self,
1043        input: VecCollection<'s, T, (Row, Row), Diff>,
1044        aggrs: Vec<AggregateFunc>,
1045    ) -> (
1046        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1047        Arranged<'s, TraceAgent<Tr>>,
1048    )
1049    where
1050        Tr: for<'a> Trace<
1051                Key<'a> = DatumSeq<'a>,
1052                KeyContainer: BatchContainer<Owned = Row>,
1053                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1054                Time = T,
1055                Diff = Diff,
1056            > + 'static,
1057        Bu: Builder<
1058                Time = T,
1059                Input: Container
1060                           + InternalMerge
1061                           + ClearContainer
1062                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1063                Output = Tr::Batch,
1064            >,
1065        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1066    {
1067        let error_logger = self.error_logger();
1068        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1069        // view mz_introspection.mz_expected_group_size_advice.
1070        let arranged_input = input
1071            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1072                "Arranged MinsMaxesHierarchical input",
1073            );
1074
1075        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1076            "Reduced Fallibly MinsMaxesHierarchical",
1077            move |key, source, target| {
1078                if let Some(err) = Tr::ValOwn::into_error() {
1079                    // Should negative accumulations reach us, we should loudly complain.
1080                    for (value, count) in source.iter() {
1081                        if count.is_positive() {
1082                            continue;
1083                        }
1084                        error_logger.log(
1085                            "Non-positive accumulation in MinsMaxesHierarchical",
1086                            &format!("key={key:?}, value={value:?}, count={count}"),
1087                        );
1088                        // After complaining, output an error here so that we can eventually
1089                        // report it in an error stream.
1090                        target.push((
1091                            err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1092                            Diff::ONE,
1093                        ));
1094                        return;
1095                    }
1096                }
1097
1098                let mut row_builder = SharedRow::get();
1099                let mut row_packer = row_builder.packer();
1100
1101                let mut source_iters = source
1102                    .iter()
1103                    .map(|(values, _cnt)| *values)
1104                    .collect::<Vec<_>>();
1105                for func in aggrs.iter() {
1106                    let column_iter =
1107                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1108                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1109                }
1110                // We only want to arrange the parts of the input that are not part of the output.
1111                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
1112                // gives us the intended value of this aggregate function. Also we assume that regardless
1113                // of the multiplicity of the final result in the input, we only want to have one copy
1114                // in the output.
1115                target.reserve(source.len().saturating_add(1));
1116                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1117                target.extend(source.iter().map(|(values, cnt)| {
1118                    let mut cnt = *cnt;
1119                    cnt.negate();
1120                    (Tr::ValOwn::ok(values.to_row()), cnt)
1121                }));
1122            },
1123        );
1124        (arranged_input, reduced)
1125    }
1126
1127    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1128    /// on monotonic inputs.
1129    fn build_monotonic<'s>(
1130        &self,
1131        collection: VecCollection<'s, T, (Row, Row), Diff>,
1132        MonotonicPlan {
1133            aggr_funcs,
1134            must_consolidate,
1135        }: MonotonicPlan,
1136        mfp_after: Option<SafeMfpPlan>,
1137    ) -> (
1138        RowRowArrangement<'s, T>,
1139        VecCollection<'s, T, DataflowError, Diff>,
1140    ) {
1141        let aggregations = aggr_funcs.len();
1142        // Gather the relevant values into a vec of rows ordered by aggregation_index
1143        let collection = collection
1144            .map(move |(key, row)| {
1145                let mut row_builder = SharedRow::get();
1146                let mut values = Vec::with_capacity(aggregations);
1147                values.extend(
1148                    row.iter()
1149                        .take(aggregations)
1150                        .map(|v| row_builder.pack_using(std::iter::once(v))),
1151                );
1152
1153                (key, values)
1154            })
1155            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1156                must_consolidate,
1157                "Consolidated ReduceMonotonic input",
1158            );
1159
1160        // It should be now possible to ensure that we have a monotonic collection.
1161        let error_logger = self.error_logger();
1162        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1163            error_logger.log(
1164                "Non-monotonic input to ReduceMonotonic",
1165                &format!("data={data:?}, diff={diff}"),
1166            );
1167            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1168            (EvalError::Internal(m).into(), Diff::ONE)
1169        });
1170        // We can place our rows directly into the diff field, and
1171        // only keep the relevant one corresponding to evaluating our
1172        // aggregate, instead of having to do a hierarchical reduction.
1173        let partial = partial.explode_one(move |(key, values)| {
1174            let mut output = Vec::new();
1175            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1176                output.push(monoids::get_monoid(row, func).expect(
1177                    "hierarchical aggregations are expected to have monoid implementations",
1178                ));
1179            }
1180            (key, output)
1181        });
1182
1183        // Allocations for the two closures.
1184        let mut datums1 = DatumVec::new();
1185        let mut datums2 = DatumVec::new();
1186        let mfp_after1 = mfp_after.clone();
1187        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1188
1189        let partial: KeyCollection<_, _, _> = partial.into();
1190        let arranged = partial
1191            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1192                "ArrangeMonotonic [val: empty]",
1193            );
1194        let output = arranged
1195            .clone()
1196            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
1197                move |key, input, output| {
1198                    let temp_storage = RowArena::new();
1199                    let datum_iter = key.to_datum_iter();
1200                    let mut datums_local = datums1.borrow();
1201                    datums_local.extend(datum_iter);
1202                    let key_len = datums_local.len();
1203                    let accum = &input[0].1;
1204                    for monoid in accum.iter() {
1205                        datums_local.extend(monoid.finalize().iter());
1206                    }
1207
1208                    if let Some(row) =
1209                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1210                    {
1211                        output.push((row, Diff::ONE));
1212                    }
1213                }
1214            });
1215
1216        // If `mfp_after` can error, then we need to render a paired reduction
1217        // to scan for these potential errors. Note that we cannot directly use
1218        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1219        // conditionally render the second component of the reduction pair.
1220        if let Some(mfp) = mfp_after2 {
1221            let mfp_errs = arranged
1222                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1223                    "ReduceMonotonic Error Check",
1224                    move |key, input, output| {
1225                        let temp_storage = RowArena::new();
1226                        let datum_iter = key.to_datum_iter();
1227                        let mut datums_local = datums2.borrow();
1228                        datums_local.extend(datum_iter);
1229                        let accum = &input[0].1;
1230                        for monoid in accum.iter() {
1231                            datums_local.extend(monoid.finalize().iter());
1232                        }
1233                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1234                        {
1235                            output.push((e.into(), Diff::ONE));
1236                        }
1237                    },
1238                )
1239                .as_collection(|_k, v| v.clone());
1240            (output, validation_errs.concat(mfp_errs))
1241        } else {
1242            (output, validation_errs)
1243        }
1244    }
1245
1246    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1247    ///
1248    /// The incoming values are moved to the update's "difference" field, at which point
1249    /// they can be accumulated in place. The `count` operator promotes the accumulated
1250    /// values to data, at which point a final map applies operator-specific logic to
1251    /// yield the final aggregate.
1252    fn build_accumulable<'s>(
1253        &self,
1254        collection: VecCollection<'s, T, (Row, Row), Diff>,
1255        AccumulablePlan {
1256            full_aggrs,
1257            simple_aggrs,
1258            distinct_aggrs,
1259        }: AccumulablePlan,
1260        key_arity: usize,
1261        mfp_after: Option<SafeMfpPlan>,
1262    ) -> (
1263        RowRowArrangement<'s, T>,
1264        VecCollection<'s, T, DataflowError, Diff>,
1265    ) {
1266        let collection_scope = collection.scope();
1267
1268        // we must have called this function with something to reduce
1269        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1270            self.error_logger().soft_panic_or_log(
1271                "Incorrect numbers of aggregates in accummulable reduction rendering",
1272                &format!(
1273                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1274                    full_aggrs.len(),
1275                    simple_aggrs.len(),
1276                    distinct_aggrs.len(),
1277                ),
1278            );
1279        }
1280
1281        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1282        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
1283        // Other aggregations can be directly moved in to the `diff` field.
1284        //
1285        // In each case, the resulting collection should have `data` shaped as `(key, ())`
1286        // and a `diff` that is a vector with length `3 * aggrs.len()`. The three values are
1287        // generally the count, and then two aggregation-specific values. The size could be
1288        // reduced if we want to specialize for the aggregations.
1289
1290        // Instantiate a default vector for diffs with the correct types at each
1291        // position.
1292        let zero_diffs: (Vec<_>, Diff) = (
1293            full_aggrs
1294                .iter()
1295                .map(|f| accumulable_zero(&f.func))
1296                .collect(),
1297            Diff::ZERO,
1298        );
1299
1300        let mut to_aggregate = Vec::new();
1301        if simple_aggrs.len() > 0 {
1302            // First, collect all non-distinct aggregations in one pass.
1303            let collection = collection.clone();
1304            let easy_cases = collection.explode_one({
1305                let zero_diffs = zero_diffs.clone();
1306                move |(key, row)| {
1307                    let mut diffs = zero_diffs.clone();
1308                    // Try to unpack only the datums we need. Unfortunately, since we
1309                    // can't random access into a Row, we have to iterate through one by one.
1310                    // TODO: Even though we don't have random access, we could still avoid unpacking
1311                    // everything that we don't care about, and it might be worth it to extend the
1312                    // Row API to do that.
1313                    let mut row_iter = row.iter().enumerate();
1314                    for (datum_index, aggr) in simple_aggrs.iter() {
1315                        let mut datum = row_iter.next().unwrap();
1316                        while datum_index != &datum.0 {
1317                            datum = row_iter.next().unwrap();
1318                        }
1319                        let datum = datum.1;
1320                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1321                        diffs.1 = Diff::ONE;
1322                    }
1323                    ((key, ()), diffs)
1324                }
1325            });
1326            to_aggregate.push(easy_cases);
1327        }
1328
1329        // Next, collect all aggregations that require distinctness.
1330        for (datum_index, aggr) in distinct_aggrs.into_iter() {
1331            let pairer = Pairer::new(key_arity);
1332            let collection = collection
1333                .clone()
1334                .map(move |(key, row)| {
1335                    let value = row.iter().nth(datum_index).unwrap();
1336                    (pairer.merge(&key, std::iter::once(value)), ())
1337                })
1338                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1339                    "Arranged Accumulable Distinct [val: empty]",
1340                )
1341                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1342                    "Reduced Accumulable Distinct [val: empty]",
1343                    move |_k, _s, t| t.push(((), Diff::ONE)),
1344                )
1345                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1346                .explode_one({
1347                    let zero_diffs = zero_diffs.clone();
1348                    move |(key, row)| {
1349                        let datum = row.iter().next().unwrap();
1350                        let mut diffs = zero_diffs.clone();
1351                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1352                        diffs.1 = Diff::ONE;
1353                        ((key, ()), diffs)
1354                    }
1355                });
1356            to_aggregate.push(collection);
1357        }
1358
1359        // now concatenate, if necessary, multiple aggregations
1360        let collection = if to_aggregate.len() == 1 {
1361            to_aggregate.remove(0)
1362        } else {
1363            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
1364        };
1365
1366        // Allocations for the two closures.
1367        let mut datums1 = DatumVec::new();
1368        let mut datums2 = DatumVec::new();
1369        let mfp_after1 = mfp_after.clone();
1370        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1371        let full_aggrs2 = full_aggrs.clone();
1372
1373        let error_logger = self.error_logger();
1374        let err_full_aggrs = full_aggrs.clone();
1375        let arranged = collection
1376            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, (Vec<Accum>, Diff)>>(
1377                "ArrangeAccumulable [val: empty]",
1378            );
1379        let arranged_output = arranged
1380            .clone()
1381            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
1382                move |key, input, output| {
1383                    let (ref accums, total) = input[0].1;
1384
1385                    let temp_storage = RowArena::new();
1386                    let datum_iter = key.to_datum_iter();
1387                    let mut datums_local = datums1.borrow();
1388                    datums_local.extend(datum_iter);
1389                    let key_len = datums_local.len();
1390                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1391                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1392                    }
1393
1394                    if let Some(row) =
1395                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1396                    {
1397                        output.push((row, Diff::ONE));
1398                    }
1399                }
1400            });
1401        let arranged_errs = arranged
1402            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1403                "AccumulableErrorCheck",
1404                move |key, input, output| {
1405                    let (ref accums, total) = input[0].1;
1406                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1407                        // We first test here if inputs without net-positive records are present,
1408                        // producing an error to the logs and to the query output if that is the case.
1409                        if total == Diff::ZERO && !accum.is_zero() {
1410                            error_logger.log(
1411                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1412                                &format!("aggr={aggr:?}, accum={accum:?}"),
1413                            );
1414                            let key = key.to_row();
1415                            let message = format!(
1416                                "Invalid data in source, saw net-zero records for key {key} \
1417                                 with non-zero accumulation in accumulable aggregate"
1418                            );
1419                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1420                        }
1421                        match (&aggr.func, &accum) {
1422                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1423                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1424                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1425                                if accum.is_negative() {
1426                                    error_logger.log(
1427                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1428                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1429                                );
1430                                    let key = key.to_row();
1431                                    let message = format!(
1432                                        "Invalid data in source, saw negative accumulation with \
1433                                         unsigned type for key {key}"
1434                                    );
1435                                    let err = EvalError::Internal(message.into());
1436                                    output.push((err.into(), Diff::ONE));
1437                                }
1438                            }
1439                            _ => (), // no more errors to check for at this point!
1440                        }
1441                    }
1442
1443                    // If `mfp_after` can error, then evaluate it here.
1444                    let Some(mfp) = &mfp_after2 else { return };
1445                    let temp_storage = RowArena::new();
1446                    let datum_iter = key.to_datum_iter();
1447                    let mut datums_local = datums2.borrow();
1448                    datums_local.extend(datum_iter);
1449                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1450                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1451                    }
1452
1453                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1454                        output.push((e.into(), Diff::ONE));
1455                    }
1456                },
1457            );
1458        (
1459            arranged_output,
1460            arranged_errs.as_collection(|_key, error| error.clone()),
1461        )
1462    }
1463}
1464
1465/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1466/// containing key and aggregate values, then returns a result `Row` or `None`
1467/// if the MFP filters the result out.
1468fn evaluate_mfp_after<'a, 'b>(
1469    mfp_after: &'a Option<SafeMfpPlan>,
1470    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1471    temp_storage: &'a RowArena,
1472    key_len: usize,
1473) -> Option<Row> {
1474    let mut row_builder = SharedRow::get();
1475    // Apply MFP if it exists and pack a Row of
1476    // aggregate values from `datums_local`.
1477    if let Some(mfp) = mfp_after {
1478        // It must ignore errors here, but they are scanned
1479        // for elsewhere if the MFP can error.
1480        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1481            // The `mfp_after` must preserve the key columns,
1482            // so we can skip them to form aggregation results.
1483            Some(row_builder.pack_using(iter.skip(key_len)))
1484        } else {
1485            None
1486        }
1487    } else {
1488        Some(row_builder.pack_using(&datums_local[key_len..]))
1489    }
1490}
1491
1492fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1493    match aggr_func {
1494        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1495            trues: Diff::ZERO,
1496            falses: Diff::ZERO,
1497        },
1498        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1499            accum: AccumCount::ZERO,
1500            pos_infs: Diff::ZERO,
1501            neg_infs: Diff::ZERO,
1502            nans: Diff::ZERO,
1503            non_nulls: Diff::ZERO,
1504        },
1505        AggregateFunc::SumNumeric => Accum::Numeric {
1506            accum: OrderedDecimal(NumericAgg::zero()),
1507            pos_infs: Diff::ZERO,
1508            neg_infs: Diff::ZERO,
1509            nans: Diff::ZERO,
1510            non_nulls: Diff::ZERO,
1511        },
1512        _ => Accum::SimpleNumber {
1513            accum: AccumCount::ZERO,
1514            non_nulls: Diff::ZERO,
1515        },
1516    }
1517}
1518
1519static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1520
/// Converts a single input datum into its accumulator contribution for the
/// given aggregate function. The inverse finishing step is `finalize_accum`.
///
/// # Panics
///
/// Panics when `datum` has a type the aggregate function cannot consume.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
            non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            // NULLs contribute to neither count; `finalize_accum` infers their
            // presence from the total record count.
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Widen to f64 so both float types share one accumulation path;
            // NULL contributes zero and is tracked via `non_nulls` below.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Map the floating point value onto a fixed precision domain
            // All special values should map to zero, since they are tracked separately
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                // This operation will truncate to i128::MAX if out of range.
                // TODO(benesch): rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Like floats, infinities and NaNs are counted separately so
                // they map to a zero-valued sum.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Take a narrow decimal (datum) into a wide decimal
                    // (aggregator).
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        _ => {
            // Other accumulations need to disentangle the accumulable
            // value from its NULL-ness, which is not quite as easily
            // accumulated.
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1661
/// Turns a merged accumulator into the final output `Datum` for the
/// aggregate function `aggr_func`, where `total` is the net count of input
/// records (including NULL contributions) in the group.
///
/// # Panics
///
/// Panics if `accum` is a variant that `aggr_func` does not produce; the
/// pairing is established by `accumulable_zero`/`datum_to_accumulator`.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // The finished value depends on the aggregation function in a variety of ways.
    // For all aggregates but count, if only null values were
    // accumulated, then the output is null.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // If any false, else if all true, else must be no false and some nulls.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // If any true, else if all false, else must be no true and some nulls.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // If any non-nulls, just report the aggregate.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have less than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have less than 2^32 summands?
                // If so, rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // Our semantics of overflow are not clearly articulated wrt.
                    // unsigned vs. signed types (database-issues#5172). We adopt an
                    // unsigned wrapping behavior to match what we do above for
                    // signed types.
                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Map the fixed-point sum back into floating point; see
                    // `FLOAT_SCALE` and `datum_to_accumulator`.
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Map the fixed-point sum back into floating point; see
                    // `FLOAT_SCALE` and `datum_to_accumulator`.
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                // Take a wide decimal (aggregator) into a
                // narrow decimal (datum). If this operation
                // overflows the datum, this new value will be
                // +/- infinity. However, the aggregator tracks
                // the amount of overflow, making it invertible.
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1827
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
type AccumCount = mz_ore::Overflowing<i128>;
1830
1831/// Accumulates values for the various types of accumulable aggregations.
1832///
1833/// We assume that there are not more than 2^32 elements for the aggregation.
1834/// Thus we can perform a summation over i32 in an i64 accumulator
1835/// and not worry about exceeding its bounds.
1836///
1837/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
1838/// point representation has less precision than a double. It is entirely possible
1839/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
1840/// to preserve group guarantees.
1841#[derive(
1842    Debug,
1843    Clone,
1844    Copy,
1845    PartialEq,
1846    Eq,
1847    PartialOrd,
1848    Ord,
1849    Serialize,
1850    Deserialize
1851)]
1852enum Accum {
1853    /// Accumulates boolean values.
1854    Bool {
1855        /// The number of `true` values observed.
1856        trues: Diff,
1857        /// The number of `false` values observed.
1858        falses: Diff,
1859    },
1860    /// Accumulates simple numeric values.
1861    SimpleNumber {
1862        /// The accumulation of all non-NULL values observed.
1863        accum: AccumCount,
1864        /// The number of non-NULL values observed.
1865        non_nulls: Diff,
1866    },
1867    /// Accumulates float values.
1868    Float {
1869        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
1870        /// preserve associativity and commutativity
1871        accum: AccumCount,
1872        /// Counts +inf
1873        pos_infs: Diff,
1874        /// Counts -inf
1875        neg_infs: Diff,
1876        /// Counts NaNs
1877        nans: Diff,
1878        /// Counts non-NULL values
1879        non_nulls: Diff,
1880    },
1881    /// Accumulates arbitrary precision decimals.
1882    Numeric {
1883        /// Accumulates non-special values
1884        accum: OrderedDecimal<NumericAgg>,
1885        /// Counts +inf
1886        pos_infs: Diff,
1887        /// Counts -inf
1888        neg_infs: Diff,
1889        /// Counts NaNs
1890        nans: Diff,
1891        /// Counts non-NULL values
1892        non_nulls: Diff,
1893    },
1894}
1895
1896impl IsZero for Accum {
1897    fn is_zero(&self) -> bool {
1898        match self {
1899            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1900            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1901            Accum::Float {
1902                accum,
1903                pos_infs,
1904                neg_infs,
1905                nans,
1906                non_nulls,
1907            } => {
1908                accum.is_zero()
1909                    && pos_infs.is_zero()
1910                    && neg_infs.is_zero()
1911                    && nans.is_zero()
1912                    && non_nulls.is_zero()
1913            }
1914            Accum::Numeric {
1915                accum,
1916                pos_infs,
1917                neg_infs,
1918                nans,
1919                non_nulls,
1920            } => {
1921                accum.0.is_zero()
1922                    && pos_infs.is_zero()
1923                    && neg_infs.is_zero()
1924                    && nans.is_zero()
1925                    && non_nulls.is_zero()
1926            }
1927        }
1928    }
1929}
1930
impl Semigroup for Accum {
    /// Merges `other` into `self`. Both sides must be the same variant,
    /// which holds because a given aggregate function always produces one
    /// `Accum` variant (see `accumulable_zero`/`datum_to_accumulator`).
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Wrap on overflow (after logging) so that accumulation stays
                // associative and commutative; see the `Accum` docs.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // `rounded` signals we have exceeded the aggregator's max
                // precision, which means we've lost commutativity and
                // associativity; nothing to be done here, so panic. For more
                // context, see the DEC_Rounded definition at
                // http://speleotrove.com/decimal/dncont.html
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce to reclaim unused decimal precision. Note that this
                // reduction must happen somewhere to make the following
                // invertible:
                // ```
                // CREATE TABLE a (a numeric);
                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
                // INSERT INTO a VALUES ('9e39'), ('9e-39');
                // ```
                // This will now return infinity. However, we can retract the
                // value that blew up its precision:
                // ```
                // INSERT INTO a VALUES ('-9e-39');
                // ```
                // This leaves `t`'s aggregator with a value of 9e39. However,
                // without doing a reduction, `libdecnum` will store the value
                // as 9e39+0e-39, which still exceeds the narrower context's
                // precision. By doing the reduction, we can "reclaim" the 39
                // digits of precision.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2033
impl Multiply<Diff> for Accum {
    type Output = Accum;

    /// Scales the accumulator by an integer `factor`, i.e. folds in a single
    /// update whose multiplicity is `factor` rather than ±1.
    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            // Counts of trues/falses scale linearly with the multiplicity.
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // On overflow we warn and wrap rather than panic, accepting
                // possibly incorrect results; this mirrors the float handling
                // in `plus_equals`.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                // Unlike `plus_equals`, not necessary to reduce after this operation because `f` will
                // always be an integer, i.e. we are never increasing the
                // values' scale.
                cx.mul(&mut f, &accum.0);
                // `rounded` signals we have exceeded the aggregator's max
                // precision, which means we've lost commutativity and
                // associativity; nothing to be done here, so panic. For more
                // context, see the DEC_Rounded definition at
                // http://speleotrove.com/decimal/dncont.html
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2096
// `Accum` holds only inline (non-heap) data, so a byte-copying region
// suffices for columnar storage.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2100
2101/// Monoids for in-place compaction of monotonic streams.
2102mod monoids {
2103
2104    // We can improve the performance of some aggregations through the use of algebra.
2105    // In particular, we can move some of the aggregations in to the `diff` field of
2106    // updates, by changing `diff` from integers to a different algebraic structure.
2107    //
2108    // The one we use is called a "semigroup", and it means that the structure has a
2109    // symmetric addition operator. The trait we use also allows the semigroup elements
2110    // to present as "zero", meaning they always act as the identity under +. Here,
2111    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
2112    // known to DD by the `is_zero` method, see comment there. So, from the point of view
2113    // of DD, this Semigroup should _not_ have a zero.
2114    //
2115    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
2116    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
2117    // assumes this.
2118
2119    use columnation::{Columnation, Region};
2120    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2121    use mz_expr::AggregateFunc;
2122    use mz_ore::soft_panic_or_log;
2123    use mz_repr::{Datum, Diff, Row};
2124    use serde::{Deserialize, Serialize};
2125
    /// A monoid containing a single-datum row.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        /// Tracks the smallest row seen so far; `Datum::Null` acts as the identity.
        Min(Row),
        /// Tracks the largest row seen so far; `Datum::Null` acts as the identity.
        Max(Row),
    }
2132
2133    impl ReductionMonoid {
2134        pub fn finalize(&self) -> &Row {
2135            use ReductionMonoid::*;
2136            match self {
2137                Min(row) | Max(row) => row,
2138            }
2139        }
2140    }
2141
    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        /// Clones `source` into `self`, reusing `self`'s existing `Row`
        /// allocation where possible instead of allocating a fresh one.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Move our row out (leaving a default in its place) so we can
            // clone into it without holding a borrow of `self`.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            // `Row::clone_from` can reuse `row`'s existing buffer.
            row.clone_from(source_row);

            // Adopt `source`'s variant, moving the updated row back in.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }
2170
    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// Returns `self` unchanged: min/max are idempotent under repetition,
        /// so a positive multiplicity has no effect.
        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent, and
            // its users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }
2183
2184    impl Semigroup for ReductionMonoid {
2185        fn plus_equals(&mut self, rhs: &Self) {
2186            match (self, rhs) {
2187                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
2188                    let swap = {
2189                        let lhs_val = lhs.unpack_first();
2190                        let rhs_val = rhs.unpack_first();
2191                        // Datum::Null is the identity, not a small element.
2192                        match (lhs_val, rhs_val) {
2193                            (_, Datum::Null) => false,
2194                            (Datum::Null, _) => true,
2195                            (lhs, rhs) => rhs < lhs,
2196                        }
2197                    };
2198                    if swap {
2199                        lhs.clone_from(rhs);
2200                    }
2201                }
2202                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
2203                    let swap = {
2204                        let lhs_val = lhs.unpack_first();
2205                        let rhs_val = rhs.unpack_first();
2206                        // Datum::Null is the identity, not a large element.
2207                        match (lhs_val, rhs_val) {
2208                            (_, Datum::Null) => false,
2209                            (Datum::Null, _) => true,
2210                            (lhs, rhs) => rhs > lhs,
2211                        }
2212                    };
2213                    if swap {
2214                        lhs.clone_from(rhs);
2215                    }
2216                }
2217                (lhs, rhs) => {
2218                    soft_panic_or_log!(
2219                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
2220                    );
2221                }
2222            }
2223        }
2224    }
2225
    impl IsZero for ReductionMonoid {
        /// Always `false`; see the comment in the body for why this must not
        /// report `Datum::Null` as zero.
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
            // DD uses true results of this method to make stuff disappear. This makes sense when
            // diffs mean really just diffs, but for `ReductionMonoid` diffs hold reduction results.
            // We don't want funny stuff, like disappearing, happening to reduction results even
            // when they are null. (This would confuse, e.g., `ReduceCollation` for null inputs.)
            false
        }
    }
2236
    // Uses a custom region so both variants' `Row` payloads can share a single
    // backing region; see `ReductionMonoidRegion`.
    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }
2240
    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
    /// in the same backing region. Alternatively, it could store them in two separate regions, but
    /// we select the former for simplicity reasons.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        /// Shared backing region for the `Row` payload of both `Min` and `Max`.
        inner: <Row as Columnation>::InnerRegion,
    }
2248
    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        /// Copies `item` into this region, preserving its variant; the `Row`
        /// payload is copied into the shared inner region.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                // SAFETY: we forward the caller's obligations unchanged to the
                // inner `Row` region, per `Region::copy`'s contract.
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        /// Reserves capacity for the rows of `items`; the variant itself needs
        /// no region storage, so only the `Row` payloads are counted.
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }
2285
    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
    ///
    /// Returns `None` for aggregations that are not hierarchical (sums, counts,
    /// logical aggregates, concatenations, and window functions), which are
    /// handled by other reduction strategies.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            // All `Max*` functions compact via the `Max` monoid.
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            // All `Min*` functions compact via the `Min` monoid.
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            // Non-hierarchical aggregations: no monoid.
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
2354}
2355
2356mod window_agg_helpers {
2357    use crate::render::reduce::*;
2358
    /// TODO: It would be better for performance to do the branching that is in the methods of this
    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
    /// directly.
    pub enum OneByOneAggrImpls {
        /// For accumulable aggregations, whose state is an `Accum`.
        Accumulable(AccumulableOneByOneAggr),
        /// For hierarchical (min/max) aggregations, backed by a `ReductionMonoid`.
        Hierarchical(HierarchicalOneByOneAggr),
        /// Fallback for all remaining (basic) aggregations.
        Basic(mz_expr::NaiveOneByOneAggr),
    }
2368
2369    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2370        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2371            match reduction_type(agg) {
2372                ReductionType::Basic => {
2373                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2374                }
2375                ReductionType::Accumulable => {
2376                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2377                }
2378                ReductionType::Hierarchical => {
2379                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2380                }
2381            }
2382        }
2383
2384        fn give(&mut self, d: &Datum) {
2385            match self {
2386                OneByOneAggrImpls::Basic(i) => i.give(d),
2387                OneByOneAggrImpls::Accumulable(i) => i.give(d),
2388                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2389            }
2390        }
2391
2392        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2393            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
2394            match self {
2395                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2396                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2397                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2398            }
2399        }
2400    }
2401
    /// One-by-one aggregation state for accumulable reductions.
    pub struct AccumulableOneByOneAggr {
        // Determines how datums are folded in and how the result is finalized.
        aggr_func: AggregateFunc,
        // Running accumulator.
        accum: Accum,
        // Number of datums folded in so far.
        total: Diff,
    }
2407
2408    impl AccumulableOneByOneAggr {
2409        fn new(aggr_func: &AggregateFunc) -> Self {
2410            AccumulableOneByOneAggr {
2411                aggr_func: aggr_func.clone(),
2412                accum: accumulable_zero(aggr_func),
2413                total: Diff::ZERO,
2414            }
2415        }
2416
2417        fn give(&mut self, d: &Datum) {
2418            self.accum
2419                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2420            self.total += Diff::ONE;
2421        }
2422
2423        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2424            temp_storage.make_datum(|packer| {
2425                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2426            })
2427        }
2428    }
2429
    /// One-by-one aggregation state for hierarchical (min/max) reductions.
    pub struct HierarchicalOneByOneAggr {
        // Determines whether we track a minimum or a maximum.
        aggr_func: AggregateFunc,
        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
        monoid: ReductionMonoid,
    }
2436
2437    impl HierarchicalOneByOneAggr {
2438        fn new(aggr_func: &AggregateFunc) -> Self {
2439            let mut row_buf = Row::default();
2440            row_buf.packer().push(Datum::Null);
2441            HierarchicalOneByOneAggr {
2442                aggr_func: aggr_func.clone(),
2443                monoid: get_monoid(row_buf, aggr_func)
2444                    .expect("aggr_func should be a hierarchical aggregation function"),
2445            }
2446        }
2447
2448        fn give(&mut self, d: &Datum) {
2449            let mut row_buf = Row::default();
2450            row_buf.packer().push(d);
2451            let m = get_monoid(row_buf, &self.aggr_func)
2452                .expect("aggr_func should be a hierarchical aggregation function");
2453            self.monoid.plus_equals(&m);
2454        }
2455
2456        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2457            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2458        }
2459    }
2460}