// mz_compute/render/reduce.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reduction dataflow construction.
11//!
12//! Consult [ReducePlan] documentation for details.
13
14use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use columnation::{Columnation, CopyRegion};
18use dec::OrderedDecimal;
19use differential_dataflow::Diff as _;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::BatchContainer;
26use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
27use differential_dataflow::trace::{Builder, Trace};
28use differential_dataflow::{Data, VecCollection};
29use itertools::Itertools;
30use mz_compute_types::plan::reduce::{
31    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32    ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35    AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
38use mz_repr::fixed_length::ToDatumIter;
39use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use tracing::warn;
45
46use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
47use crate::extensions::reduce::{ClearContainer, MzReduce};
48use crate::render::context::{CollectionBundle, Context};
49use crate::render::errors::DataflowErrorSer;
50use crate::render::errors::MaybeValidatingRow;
51use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
52use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
53use crate::row_spine::{
54    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
55};
56use crate::typedefs::{
57    ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58    RowRowSpine, RowSpine, RowValSpine,
59};
60
61impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to
    /// minimize worst-case incremental update times and memory footprint.
    ///
    /// The input is first mapped to `(key, val)` pairs by evaluating
    /// `key_val_plan` against each row (evaluation errors are routed to a
    /// separate error stream), and the keyed collection is then handed to
    /// [`Self::render_reduce_plan`] to render the actual aggregation.
    ///
    /// * `input_key` - the key of the input arrangement to read from, if any.
    /// * `key_val_plan` - MFPs that extract the grouping key and the values
    ///   to aggregate from each input row.
    /// * `reduce_plan` - the plan describing which reduction strategy to render.
    /// * `mfp_after` - an optional MFP fused to run after the reduction.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<'scope, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<'scope, T> {
        // Convert `mfp_after` to an actionable plan.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            // Reusable datum buffer, captured by the closure below to avoid
            // per-record allocation.
            let mut datums = DatumVec::new();

            // Determine the columns we'll need from the row.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // remap column references to the subset we use.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // Convert the sorted demanded column indexes into relative skips, so
            // that a single pass of `row_iter.nth(skip)` visits exactly the
            // demanded columns in order.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err) = input
                .enter_region(inner)
                .flat_map::<_, ConsolidatingContainerBuilder<Vec<((Row, Row), T, Diff)>>, _>(
                    input_key.map(|k| (k, None)),
                    max_demand,
                    move |row_datums, time, diff, ok_session, err_session| {
                        let mut row_builder = SharedRow::get();
                        let temp_storage = RowArena::new();

                        let mut row_iter = row_datums.drain(..);
                        let mut datums_local = datums.borrow();
                        // Unpack only the demanded columns.
                        for skip in skips.iter() {
                            datums_local.push(row_iter.nth(*skip).unwrap());
                        }

                        // Evaluate the key expressions.
                        let key = key_plan.evaluate_into(
                            &mut datums_local,
                            &temp_storage,
                            &mut row_builder,
                        );
                        // A key evaluation error short-circuits the record into the
                        // error stream; no `(key, val)` output is produced for it.
                        let key = match key {
                            Err(e) => {
                                err_session.give((e.into(), time, diff));
                                return 1;
                            }
                            Ok(Some(key)) => key.clone(),
                            Ok(None) => panic!("Row expected as no predicate was used"),
                        };

                        // Evaluate the value expressions.
                        // The prior evaluation may have left additional columns we should delete.
                        datums_local.truncate(skips.len());
                        let val = val_plan.evaluate_into(
                            &mut datums_local,
                            &temp_storage,
                            &mut row_builder,
                        );
                        let val = match val {
                            Err(e) => {
                                err_session.give((e.into(), time, diff));
                                return 1;
                            }
                            Ok(Some(val)) => val.clone(),
                            Ok(None) => panic!("Row expected as no predicate was used"),
                        };

                        ok_session.give(((key, val), time, diff));
                        // NOTE(review): the closure's integer return appears to report
                        // how many records were emitted (always exactly one here) —
                        // confirm against `CollectionExt::flat_map`'s contract.
                        1
                    },
                );

            // Render the reduce plan
            self.render_reduce_plan(
                reduce_plan,
                key_val_input.as_collection(),
                err,
                key_arity,
                mfp_after,
            )
            .leave_region(self.scope)
        })
    }
170
171    /// Render a dataflow based on the provided plan.
172    ///
173    /// The output will be an arrangements that looks the same as if
174    /// we just had a single reduce operator computing everything together, and
175    /// this arrangement can also be re-used.
176    fn render_reduce_plan<'s>(
177        &self,
178        plan: ReducePlan,
179        collection: VecCollection<'s, T, (Row, Row), Diff>,
180        err_input: VecCollection<'s, T, DataflowErrorSer, Diff>,
181        key_arity: usize,
182        mfp_after: Option<SafeMfpPlan>,
183    ) -> CollectionBundle<'s, T> {
184        let mut errors = Default::default();
185        let arrangement =
186            self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187        let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188        CollectionBundle::from_columns(
189            0..key_arity,
190            ArrangementFlavor::Local(
191                arrangement,
192                errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193            ),
194        )
195    }
196
197    fn render_reduce_plan_inner<'s>(
198        &self,
199        plan: ReducePlan,
200        collection: VecCollection<'s, T, (Row, Row), Diff>,
201        errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
202        key_arity: usize,
203        mfp_after: Option<SafeMfpPlan>,
204    ) -> Arranged<'s, RowRowAgent<T, Diff>> {
205        // TODO(vmarcos): Arrangement specialization here could eventually be extended to keys,
206        // not only values (database-issues#6658).
207        let arrangement = match plan {
208            // If we have no aggregations or just a single type of reduction, we
209            // can go ahead and render them directly.
210            ReducePlan::Distinct => {
211                let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
212                errors.push(errs);
213                arranged_output
214            }
215            ReducePlan::Accumulable(expr) => {
216                let (arranged_output, errs) =
217                    self.build_accumulable(collection, expr, key_arity, mfp_after);
218                errors.push(errs);
219                arranged_output
220            }
221            ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
222                let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
223                errors.push(errs);
224                output
225            }
226            ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
227                let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
228                errors.push(errs);
229                output
230            }
231            ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
232                expr,
233                fused_unnest_list,
234            })) => {
235                // Note that we skip validating for negative diffs when we have a fused unnest list,
236                // because this is already a CPU-intensive situation due to the non-incrementalness
237                // of window functions.
238                let validating = !fused_unnest_list;
239                let (output, errs) = self.build_basic_aggregate(
240                    collection,
241                    0,
242                    &expr,
243                    validating,
244                    key_arity,
245                    mfp_after,
246                    fused_unnest_list,
247                );
248                if validating {
249                    errors.push(errs.expect("validation should have occurred as it was requested"));
250                }
251                output
252            }
253            ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
254                let (output, errs) =
255                    self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
256                errors.push(errs);
257                output
258            }
259        };
260        arrangement
261    }
262
    /// Build the dataflow to compute the set of distinct keys.
    ///
    /// Returns the arranged distinct output together with a collection of
    /// errors. Two reductions are rendered over the same arranged input: one
    /// ("DistinctBy") produces the distinct output, and one
    /// ("DistinctByErrorCheck") scans for non-positive multiplicities and for
    /// errors raised by the fused `mfp_after`, if it can error at all.
    fn build_distinct<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let error_logger = self.error_logger();

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // The error-checking reduction only needs the MFP when it can error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "DistinctBy",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Note that the key contains all the columns in a `Distinct` and that `mfp_after` is
                    // required to preserve the key. Therefore, if `mfp_after` maps, then it must project
                    // back to the key. As a consequence, we can treat `mfp_after` as a filter here.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        // We're pushing a unit value here because the key is implicitly added by the
                        // arrangement, and the permutation logic takes care of using the key part of the
                        // output.
                        output.push((Row::default(), Diff::ONE));
                    }
                },
            );
        let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
            "DistinctByErrorCheck",
            move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowErrorSer, _)>| {
                // Emit at most one error per key for non-positive multiplicities,
                // which should never occur and would indicate an upstream bug.
                for (_, count) in input.iter() {
                    if count.is_positive() {
                        continue;
                    }
                    let message = "Non-positive multiplicity in DistinctBy";
                    error_logger.log(message, &format!("row={key:?}, count={count}"));
                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    return;
                }
                // If `mfp_after` can error, then evaluate it here.
                let Some(mfp) = &mfp_after2 else { return };
                let temp_storage = RowArena::new();
                let datum_iter = key.to_datum_iter();
                let mut datums_local = datums2.borrow();
                datums_local.extend(datum_iter);

                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                    output.push((e.into(), Diff::ONE));
                }
            },
        );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
335
    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// This function assumes that we are explicitly rendering multiple basic aggregations.
    /// For each aggregate, we render a different reduce operator, and then fuse
    /// results together into a final arrangement that presents all the results
    /// in the order specified by `aggrs`.
    ///
    /// Returns the fused output arrangement and a collection of errors, which
    /// combines negative-accumulation validation errors (checked in exactly one
    /// of the component aggregates) with any errors from evaluating `mfp_after`.
    fn build_basic_aggregates<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        // We are only using this function to render multiple basic aggregates and
        // stitch them together. If that's not true we should complain.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Only the first aggregate (while `err_output` is still `None`)
            // requests validation; one validating component suffices.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each aggregate's output with its index so the fusing reduction
            // below can reassemble results in the order specified by `aggrs`.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Allocations for the two closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // The error-checking reduction only needs the MFP when it can error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
            "Arranged ReduceFuseBasic input",
        );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Append each component aggregate's single result datum; the
                    // index tag in the arranged values fixes their order.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // If `mfp_after` can error, then we need to render a paired reduction
        // to scan for these potential errors. Note that we cannot directly use
        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
        // conditionally render the second component of the reduction pair.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Since negative accumulations are checked in at least one component
                        // aggregate, we only need to look for MFP errors here.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
444
445    /// Build the dataflow to compute a single basic aggregation.
446    ///
447    /// This method also applies distinctness if required.
448    fn build_basic_aggregate<'s>(
449        &self,
450        input: VecCollection<'s, T, (Row, Row), Diff>,
451        index: usize,
452        aggr: &AggregateExpr,
453        validating: bool,
454        key_arity: usize,
455        mfp_after: Option<SafeMfpPlan>,
456        fused_unnest_list: bool,
457    ) -> (
458        RowRowArrangement<'s, T>,
459        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
460    ) {
461        let AggregateExpr {
462            func,
463            expr: _,
464            distinct,
465        } = aggr.clone();
466
467        // Extract the value we were asked to aggregate over.
468        let mut partial = input.map(move |(key, row)| {
469            let mut row_builder = SharedRow::get();
470            let value = row.iter().nth(index).unwrap();
471            row_builder.packer().push(value);
472            (key, row_builder.clone())
473        });
474
475        let mut err_output = None;
476
477        // If `distinct` is set, we restrict ourselves to the distinct `(key, val)`.
478        if distinct {
479            // We map `(Row, Row)` to `Row` to take advantage of `Row*Spine` types.
480            let pairer = Pairer::new(key_arity);
481            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
482            if validating {
483                let (oks, errs) = self
484                    .build_reduce_inaccumulable_distinct::<
485                        RowValBuilder<Result<(), String>, _, _>,
486                        RowValSpine<Result<(), String>, _, _>,
487                    >(keyed, None)
488                    .as_collection(|k, v| {
489                        (
490                            k.to_row(),
491                            v.as_ref()
492                                .map(|&()| ())
493                                .map_err(|m| m.as_str().into()),
494                        )
495                    })
496                    .map_fallible::<
497                        CapacityContainerBuilder<_>,
498                        CapacityContainerBuilder<_>,
499                        _,
500                        _,
501                        _,
502                    >(
503                        "Demux Errors",
504                        move |(key_val, result)| match result {
505                            Ok(()) => Ok(pairer.split(&key_val)),
506                            Err(m) => {
507                                Err(EvalError::Internal(m).into())
508                            }
509                        },
510                    );
511                err_output = Some(errs);
512                partial = oks;
513            } else {
514                partial = self
515                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
516                        keyed,
517                        Some(" [val: empty]"),
518                    )
519                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
520            }
521        }
522
523        // Allocations for the two closures.
524        let mut datums1 = DatumVec::new();
525        let mut datums2 = DatumVec::new();
526        let mut datums_key_1 = DatumVec::new();
527        let mut datums_key_2 = DatumVec::new();
528        let mfp_after1 = mfp_after.clone();
529        let func2 = func.clone();
530
531        let name = if !fused_unnest_list {
532            "ReduceInaccumulable"
533        } else {
534            "FusedReduceUnnestList"
535        };
536        let arranged = partial
537            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
538                "Arranged {name}"
539            ));
540        let oks = if !fused_unnest_list {
541            arranged
542                .clone()
543                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
544                    move |key, source, target| {
545                        // We respect the multiplicity here (unlike in hierarchical aggregation)
546                        // because we don't know that the aggregation method is not sensitive
547                        // to the number of records.
548                        let iter = source.iter().flat_map(|(v, w)| {
549                            // Note that in the non-positive case, this is wrong, but harmless because
550                            // our other reduction will produce an error.
551                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
552                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
553                        });
554
555                        let temp_storage = RowArena::new();
556                        let datum_iter = key.to_datum_iter();
557                        let mut datums_local = datums1.borrow();
558                        datums_local.extend(datum_iter);
559                        let key_len = datums_local.len();
560                        datums_local.push(
561                        // Note that this is not necessarily a window aggregation, in which case
562                        // `eval_with_fast_window_agg` delegates to the normal `eval`.
563                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
564                            iter,
565                            &temp_storage,
566                        ),
567                    );
568
569                        if let Some(row) = evaluate_mfp_after(
570                            &mfp_after1,
571                            &mut datums_local,
572                            &temp_storage,
573                            key_len,
574                        ) {
575                            target.push((row, Diff::ONE));
576                        }
577                    }
578                })
579        } else {
580            arranged
581                .clone()
582                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
583                    move |key, source, target| {
584                        // This part is the same as in the `!fused_unnest_list` if branch above.
585                        let iter = source.iter().flat_map(|(v, w)| {
586                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
587                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
588                        });
589
590                        // This is the part that is specific to the `fused_unnest_list` branch.
591                        let temp_storage = RowArena::new();
592                        let mut datums_local = datums_key_1.borrow();
593                        datums_local.extend(key.to_datum_iter());
594                        let key_len = datums_local.len();
595                        for datum in func
596                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
597                                iter,
598                                &temp_storage,
599                            )
600                        {
601                            datums_local.truncate(key_len);
602                            datums_local.push(datum);
603                            if let Some(row) = evaluate_mfp_after(
604                                &mfp_after1,
605                                &mut datums_local,
606                                &temp_storage,
607                                key_len,
608                            ) {
609                                target.push((row, Diff::ONE));
610                            }
611                        }
612                    }
613                })
614        };
615
616        // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here, but
617        // we then wouldn't be able to do this error check conditionally.  See its documentation for the
618        // rationale around using a second reduction here.
619        let must_validate = validating && err_output.is_none();
620        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
621        if must_validate || mfp_after2.is_some() {
622            let error_logger = self.error_logger();
623
624            let errs = if !fused_unnest_list {
625                arranged
626                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
627                        &format!("{name} Error Check"),
628                        move |key, source, target| {
629                            // Negative counts would be surprising, but until we are 100% certain we won't
630                            // see them, we should report when we do. We may want to bake even more info
631                            // in here in the future.
632                            if must_validate {
633                                for (value, count) in source.iter() {
634                                    if count.is_positive() {
635                                        continue;
636                                    }
637                                    let value = value.to_row();
638                                    let message =
639                                        "Non-positive accumulation in ReduceInaccumulable";
640                                    error_logger
641                                        .log(message, &format!("value={value:?}, count={count}"));
642                                    let err = EvalError::Internal(message.into());
643                                    target.push((err.into(), Diff::ONE));
644                                    return;
645                                }
646                            }
647
648                            // We know that `mfp_after` can error if it exists, so try to evaluate it here.
649                            let Some(mfp) = &mfp_after2 else { return };
650                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
651                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
652                                // This would ideally use `to_datum_iter` but we cannot as it needs to
653                                // borrow `v` and only presents datums with that lifetime, not any longer.
654                                std::iter::repeat(v.next().unwrap()).take(count)
655                            });
656
657                            let temp_storage = RowArena::new();
658                            let datum_iter = key.to_datum_iter();
659                            let mut datums_local = datums2.borrow();
660                            datums_local.extend(datum_iter);
661                            datums_local.push(
662                                func2.eval_with_fast_window_agg::<
663                                    _,
664                                    window_agg_helpers::OneByOneAggrImpls,
665                                >(
666                                    iter, &temp_storage
667                                ),
668                            );
669                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
670                                target.push((e.into(), Diff::ONE));
671                            }
672                        },
673                    )
674                    .as_collection(|_, v| v.clone())
675            } else {
676                // `render_reduce_plan_inner` doesn't request validation when `fused_unnest_list`.
677                assert!(!must_validate);
678                // We couldn't have got into this if branch due to `must_validate`, so it must be
679                // because of the `mfp_after2.is_some()`.
680                let Some(mfp) = mfp_after2 else {
681                    unreachable!()
682                };
683                arranged
684                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
685                        &format!("{name} Error Check"),
686                        move |key, source, target| {
687                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
688                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
689                                // This would ideally use `to_datum_iter` but we cannot as it needs to
690                                // borrow `v` and only presents datums with that lifetime, not any longer.
691                                std::iter::repeat(v.next().unwrap()).take(count)
692                            });
693
694                            let temp_storage = RowArena::new();
695                            let mut datums_local = datums_key_2.borrow();
696                            datums_local.extend(key.to_datum_iter());
697                            let key_len = datums_local.len();
698                            for datum in func2
699                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
700                                    iter,
701                                    &temp_storage,
702                                )
703                            {
704                                datums_local.truncate(key_len);
705                                datums_local.push(datum);
706                                // We know that `mfp` can error (because of the `could_error` call
707                                // above), so try to evaluate it here.
708                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
709                                {
710                                    target.push((e.into(), Diff::ONE));
711                                }
712                            }
713                        },
714                    )
715                    .as_collection(|_, v| v.clone())
716            };
717
718            if let Some(e) = err_output {
719                err_output = Some(e.concat(errs));
720            } else {
721                err_output = Some(errs);
722            }
723        }
724        (oks, err_output)
725    }
726
727    fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
728        &self,
729        input: VecCollection<'s, T, Row, Diff>,
730        name_tag: Option<&str>,
731    ) -> Arranged<'s, TraceAgent<Tr>>
732    where
733        Tr: for<'a> Trace<
734                Key<'a> = DatumSeq<'a>,
735                KeyContainer: BatchContainer<Owned = Row>,
736                Time = T,
737                Diff = Diff,
738                ValOwn: Data + MaybeValidatingRow<(), String>,
739            > + 'static,
740        Bu: Builder<
741                Time = T,
742                Input: Container
743                           + InternalMerge
744                           + ClearContainer
745                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
746                Output = Tr::Batch,
747            >,
748        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
749    {
750        let error_logger = self.error_logger();
751
752        let output_name = format!(
753            "ReduceInaccumulable Distinct{}",
754            name_tag.unwrap_or_default()
755        );
756
757        let input: KeyCollection<_, _, _> = input.into();
758        input
759            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
760                "Arranged ReduceInaccumulable Distinct [val: empty]",
761            )
762            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
763                if let Some(err) = Tr::ValOwn::into_error() {
764                    for (value, count) in source.iter() {
765                        if count.is_positive() {
766                            continue;
767                        }
768
769                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
770                        error_logger.log(message, &format!("value={value:?}, count={count}"));
771                        t.push((err(message.to_string()), Diff::ONE));
772                        return;
773                    }
774                }
775                t.push((Tr::ValOwn::ok(()), Diff::ONE))
776            })
777    }
778
779    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
780    /// on non-monotonic inputs.
781    ///
782    /// This function renders a single reduction tree that computes aggregations with
783    /// a priority queue implemented with a series of reduce operators that partition
784    /// the input into buckets, and compute the aggregation over very small buckets
785    /// and feed the results up to larger buckets.
786    ///
787    /// Note that this implementation currently ignores the distinct bit because we
788    /// currently only perform min / max hierarchically and the reduction tree
789    /// efficiently suppresses non-distinct updates.
790    ///
791    /// `buckets` indicates the number of buckets in this stage. We do some non-obvious
792    /// trickery here to limit the memory usage per layer by internally
793    /// holding only the elements that were rejected by this stage. However, the
794    /// output collection maintains the `((key, bucket), (passing value)` for this
795    /// stage.
796    fn build_bucketed<'s>(
797        &self,
798        input: VecCollection<'s, T, (Row, Row), Diff>,
799        BucketedPlan {
800            aggr_funcs,
801            buckets,
802        }: BucketedPlan,
803        key_arity: usize,
804        mfp_after: Option<SafeMfpPlan>,
805    ) -> (
806        RowRowArrangement<'s, T>,
807        VecCollection<'s, T, DataflowErrorSer, Diff>,
808    ) {
809        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
810        let outer_scope = input.scope();
811        let arranged_output = outer_scope
812            .clone()
813            .region_named("ReduceHierarchical", |inner| {
814                let input = input.enter(inner);
815
816                // The first mod to apply to the hash.
817                let first_mod = buckets.get(0).copied().unwrap_or(1);
818                let aggregations = aggr_funcs.len();
819
820                // Gather the relevant keys with their hashes along with values ordered by aggregation_index.
821                let mut stage = input.map(move |(key, row)| {
822                    let mut row_builder = SharedRow::get();
823                    let mut row_packer = row_builder.packer();
824                    row_packer.extend(row.iter().take(aggregations));
825                    let values = row_builder.clone();
826
827                    // Apply the initial mod here.
828                    let hash = values.hashed() % first_mod;
829                    let hash_key =
830                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
831                    (hash_key, values)
832                });
833
834                // Repeatedly apply hierarchical reduction with a progressively coarser key.
835                for (index, b) in buckets.into_iter().enumerate() {
836                    // Apply subsequent bucket mods for all but the first round.
837                    let input = if index == 0 {
838                        stage
839                    } else {
840                        stage.map(move |(hash_key, values)| {
841                            let mut hash_key_iter = hash_key.iter();
842                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
843                            // TODO: Convert the `chain(hash_key_iter...)` into a memcpy.
844                            let hash_key = SharedRow::pack(
845                                std::iter::once(Datum::from(hash))
846                                    .chain(hash_key_iter.take(key_arity)),
847                            );
848                            (hash_key, values)
849                        })
850                    };
851
852                    // We only want the first stage to perform validation of whether invalid accumulations
853                    // were observed in the input. Subsequently, we will either produce an error in the error
854                    // stream or produce correct data in the output stream.
855                    let validating = err_output.is_none();
856
857                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
858                    if let Some(errs) = errs {
859                        err_output = Some(errs.leave_region(outer_scope));
860                    }
861                    stage = oks
862                }
863
864                // Discard the hash from the key and return to the format of the input data.
865                let partial = stage.map(move |(hash_key, values)| {
866                    let mut hash_key_iter = hash_key.iter();
867                    let _hash = hash_key_iter.next();
868                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
869                });
870
871                // Allocations for the two closures.
872                let mut datums1 = DatumVec::new();
873                let mut datums2 = DatumVec::new();
874                let mfp_after1 = mfp_after.clone();
875                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
876                let aggr_funcs2 = aggr_funcs.clone();
877
878                // Build a series of stages for the reduction
879                // Arrange the final result into (key, Row)
880                let error_logger = self.error_logger();
881                // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
882                // view mz_introspection.mz_expected_group_size_advice.
883                let arranged = partial
884                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
885                        "Arrange ReduceMinsMaxes",
886                    );
887                // Note that we would prefer to use `mz_timely_util::reduce::ReduceExt::reduce_pair` here,
888                // but we then wouldn't be able to do this error check conditionally.  See its documentation
889                // for the rationale around using a second reduction here.
890                let must_validate = err_output.is_none();
891                if must_validate || mfp_after2.is_some() {
892                    let errs = arranged
893                        .clone()
894                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
895                            "ReduceMinsMaxes Error Check",
896                            move |key, source, target| {
897                                // Negative counts would be surprising, but until we are 100% certain we wont
898                                // see them, we should report when we do. We may want to bake even more info
899                                // in here in the future.
900                                if must_validate {
901                                    for (val, count) in source.iter() {
902                                        if count.is_positive() {
903                                            continue;
904                                        }
905                                        let val = val.to_row();
906                                        let message =
907                                            "Non-positive accumulation in ReduceMinsMaxes";
908                                        error_logger
909                                            .log(message, &format!("val={val:?}, count={count}"));
910                                        target.push((
911                                            EvalError::Internal(message.into()).into(),
912                                            Diff::ONE,
913                                        ));
914                                        return;
915                                    }
916                                }
917
918                                // We know that `mfp_after` can error if it exists, so try to evaluate it here.
919                                let Some(mfp) = &mfp_after2 else { return };
920                                let temp_storage = RowArena::new();
921                                let datum_iter = key.to_datum_iter();
922                                let mut datums_local = datums2.borrow();
923                                datums_local.extend(datum_iter);
924
925                                let mut source_iters = source
926                                    .iter()
927                                    .map(|(values, _cnt)| *values)
928                                    .collect::<Vec<_>>();
929                                for func in aggr_funcs2.iter() {
930                                    let column_iter = (0..source_iters.len())
931                                        .map(|i| source_iters[i].next().unwrap());
932                                    datums_local.push(func.eval(column_iter, &temp_storage));
933                                }
934                                if let Result::Err(e) =
935                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
936                                {
937                                    target.push((e.into(), Diff::ONE));
938                                }
939                            },
940                        )
941                        .as_collection(|_, v| v.clone())
942                        .leave_region(outer_scope);
943                    if let Some(e) = err_output.take() {
944                        err_output = Some(e.concat(errs));
945                    } else {
946                        err_output = Some(errs);
947                    }
948                }
949                arranged
950                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
951                        "ReduceMinsMaxes",
952                        move |key, source, target| {
953                            let temp_storage = RowArena::new();
954                            let datum_iter = key.to_datum_iter();
955                            let mut datums_local = datums1.borrow();
956                            datums_local.extend(datum_iter);
957                            let key_len = datums_local.len();
958
959                            let mut source_iters = source
960                                .iter()
961                                .map(|(values, _cnt)| *values)
962                                .collect::<Vec<_>>();
963                            for func in aggr_funcs.iter() {
964                                let column_iter = (0..source_iters.len())
965                                    .map(|i| source_iters[i].next().unwrap());
966                                datums_local.push(func.eval(column_iter, &temp_storage));
967                            }
968
969                            if let Some(row) = evaluate_mfp_after(
970                                &mfp_after1,
971                                &mut datums_local,
972                                &temp_storage,
973                                key_len,
974                            ) {
975                                target.push((row, Diff::ONE));
976                            }
977                        },
978                    )
979                    .leave_region(outer_scope)
980            });
981        (
982            arranged_output,
983            err_output.expect("expected to validate in one level of the hierarchy"),
984        )
985    }
986
987    /// Build a bucketed stage fragment that wraps [`Self::build_bucketed_negated_output`], and
988    /// adds validation if `validating` is true. It returns the consolidated inputs concatenated
989    /// with the negation of what's produced by the reduction.
990    /// `validating` indicates whether we want this stage to perform error detection
991    /// for invalid accumulations. Once a stage is clean of such errors, subsequent
992    /// stages can skip validation.
993    fn build_bucketed_stage<'s>(
994        &self,
995        aggr_funcs: &Vec<AggregateFunc>,
996        input: VecCollection<'s, T, (Row, Row), Diff>,
997        validating: bool,
998    ) -> (
999        VecCollection<'s, T, (Row, Row), Diff>,
1000        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
1001    ) {
1002        let (input, negated_output, errs) = if validating {
1003            let (input, reduced) = self
1004                .build_bucketed_negated_output::<
1005                    RowValBuilder<_, _, _>,
1006                    RowValSpine<Result<Row, Row>, _, _>,
1007                >(
1008                    input.clone(),
1009                    aggr_funcs.clone(),
1010                );
1011            let (oks, errs) = reduced
1012                .as_collection(|k, v| (k.to_row(), v.clone()))
1013                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1014                "Checked Invalid Accumulations",
1015                |(hash_key, result)| match result {
1016                    Err(hash_key) => {
1017                        let mut hash_key_iter = hash_key.iter();
1018                        let _hash = hash_key_iter.next();
1019                        let key = SharedRow::pack(hash_key_iter);
1020                        let message = format!(
1021                            "Invalid data in source, saw non-positive accumulation \
1022                                         for key {key:?} in hierarchical mins-maxes aggregate"
1023                        );
1024                        Err(EvalError::Internal(message.into()).into())
1025                    }
1026                    Ok(values) => Ok((hash_key, values)),
1027                },
1028            );
1029            (input, oks, Some(errs))
1030        } else {
1031            let (input, reduced) = self
1032                .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1033                    input,
1034                    aggr_funcs.clone(),
1035                );
1036            // TODO: Here is a good moment where we could apply the next `mod` calculation. Note
1037            // that we need to apply the mod on both input and oks.
1038            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1039            (input, oks, None)
1040        };
1041
1042        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1043        let oks = negated_output.concat(input);
1044        (oks, errs)
1045    }
1046
1047    /// Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical
1048    /// aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction,
1049    /// with all diffs in the reduction's output negated.
1050    fn build_bucketed_negated_output<'s, Bu, Tr>(
1051        &self,
1052        input: VecCollection<'s, T, (Row, Row), Diff>,
1053        aggrs: Vec<AggregateFunc>,
1054    ) -> (
1055        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1056        Arranged<'s, TraceAgent<Tr>>,
1057    )
1058    where
1059        Tr: for<'a> Trace<
1060                Key<'a> = DatumSeq<'a>,
1061                KeyContainer: BatchContainer<Owned = Row>,
1062                ValOwn: Data + MaybeValidatingRow<Row, Row>,
1063                Time = T,
1064                Diff = Diff,
1065            > + 'static,
1066        Bu: Builder<
1067                Time = T,
1068                Input: Container
1069                           + InternalMerge
1070                           + ClearContainer
1071                           + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1072                Output = Tr::Batch,
1073            >,
1074        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1075    {
1076        let error_logger = self.error_logger();
1077        // NOTE(vmarcos): The input operator name below is used in the tuning advice built-in
1078        // view mz_introspection.mz_expected_group_size_advice.
1079        let arranged_input = input
1080            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1081                "Arranged MinsMaxesHierarchical input",
1082            );
1083
1084        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1085            "Reduced Fallibly MinsMaxesHierarchical",
1086            move |key, source, target| {
1087                if let Some(err) = Tr::ValOwn::into_error() {
1088                    // Should negative accumulations reach us, we should loudly complain.
1089                    for (value, count) in source.iter() {
1090                        if count.is_positive() {
1091                            continue;
1092                        }
1093                        error_logger.log(
1094                            "Non-positive accumulation in MinsMaxesHierarchical",
1095                            &format!("key={key:?}, value={value:?}, count={count}"),
1096                        );
1097                        // After complaining, output an error here so that we can eventually
1098                        // report it in an error stream.
1099                        target.push((
1100                            err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1101                            Diff::ONE,
1102                        ));
1103                        return;
1104                    }
1105                }
1106
1107                let mut row_builder = SharedRow::get();
1108                let mut row_packer = row_builder.packer();
1109
1110                let mut source_iters = source
1111                    .iter()
1112                    .map(|(values, _cnt)| *values)
1113                    .collect::<Vec<_>>();
1114                for func in aggrs.iter() {
1115                    let column_iter =
1116                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1117                    row_packer.push(func.eval(column_iter, &RowArena::new()));
1118                }
1119                // We only want to arrange the parts of the input that are not part of the output.
1120                // More specifically, we want to arrange it so that `input.concat(&output.negate())`
1121                // gives us the intended value of this aggregate function. Also we assume that regardless
1122                // of the multiplicity of the final result in the input, we only want to have one copy
1123                // in the output.
1124                target.reserve(source.len().saturating_add(1));
1125                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1126                target.extend(source.iter().map(|(values, cnt)| {
1127                    let mut cnt = *cnt;
1128                    cnt.negate();
1129                    (Tr::ValOwn::ok(values.to_row()), cnt)
1130                }));
1131            },
1132        );
1133        (arranged_input, reduced)
1134    }
1135
1136    /// Build the dataflow to compute and arrange multiple hierarchical aggregations
1137    /// on monotonic inputs.
1138    fn build_monotonic<'s>(
1139        &self,
1140        collection: VecCollection<'s, T, (Row, Row), Diff>,
1141        MonotonicPlan {
1142            aggr_funcs,
1143            must_consolidate,
1144        }: MonotonicPlan,
1145        mfp_after: Option<SafeMfpPlan>,
1146    ) -> (
1147        RowRowArrangement<'s, T>,
1148        VecCollection<'s, T, DataflowErrorSer, Diff>,
1149    ) {
1150        let aggregations = aggr_funcs.len();
1151        // Gather the relevant values into a vec of rows ordered by aggregation_index
1152        let collection = collection
1153            .map(move |(key, row)| {
1154                let mut row_builder = SharedRow::get();
1155                let mut values = Vec::with_capacity(aggregations);
1156                values.extend(
1157                    row.iter()
1158                        .take(aggregations)
1159                        .map(|v| row_builder.pack_using(std::iter::once(v))),
1160                );
1161
1162                (key, values)
1163            })
1164            .consolidate_named_if::<KeyBatcher<_, _, _>>(
1165                must_consolidate,
1166                "Consolidated ReduceMonotonic input",
1167            );
1168
1169        // It should be now possible to ensure that we have a monotonic collection.
1170        let error_logger = self.error_logger();
1171        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1172            error_logger.log(
1173                "Non-monotonic input to ReduceMonotonic",
1174                &format!("data={data:?}, diff={diff}"),
1175            );
1176            let m = "tried to build a monotonic reduction on non-monotonic input".into();
1177            (EvalError::Internal(m).into(), Diff::ONE)
1178        });
1179        // We can place our rows directly into the diff field, and
1180        // only keep the relevant one corresponding to evaluating our
1181        // aggregate, instead of having to do a hierarchical reduction.
1182        let partial = partial.explode_one(move |(key, values)| {
1183            let mut output = Vec::new();
1184            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1185                output.push(monoids::get_monoid(row, func).expect(
1186                    "hierarchical aggregations are expected to have monoid implementations",
1187                ));
1188            }
1189            (key, output)
1190        });
1191
1192        // Allocations for the two closures.
1193        let mut datums1 = DatumVec::new();
1194        let mut datums2 = DatumVec::new();
1195        let mfp_after1 = mfp_after.clone();
1196        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1197
1198        let partial: KeyCollection<_, _, _> = partial.into();
1199        let arranged = partial
1200            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1201                "ArrangeMonotonic [val: empty]",
1202            );
1203        let output = arranged
1204            .clone()
1205            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
1206                move |key, input, output| {
1207                    let temp_storage = RowArena::new();
1208                    let datum_iter = key.to_datum_iter();
1209                    let mut datums_local = datums1.borrow();
1210                    datums_local.extend(datum_iter);
1211                    let key_len = datums_local.len();
1212                    let accum = &input[0].1;
1213                    for monoid in accum.iter() {
1214                        datums_local.extend(monoid.finalize().iter());
1215                    }
1216
1217                    if let Some(row) =
1218                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1219                    {
1220                        output.push((row, Diff::ONE));
1221                    }
1222                }
1223            });
1224
1225        // If `mfp_after` can error, then we need to render a paired reduction
1226        // to scan for these potential errors. Note that we cannot directly use
1227        // `mz_timely_util::reduce::ReduceExt::reduce_pair` here because we only
1228        // conditionally render the second component of the reduction pair.
1229        if let Some(mfp) = mfp_after2 {
1230            let mfp_errs = arranged
1231                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1232                    "ReduceMonotonic Error Check",
1233                    move |key, input, output| {
1234                        let temp_storage = RowArena::new();
1235                        let datum_iter = key.to_datum_iter();
1236                        let mut datums_local = datums2.borrow();
1237                        datums_local.extend(datum_iter);
1238                        let accum = &input[0].1;
1239                        for monoid in accum.iter() {
1240                            datums_local.extend(monoid.finalize().iter());
1241                        }
1242                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1243                        {
1244                            output.push((e.into(), Diff::ONE));
1245                        }
1246                    },
1247                )
1248                .as_collection(|_k, v| v.clone());
1249            (output, validation_errs.concat(mfp_errs))
1250        } else {
1251            (output, validation_errs)
1252        }
1253    }
1254
1255    /// Build the dataflow to compute and arrange multiple accumulable aggregations.
1256    ///
1257    /// The incoming values are moved to the update's "difference" field, at which point
1258    /// they can be accumulated in place. The `count` operator promotes the accumulated
1259    /// values to data, at which point a final map applies operator-specific logic to
1260    /// yield the final aggregate.
1261    fn build_accumulable<'s>(
1262        &self,
1263        collection: VecCollection<'s, T, (Row, Row), Diff>,
1264        AccumulablePlan {
1265            full_aggrs,
1266            simple_aggrs,
1267            distinct_aggrs,
1268        }: AccumulablePlan,
1269        key_arity: usize,
1270        mfp_after: Option<SafeMfpPlan>,
1271    ) -> (
1272        RowRowArrangement<'s, T>,
1273        VecCollection<'s, T, DataflowErrorSer, Diff>,
1274    ) {
1275        let collection_scope = collection.scope();
1276
1277        // we must have called this function with something to reduce
1278        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1279            self.error_logger().soft_panic_or_log(
1280                "Incorrect numbers of aggregates in accummulable reduction rendering",
1281                &format!(
1282                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1283                    full_aggrs.len(),
1284                    simple_aggrs.len(),
1285                    distinct_aggrs.len(),
1286                ),
1287            );
1288        }
1289
1290        // Some of the aggregations may have the `distinct` bit set, which means that they'll
1291        // need to be extracted from `collection` and be subjected to `distinct` with `key`.
1292        // Other aggregations can be directly moved in to the `diff` field.
1293        //
1294        // In each case, the resulting collection should have `data` shaped as `(key, ())`
1295        // and a `diff` that is a vector with length `3 * aggrs.len()`. The three values are
1296        // generally the count, and then two aggregation-specific values. The size could be
1297        // reduced if we want to specialize for the aggregations.
1298
1299        // Instantiate a default vector for diffs with the correct types at each
1300        // position.
1301        let zero_diffs: (Vec<_>, Diff) = (
1302            full_aggrs
1303                .iter()
1304                .map(|f| accumulable_zero(&f.func))
1305                .collect(),
1306            Diff::ZERO,
1307        );
1308
1309        let mut to_aggregate = Vec::new();
1310        if simple_aggrs.len() > 0 {
1311            // First, collect all non-distinct aggregations in one pass.
1312            let collection = collection.clone();
1313            let easy_cases = collection.explode_one({
1314                let zero_diffs = zero_diffs.clone();
1315                move |(key, row)| {
1316                    let mut diffs = zero_diffs.clone();
1317                    // Try to unpack only the datums we need. Unfortunately, since we
1318                    // can't random access into a Row, we have to iterate through one by one.
1319                    // TODO: Even though we don't have random access, we could still avoid unpacking
1320                    // everything that we don't care about, and it might be worth it to extend the
1321                    // Row API to do that.
1322                    let mut row_iter = row.iter().enumerate();
1323                    for (datum_index, aggr) in simple_aggrs.iter() {
1324                        let mut datum = row_iter.next().unwrap();
1325                        while datum_index != &datum.0 {
1326                            datum = row_iter.next().unwrap();
1327                        }
1328                        let datum = datum.1;
1329                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1330                        diffs.1 = Diff::ONE;
1331                    }
1332                    ((key, ()), diffs)
1333                }
1334            });
1335            to_aggregate.push(easy_cases);
1336        }
1337
1338        // Next, collect all aggregations that require distinctness.
1339        for (datum_index, aggr) in distinct_aggrs.into_iter() {
1340            let pairer = Pairer::new(key_arity);
1341            let collection = collection
1342                .clone()
1343                .map(move |(key, row)| {
1344                    let value = row.iter().nth(datum_index).unwrap();
1345                    (pairer.merge(&key, std::iter::once(value)), ())
1346                })
1347                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1348                    "Arranged Accumulable Distinct [val: empty]",
1349                )
1350                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1351                    "Reduced Accumulable Distinct [val: empty]",
1352                    move |_k, _s, t| t.push(((), Diff::ONE)),
1353                )
1354                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1355                .explode_one({
1356                    let zero_diffs = zero_diffs.clone();
1357                    move |(key, row)| {
1358                        let datum = row.iter().next().unwrap();
1359                        let mut diffs = zero_diffs.clone();
1360                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1361                        diffs.1 = Diff::ONE;
1362                        ((key, ()), diffs)
1363                    }
1364                });
1365            to_aggregate.push(collection);
1366        }
1367
1368        // now concatenate, if necessary, multiple aggregations
1369        let collection = if to_aggregate.len() == 1 {
1370            to_aggregate.remove(0)
1371        } else {
1372            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
1373        };
1374
1375        // Allocations for the two closures.
1376        let mut datums1 = DatumVec::new();
1377        let mut datums2 = DatumVec::new();
1378        let mfp_after1 = mfp_after.clone();
1379        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1380        let full_aggrs2 = full_aggrs.clone();
1381
1382        let error_logger = self.error_logger();
1383        let err_full_aggrs = full_aggrs.clone();
1384        let arranged = collection
1385            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, (Vec<Accum>, Diff)>>(
1386                "ArrangeAccumulable [val: empty]",
1387            );
1388        let arranged_output = arranged
1389            .clone()
1390            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
1391                move |key, input, output| {
1392                    let (ref accums, total) = input[0].1;
1393
1394                    let temp_storage = RowArena::new();
1395                    let datum_iter = key.to_datum_iter();
1396                    let mut datums_local = datums1.borrow();
1397                    datums_local.extend(datum_iter);
1398                    let key_len = datums_local.len();
1399                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1400                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1401                    }
1402
1403                    if let Some(row) =
1404                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1405                    {
1406                        output.push((row, Diff::ONE));
1407                    }
1408                }
1409            });
1410        let arranged_errs = arranged
1411            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1412                "AccumulableErrorCheck",
1413                move |key, input, output| {
1414                    let (ref accums, total) = input[0].1;
1415                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1416                        // We first test here if inputs without net-positive records are present,
1417                        // producing an error to the logs and to the query output if that is the case.
1418                        if total == Diff::ZERO && !accum.is_zero() {
1419                            error_logger.log(
1420                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
1421                                &format!("aggr={aggr:?}, accum={accum:?}"),
1422                            );
1423                            let key = key.to_row();
1424                            let message = format!(
1425                                "Invalid data in source, saw net-zero records for key {key} \
1426                                 with non-zero accumulation in accumulable aggregate"
1427                            );
1428                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1429                        }
1430                        match (&aggr.func, &accum) {
1431                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1432                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1433                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1434                                if accum.is_negative() {
1435                                    error_logger.log(
1436                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
1437                                    &format!("aggr={aggr:?}, accum={accum:?}"),
1438                                );
1439                                    let key = key.to_row();
1440                                    let message = format!(
1441                                        "Invalid data in source, saw negative accumulation with \
1442                                         unsigned type for key {key}"
1443                                    );
1444                                    let err = EvalError::Internal(message.into());
1445                                    output.push((err.into(), Diff::ONE));
1446                                }
1447                            }
1448                            _ => (), // no more errors to check for at this point!
1449                        }
1450                    }
1451
1452                    // If `mfp_after` can error, then evaluate it here.
1453                    let Some(mfp) = &mfp_after2 else { return };
1454                    let temp_storage = RowArena::new();
1455                    let datum_iter = key.to_datum_iter();
1456                    let mut datums_local = datums2.borrow();
1457                    datums_local.extend(datum_iter);
1458                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1459                        datums_local.push(finalize_accum(&aggr.func, accum, total));
1460                    }
1461
1462                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1463                        output.push((e.into(), Diff::ONE));
1464                    }
1465                },
1466            );
1467        (
1468            arranged_output,
1469            arranged_errs.as_collection(|_key, error| error.clone()),
1470        )
1471    }
1472}
1473
1474/// Evaluates the fused MFP, if one exists, on a reconstructed `DatumVecBorrow`
1475/// containing key and aggregate values, then returns a result `Row` or `None`
1476/// if the MFP filters the result out.
1477fn evaluate_mfp_after<'a, 'b>(
1478    mfp_after: &'a Option<SafeMfpPlan>,
1479    datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1480    temp_storage: &'a RowArena,
1481    key_len: usize,
1482) -> Option<Row> {
1483    let mut row_builder = SharedRow::get();
1484    // Apply MFP if it exists and pack a Row of
1485    // aggregate values from `datums_local`.
1486    if let Some(mfp) = mfp_after {
1487        // It must ignore errors here, but they are scanned
1488        // for elsewhere if the MFP can error.
1489        if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1490            // The `mfp_after` must preserve the key columns,
1491            // so we can skip them to form aggregation results.
1492            Some(row_builder.pack_using(iter.skip(key_len)))
1493        } else {
1494            None
1495        }
1496    } else {
1497        Some(row_builder.pack_using(&datums_local[key_len..]))
1498    }
1499}
1500
1501fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1502    match aggr_func {
1503        AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1504            trues: Diff::ZERO,
1505            falses: Diff::ZERO,
1506        },
1507        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1508            accum: AccumCount::ZERO,
1509            pos_infs: Diff::ZERO,
1510            neg_infs: Diff::ZERO,
1511            nans: Diff::ZERO,
1512            non_nulls: Diff::ZERO,
1513        },
1514        AggregateFunc::SumNumeric => Accum::Numeric {
1515            accum: OrderedDecimal(NumericAgg::zero()),
1516            pos_infs: Diff::ZERO,
1517            neg_infs: Diff::ZERO,
1518            nans: Diff::ZERO,
1519            non_nulls: Diff::ZERO,
1520        },
1521        _ => Accum::SimpleNumber {
1522            accum: AccumCount::ZERO,
1523            non_nulls: Diff::ZERO,
1524        },
1525    }
1526}
1527
/// Fixed-point scale factor (2^24) used to map finite float values onto an
/// integer domain, so that their accumulation is associative and commutative.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1529
/// Converts a single input datum into its accumulator representation for the
/// given accumulable aggregate function.
///
/// NULL datums contribute all-zero counters (except for `count`'s total), which
/// lets `finalize_accum` distinguish "only NULLs seen" from real values.
/// Non-finite floats and numerics are tracked in dedicated counters so that
/// accumulation remains associative and commutative.
///
/// Panics when the datum's type does not match the aggregate function.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // Count only tracks non-NULL-ness; the accumulation field is unused.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, // unused for AggregateFunc::Count
            non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        // Booleans count trues and falses separately; NULL counts as neither.
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Both float widths accumulate through f64.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Map the floating point value onto a fixed precision domain
            // All special values should map to zero, since they are tracked separately
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                // This operation will truncate to i128::MAX if out of range.
                // TODO(benesch): rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Infinities and NaN are tracked in side counters; the decimal
                // accumulator itself holds only finite values.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Take a narrow decimal (datum) into a wide decimal
                    // (aggregator).
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        _ => {
            // Other accumulations need to disentangle the accumulable
            // value from its NULL-ness, which is not quite as easily
            // accumulated.
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1670
/// Converts a final accumulator into the output datum for the given aggregate
/// function.
///
/// `total` is the count of all (including NULL) input records for the group;
/// it is used to detect the "only NULLs accumulated" case and to resolve
/// `any`/`all` in the presence of NULL inputs.
///
/// Panics when the accumulator variant does not match the aggregate function.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // The finished value depends on the aggregation function in a variety of ways.
    // For all aggregates but count, if only null values were
    // accumulated, then the output is null.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // If any false, else if all true, else must be no false and some nulls.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // If any true, else if all false, else must be no true and some nulls.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // If any non-nulls, just report the aggregate.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // This conversion is safe, as long as we have less than 2^32
                // summands.
                // TODO(benesch): are we guaranteed to have less than 2^32 summands?
                // If so, rewrite to avoid `as`.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // Our semantics of overflow are not clearly articulated wrt.
                    // unsigned vs. signed types (database-issues#5172). We adopt an
                    // unsigned wrapping behavior to match what we do above for
                    // signed types.
                    // TODO(vmarcos): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // Note that we return a value here, but an error in the other
                    // operator of the reduce_pair. Therefore, we expect that this
                    // value will never be exposed as an output.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied in `datum_to_accumulator`.
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied in `datum_to_accumulator`.
                    // TODO(benesch): remove potentially dangerous usage of `as`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                // Take a wide decimal (aggregator) into a
                // narrow decimal (datum). If this operation
                // overflows the datum, this new value will be
                // +/- infinity. However, the aggregator tracks
                // the amount of overflow, making it invertible.
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    // NaNs are NaNs and cases where we've seen a
                    // mixture of positive and negative infinities.
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1836
/// The type for accumulator counting. Set to [`Overflowing<i128>`](mz_ore::Overflowing).
type AccumCount = mz_ore::Overflowing<i128>;
1839
1840/// Accumulates values for the various types of accumulable aggregations.
1841///
1842/// We assume that there are not more than 2^32 elements for the aggregation.
1843/// Thus we can perform a summation over i32 in an i64 accumulator
1844/// and not worry about exceeding its bounds.
1845///
1846/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
1847/// point representation has less precision than a double. It is entirely possible
1848/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
1849/// to preserve group guarantees.
1850#[derive(
1851    Debug,
1852    Clone,
1853    Copy,
1854    PartialEq,
1855    Eq,
1856    PartialOrd,
1857    Ord,
1858    Serialize,
1859    Deserialize
1860)]
1861enum Accum {
1862    /// Accumulates boolean values.
1863    Bool {
1864        /// The number of `true` values observed.
1865        trues: Diff,
1866        /// The number of `false` values observed.
1867        falses: Diff,
1868    },
1869    /// Accumulates simple numeric values.
1870    SimpleNumber {
1871        /// The accumulation of all non-NULL values observed.
1872        accum: AccumCount,
1873        /// The number of non-NULL values observed.
1874        non_nulls: Diff,
1875    },
1876    /// Accumulates float values.
1877    Float {
1878        /// Accumulates non-special float values, mapped to a fixed precision i128 domain to
1879        /// preserve associativity and commutativity
1880        accum: AccumCount,
1881        /// Counts +inf
1882        pos_infs: Diff,
1883        /// Counts -inf
1884        neg_infs: Diff,
1885        /// Counts NaNs
1886        nans: Diff,
1887        /// Counts non-NULL values
1888        non_nulls: Diff,
1889    },
1890    /// Accumulates arbitrary precision decimals.
1891    Numeric {
1892        /// Accumulates non-special values
1893        accum: OrderedDecimal<NumericAgg>,
1894        /// Counts +inf
1895        pos_infs: Diff,
1896        /// Counts -inf
1897        neg_infs: Diff,
1898        /// Counts NaNs
1899        nans: Diff,
1900        /// Counts non-NULL values
1901        non_nulls: Diff,
1902    },
1903}
1904
1905impl IsZero for Accum {
1906    fn is_zero(&self) -> bool {
1907        match self {
1908            Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1909            Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1910            Accum::Float {
1911                accum,
1912                pos_infs,
1913                neg_infs,
1914                nans,
1915                non_nulls,
1916            } => {
1917                accum.is_zero()
1918                    && pos_infs.is_zero()
1919                    && neg_infs.is_zero()
1920                    && nans.is_zero()
1921                    && non_nulls.is_zero()
1922            }
1923            Accum::Numeric {
1924                accum,
1925                pos_infs,
1926                neg_infs,
1927                nans,
1928                non_nulls,
1929            } => {
1930                accum.0.is_zero()
1931                    && pos_infs.is_zero()
1932                    && neg_infs.is_zero()
1933                    && nans.is_zero()
1934                    && non_nulls.is_zero()
1935            }
1936        }
1937    }
1938}
1939
impl Semigroup for Accum {
    // Combines two accumulators of the same variant field-wise. Combining
    // mismatched variants is a logic error (an aggregate's accumulator never
    // changes variant) and panics via `unreachable!`.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Fall back to wrapping arithmetic on overflow so that the
                // accumulation remains invertible, at the cost of correctness.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // `rounded` signals we have exceeded the aggregator's max
                // precision, which means we've lost commutativity and
                // associativity; nothing to be done here, so panic. For more
                // context, see the DEC_Rounded definition at
                // http://speleotrove.com/decimal/dncont.html
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce to reclaim unused decimal precision. Note that this
                // reduction must happen somewhere to make the following
                // invertible:
                // ```
                // CREATE TABLE a (a numeric);
                // CREATE MATERIALIZED VIEW t as SELECT sum(a) FROM a;
                // INSERT INTO a VALUES ('9e39'), ('9e-39');
                // ```
                // This will now return infinity. However, we can retract the
                // value that blew up its precision:
                // ```
                // INSERT INTO a VALUES ('-9e-39');
                // ```
                // This leaves `t`'s aggregator with a value of 9e39. However,
                // without doing a reduction, `libdecnum` will store the value
                // as 9e39+0e-39, which still exceeds the narrower context's
                // precision. By doing the reduction, we can "reclaim" the 39
                // digits of precision.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2042
2043impl Multiply<Diff> for Accum {
2044    type Output = Accum;
2045
2046    fn multiply(self, factor: &Diff) -> Accum {
2047        let factor = *factor;
2048        match self {
2049            Accum::Bool { trues, falses } => Accum::Bool {
2050                trues: trues * factor,
2051                falses: falses * factor,
2052            },
2053            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2054                accum: accum * AccumCount::from(factor),
2055                non_nulls: non_nulls * factor,
2056            },
2057            Accum::Float {
2058                accum,
2059                pos_infs,
2060                neg_infs,
2061                nans,
2062                non_nulls,
2063            } => Accum::Float {
2064                accum: accum
2065                    .checked_mul(AccumCount::from(factor))
2066                    .unwrap_or_else(|| {
2067                        warn!("Float accumulator overflow. Incorrect results possible");
2068                        accum.wrapping_mul(AccumCount::from(factor))
2069                    }),
2070                pos_infs: pos_infs * factor,
2071                neg_infs: neg_infs * factor,
2072                nans: nans * factor,
2073                non_nulls: non_nulls * factor,
2074            },
2075            Accum::Numeric {
2076                accum,
2077                pos_infs,
2078                neg_infs,
2079                nans,
2080                non_nulls,
2081            } => {
2082                let mut cx = numeric::cx_agg();
2083                let mut f = NumericAgg::from(factor.into_inner());
2084                // Unlike `plus_equals`, not necessary to reduce after this operation because `f` will
2085                // always be an integer, i.e. we are never increasing the
2086                // values' scale.
2087                cx.mul(&mut f, &accum.0);
2088                // `rounded` signals we have exceeded the aggregator's max
2089                // precision, which means we've lost commutativity and
2090                // associativity; nothing to be done here, so panic. For more
2091                // context, see the DEC_Rounded definition at
2092                // http://speleotrove.com/decimal/dncont.html
2093                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2094                Accum::Numeric {
2095                    accum: OrderedDecimal(f),
2096                    pos_infs: pos_infs * factor,
2097                    neg_infs: neg_infs * factor,
2098                    nans: nans * factor,
2099                    non_nulls: non_nulls * factor,
2100                }
2101            }
2102        }
2103    }
2104}
2105
// Store `Accum` values in columnar regions via a plain byte copy.
// `CopyRegion<T>` requires `T: Copy`, so this relies on `Accum` (and all of
// its variants' fields) being `Copy` — i.e., holding no heap-owned data.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2109
/// Monoids for in-place compaction of monotonic streams.
mod monoids {

    // We can improve the performance of some aggregations through the use of algebra.
    // In particular, we can move some of the aggregations in to the `diff` field of
    // updates, by changing `diff` from integers to a different algebraic structure.
    //
    // The one we use is called a "semigroup", and it means that the structure has a
    // symmetric addition operator. The trait we use also allows the semigroup elements
    // to present as "zero", meaning they always act as the identity under +. Here,
    // `Datum::Null` acts as the identity under +, _but_ we don't want to make this
    // known to DD by the `is_zero` method, see comment there. So, from the point of view
    // of DD, this Semigroup should _not_ have a zero.
    //
    // WARNING: `Datum::Null` should continue to act as the identity of our + (even if we
    // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
    // assumes this.

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a single-datum row.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns a reference to the row carried by either variant.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    // Hand-written (rather than derived) so that `clone_from` can reuse the
    // destination row's existing allocation instead of always building a
    // fresh `Row`.
    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out (leaving a default in its place) so it can be
            // reused regardless of which variant `self` currently is.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            // `Row::clone_from` copies the contents into the reused buffer.
            row.clone_from(source_row);

            // Re-wrap the reused row under `source`'s variant.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in ReductionMonoid is idempotent, and
            // its users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    // The semigroup "+" keeps the extremal datum: the smaller for `Min`, the
    // larger for `Max`. `Datum::Null` acts as the identity on both sides (see
    // the module-level comment above).
    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a small element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Datum::Null is the identity, not a large element.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                // Mixing `Min` with `Max` is a planning bug; report it without
                // crashing release builds, leaving `self` unchanged.
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // It totally looks like we could return true here for `Datum::Null`, but don't do this!
            // DD uses true results of this method to make stuff disappear. This makes sense when
            // diffs mean really just diffs, but for `ReductionMonoid` diffs hold reduction results.
            // We don't want funny stuff, like disappearing, happening to reduction results even
            // when they are null. (This would confuse, e.g., `ReduceCollation` for null inputs.)
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
    /// in the same backing region. Alternatively, it could store it in two regions, but we select
    /// the former for simplicity reasons.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            // Both variants copy their row into the single shared inner row
            // region, forwarding the caller's `Region::copy` safety contract.
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // `finalize` projects out the row from either variant, so one
            // reservation pass over the inner region suffices.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Get the correct monoid implementation for a given aggregation function. Note that
    /// all hierarchical aggregation functions need to supply a monoid implementation.
    ///
    /// Returns `None` for aggregation functions that are not hierarchical
    /// (accumulable and basic aggregations, window functions, etc.).
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2364
// Helpers that adapt the reduction machinery in this file to one-by-one
// (datum-at-a-time) aggregation, as used by window aggregate evaluation.
mod window_agg_helpers {
    use crate::render::reduce::*;

    /// TODO: It would be better for performance to do the branching that is in the methods of this
    /// enum at the place where we are calling `eval_fast_window_agg`. Then we wouldn't need an enum
    /// here, and would parameterize `eval_fast_window_agg` with one of the implementations
    /// directly.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        // Dispatch on the aggregation's reduction type to pick the matching
        // one-by-one implementation.
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        // Feed one input datum into whichever implementation was selected.
        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            // Note that the `reverse` parameter is currently forwarded only for Basic aggregations.
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one aggregation for accumulable aggregations, backed by [`Accum`].
    pub struct AccumulableOneByOneAggr {
        // The aggregation function being evaluated.
        aggr_func: AggregateFunc,
        // Running accumulator state.
        accum: Accum,
        // Count of datums given so far; passed to `finalize_accum` alongside
        // the accumulator.
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        // Fold one datum into the accumulator and bump the input count.
        // NOTE(review): `d.clone()` here is presumably a cheap copy (`Datum`
        // looks `Copy`-like) — confirm against `mz_repr`.
        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        // Finalize the accumulator into a datum allocated on `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one aggregation for hierarchical (min/max-style) aggregations,
    /// backed by [`ReductionMonoid`].
    pub struct HierarchicalOneByOneAggr {
        // The aggregation function being evaluated.
        aggr_func: AggregateFunc,
        // Warning: We are assuming that `Datum::Null` acts as the identity for `ReductionMonoid`'s
        // `plus_equals`. (But _not_ relying here on `ReductionMonoid::is_zero`.)
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            // Start from a `Datum::Null` row, which acts as the identity of
            // the monoid's `plus_equals` (see the warning on the field above).
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        // Wrap the datum in a fresh monoid and merge it into the running state.
        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        // Copy the monoid's current row into a datum on `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}