1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::Data;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
26use differential_dataflow::trace::{Builder, Trace};
27use differential_dataflow::{Collection, Diff as _};
28use itertools::Itertools;
29use mz_compute_types::plan::reduce::{
30 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
31 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
32};
33use mz_expr::{
34 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
35};
36use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
37use mz_repr::fixed_length::ToDatumIter;
38use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
39use mz_storage_types::errors::DataflowError;
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use timely::dataflow::Scope;
45use timely::progress::timestamp::Refines;
46use tracing::warn;
47
48use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
49use crate::extensions::reduce::{MzReduce, ReduceExt};
50use crate::render::context::{CollectionBundle, Context};
51use crate::render::errors::MaybeValidatingRow;
52use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
53use crate::render::{ArrangementFlavor, Pairer};
54use crate::row_spine::{
55 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
56};
57use crate::typedefs::{
58 ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
59 RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
60};
61
62impl<G, T> Context<G, T>
63where
64 G: Scope,
65 G::Timestamp: MzTimestamp + Refines<T>,
66 T: MzTimestamp,
67{
68 pub fn render_reduce(
71 &self,
72 input_key: Option<Vec<MirScalarExpr>>,
73 input: CollectionBundle<G, T>,
74 key_val_plan: KeyValPlan,
75 reduce_plan: ReducePlan,
76 mfp_after: Option<MapFilterProject>,
77 ) -> CollectionBundle<G, T> {
78 let mfp_after = mfp_after.map(|m| {
80 m.into_plan()
81 .expect("MFP planning must succeed")
82 .into_nontemporal()
83 .expect("Fused Reduce MFPs do not have temporal predicates")
84 });
85
86 input.scope().region_named("Reduce", |inner| {
87 let KeyValPlan {
88 mut key_plan,
89 mut val_plan,
90 } = key_val_plan;
91 let key_arity = key_plan.projection.len();
92 let mut datums = DatumVec::new();
93
94 let mut demand = Vec::new();
96 demand.extend(key_plan.demand());
97 demand.extend(val_plan.demand());
98 demand.sort();
99 demand.dedup();
100
101 let mut demand_map = BTreeMap::new();
103 for column in demand.iter() {
104 demand_map.insert(*column, demand_map.len());
105 }
106 let demand_map_len = demand_map.len();
107 key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
108 val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
109 let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
110 let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
111
112 let (key_val_input, err_input) = input.enter_region(inner).flat_map(
113 input_key.map(|k| (k, None)),
114 max_demand,
115 move |row_datums, time, diff| {
116 let mut row_builder = SharedRow::get();
117 let temp_storage = RowArena::new();
118
119 let mut row_iter = row_datums.drain(..);
120 let mut datums_local = datums.borrow();
121 for skip in skips.iter() {
123 datums_local.push(row_iter.nth(*skip).unwrap());
124 }
125
126 let key =
128 key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
129 let key = match key {
130 Err(e) => {
131 return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
132 }
133 Ok(Some(key)) => key.clone(),
134 Ok(None) => panic!("Row expected as no predicate was used"),
135 };
136
137 datums_local.truncate(skips.len());
140 let val =
141 val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
142 let val = match val {
143 Err(e) => {
144 return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
145 }
146 Ok(Some(val)) => val.clone(),
147 Ok(None) => panic!("Row expected as no predicate was used"),
148 };
149
150 Some((Ok((key, val)), time.clone(), diff.clone()))
151 },
152 );
153
154 type CB<T> = ConsolidatingContainerBuilder<T>;
156 let (ok, mut err) = key_val_input
157 .as_collection()
158 .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);
159
160 err = err.concat(&err_input);
161
162 self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
164 .leave_region()
165 })
166 }
167
168 fn render_reduce_plan<S>(
174 &self,
175 plan: ReducePlan,
176 collection: Collection<S, (Row, Row), Diff>,
177 err_input: Collection<S, DataflowError, Diff>,
178 key_arity: usize,
179 mfp_after: Option<SafeMfpPlan>,
180 ) -> CollectionBundle<S, T>
181 where
182 S: Scope<Timestamp = G::Timestamp>,
183 {
184 let mut errors = Default::default();
185 let arrangement =
186 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188 CollectionBundle::from_columns(
189 0..key_arity,
190 ArrangementFlavor::Local(
191 arrangement,
192 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193 ),
194 )
195 }
196
197 fn render_reduce_plan_inner<S>(
198 &self,
199 plan: ReducePlan,
200 collection: Collection<S, (Row, Row), Diff>,
201 errors: &mut Vec<Collection<S, DataflowError, Diff>>,
202 key_arity: usize,
203 mfp_after: Option<SafeMfpPlan>,
204 ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
205 where
206 S: Scope<Timestamp = G::Timestamp>,
207 {
208 let arrangement = match plan {
211 ReducePlan::Distinct => {
214 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
215 errors.push(errs);
216 arranged_output
217 }
218 ReducePlan::Accumulable(expr) => {
219 let (arranged_output, errs) =
220 self.build_accumulable(collection, expr, key_arity, mfp_after);
221 errors.push(errs);
222 arranged_output
223 }
224 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
225 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
226 errors.push(errs);
227 output
228 }
229 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
230 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
231 errors.push(errs);
232 output
233 }
234 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
235 index,
236 expr,
237 fused_unnest_list,
238 })) => {
239 let validating = !fused_unnest_list;
243 let (output, errs) = self.build_basic_aggregate(
244 collection,
245 index,
246 &expr,
247 validating,
248 key_arity,
249 mfp_after,
250 fused_unnest_list,
251 );
252 if validating {
253 errors.push(errs.expect("validation should have occurred as it was requested"));
254 }
255 output
256 }
257 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
258 let (output, errs) =
259 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
260 errors.push(errs);
261 output
262 }
263 ReducePlan::Collation(expr) => {
266 let mut to_collate = vec![];
268
269 for plan in [
270 expr.hierarchical.map(ReducePlan::Hierarchical),
271 expr.accumulable.map(ReducePlan::Accumulable),
272 expr.basic.map(ReducePlan::Basic),
273 ]
274 .into_iter()
275 .flat_map(std::convert::identity)
276 {
277 let r#type = ReductionType::try_from(&plan)
278 .expect("only representable reduction types were used above");
279
280 let arrangement = self.render_reduce_plan_inner(
281 plan,
282 collection.clone(),
283 errors,
284 key_arity,
285 None,
286 );
287 to_collate.push((r#type, arrangement));
288 }
289
290 let (oks, errs) = self.build_collation(
292 to_collate,
293 expr.aggregate_types,
294 &mut collection.scope(),
295 mfp_after,
296 );
297 errors.push(errs);
298 oks
299 }
300 };
301 arrangement
302 }
303
304 fn build_collation<S>(
312 &self,
313 arrangements: Vec<(ReductionType, RowRowArrangement<S>)>,
314 aggregate_types: Vec<ReductionType>,
315 scope: &mut S,
316 mfp_after: Option<SafeMfpPlan>,
317 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
318 where
319 S: Scope<Timestamp = G::Timestamp>,
320 {
321 let error_logger = self.error_logger();
322
323 if arrangements.len() <= 1 {
325 error_logger.soft_panic_or_log(
326 "Incorrect number of arrangements in reduce collation",
327 &format!("len={}", arrangements.len()),
328 );
329 }
330
331 let mut to_concat = vec![];
332
333 for (reduction_type, arrangement) in arrangements.into_iter() {
335 let collection = arrangement
336 .as_collection(move |key, val| (key.to_row(), (reduction_type, val.to_row())));
337 to_concat.push(collection);
338 }
339
340 let mut distinct_aggregate_types = aggregate_types.clone();
344 distinct_aggregate_types.sort_unstable();
345 distinct_aggregate_types.dedup();
346 let n_distinct_aggregate_types = distinct_aggregate_types.len();
347
348 let mut datums1 = DatumVec::new();
350 let mut datums2 = DatumVec::new();
351 let mfp_after1 = mfp_after.clone();
352 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
353
354 let aggregate_types_err = aggregate_types.clone();
355 let (oks, errs) = differential_dataflow::collection::concatenate(scope, to_concat)
356 .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_,_,_>, RowValSpine<_, _, _>>("Arrange ReduceCollation")
357 .reduce_pair::<_, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
358 "ReduceCollation",
359 "ReduceCollation Errors",
360 {
361 move |key, input, output| {
362 let mut accumulable = DatumList::empty().iter();
371 let mut hierarchical = DatumList::empty().iter();
372 let mut basic = DatumList::empty().iter();
373
374 if input.len() != n_distinct_aggregate_types {
378 return;
379 }
380
381 for (item, _) in input.iter() {
382 let reduction_type = &item.0;
383 let row = &item.1;
384 match reduction_type {
385 ReductionType::Accumulable => accumulable = row.iter(),
386 ReductionType::Hierarchical => hierarchical = row.iter(),
387 ReductionType::Basic => basic = row.iter(),
388 }
389 }
390
391 let temp_storage = RowArena::new();
392 let datum_iter = key.to_datum_iter();
393 let mut datums_local = datums1.borrow();
394 datums_local.extend(datum_iter);
395 let key_len = datums_local.len();
396
397 for typ in aggregate_types.iter() {
399 let datum = match typ {
400 ReductionType::Accumulable => accumulable.next(),
401 ReductionType::Hierarchical => hierarchical.next(),
402 ReductionType::Basic => basic.next(),
403 };
404 let Some(datum) = datum else { return };
405 datums_local.push(datum);
406 }
407
408 if (accumulable.next(), hierarchical.next(), basic.next())
416 == (None, None, None)
417 {
418 if let Some(row) = evaluate_mfp_after(
419 &mfp_after1,
420 &mut datums_local,
421 &temp_storage,
422 key_len,
423 ) {
424 output.push((row, Diff::ONE));
425 }
426 }
427 }
428 },
429 move |key, input, output| {
430 if input.len() != n_distinct_aggregate_types {
431 let message = "Mismatched aggregates for key in ReduceCollation";
435 error_logger.log(
436 message,
437 &format!(
438 "key={key:?}, n_aggregates_requested={requested}, \
439 n_distinct_aggregate_types={n_distinct_aggregate_types}",
440 requested = input.len(),
441 ),
442 );
443 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
444 return;
445 }
446
447 let mut accumulable = DatumList::empty().iter();
448 let mut hierarchical = DatumList::empty().iter();
449 let mut basic = DatumList::empty().iter();
450 for (item, _) in input.iter() {
451 let reduction_type = &item.0;
452 let row = &item.1;
453 match reduction_type {
454 ReductionType::Accumulable => accumulable = row.iter(),
455 ReductionType::Hierarchical => hierarchical = row.iter(),
456 ReductionType::Basic => basic = row.iter(),
457 }
458 }
459
460 let temp_storage = RowArena::new();
461 let datum_iter = key.to_datum_iter();
462 let mut datums_local = datums2.borrow();
463 datums_local.extend(datum_iter);
464
465 for typ in aggregate_types_err.iter() {
466 let datum = match typ {
467 ReductionType::Accumulable => accumulable.next(),
468 ReductionType::Hierarchical => hierarchical.next(),
469 ReductionType::Basic => basic.next(),
470 };
471 if let Some(datum) = datum {
472 datums_local.push(datum);
473 } else {
474 let message = "Missing value for key in ReduceCollation";
477 error_logger.log(message, &format!("typ={typ:?}, key={key:?}"));
478 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
479 return;
480 }
481 }
482
483 if (accumulable.next(), hierarchical.next(), basic.next()) != (None, None, None)
486 {
487 let message = "Rows too large for key in ReduceCollation";
488 error_logger.log(message, &format!("key={key:?}"));
489 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
490 }
491
492 let Some(mfp) = &mfp_after2 else { return };
495 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
496 output.push((e.into(), Diff::ONE));
497 }
498 },
499 );
500 (oks, errs.as_collection(|_, v| v.clone()))
501 }
502
503 fn build_distinct<S>(
505 &self,
506 collection: Collection<S, (Row, Row), Diff>,
507 mfp_after: Option<SafeMfpPlan>,
508 ) -> (
509 Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
510 Collection<S, DataflowError, Diff>,
511 )
512 where
513 S: Scope<Timestamp = G::Timestamp>,
514 {
515 let error_logger = self.error_logger();
516
517 let mut datums1 = DatumVec::new();
519 let mut datums2 = DatumVec::new();
520 let mfp_after1 = mfp_after.clone();
521 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
522
523 let (output, errors) = collection
524 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _,>, RowRowSpine<_, _>>("Arranged DistinctBy")
525 .reduce_pair::<_, RowRowBuilder<_, _,>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
526 "DistinctBy",
527 "DistinctByErrorCheck",
528 move |key, _input, output| {
529 let temp_storage = RowArena::new();
530 let mut datums_local = datums1.borrow();
531 datums_local.extend(key.to_datum_iter());
532
533 if mfp_after1
537 .as_ref()
538 .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
539 .unwrap_or(Ok(true))
540 == Ok(true)
541 {
542 output.push((Row::default(), Diff::ONE));
546 }
547 },
548 move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
549 for (_, count) in input.iter() {
550 if count.is_positive() {
551 continue;
552 }
553 let message = "Non-positive multiplicity in DistinctBy";
554 error_logger.log(message, &format!("row={key:?}, count={count}"));
555 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
556 return;
557 }
558 let Some(mfp) = &mfp_after2 else { return };
560 let temp_storage = RowArena::new();
561 let datum_iter = key.to_datum_iter();
562 let mut datums_local = datums2.borrow();
563 datums_local.extend(datum_iter);
564
565 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
566 output.push((e.into(), Diff::ONE));
567 }
568 },
569 );
570 (output, errors.as_collection(|_k, v| v.clone()))
571 }
572
573 fn build_basic_aggregates<S>(
581 &self,
582 input: Collection<S, (Row, Row), Diff>,
583 aggrs: Vec<(usize, AggregateExpr)>,
584 key_arity: usize,
585 mfp_after: Option<SafeMfpPlan>,
586 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
587 where
588 S: Scope<Timestamp = G::Timestamp>,
589 {
590 if aggrs.len() <= 1 {
593 self.error_logger().soft_panic_or_log(
594 "Too few aggregations when building basic aggregates",
595 &format!("len={}", aggrs.len()),
596 )
597 }
598 let mut err_output = None;
599 let mut to_collect = Vec::new();
600 for (index, aggr) in aggrs {
601 let (result, errs) = self.build_basic_aggregate(
602 input.clone(),
603 index,
604 &aggr,
605 err_output.is_none(),
606 key_arity,
607 None,
608 false,
609 );
610 if errs.is_some() {
611 err_output = errs
612 }
613 to_collect
614 .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
615 }
616
617 let mut datums1 = DatumVec::new();
619 let mut datums2 = DatumVec::new();
620 let mfp_after1 = mfp_after.clone();
621 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
622
623 let arranged =
624 differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
625 .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
626 "Arranged ReduceFuseBasic input",
627 );
628
629 let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
630 "ReduceFuseBasic",
631 {
632 move |key, input, output| {
633 let temp_storage = RowArena::new();
634 let datum_iter = key.to_datum_iter();
635 let mut datums_local = datums1.borrow();
636 datums_local.extend(datum_iter);
637 let key_len = datums_local.len();
638
639 for ((_, row), _) in input.iter() {
640 datums_local.push(row.unpack_first());
641 }
642
643 if let Some(row) =
644 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
645 {
646 output.push((row, Diff::ONE));
647 }
648 }
649 },
650 );
651 let validation_errs = err_output.expect("expected to validate in at least one aggregate");
656 if let Some(mfp) = mfp_after2 {
657 let mfp_errs = arranged
658 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
659 "ReduceFuseBasic Error Check",
660 move |key, input, output| {
661 let temp_storage = RowArena::new();
664 let datum_iter = key.to_datum_iter();
665 let mut datums_local = datums2.borrow();
666 datums_local.extend(datum_iter);
667
668 for ((_, row), _) in input.iter() {
669 datums_local.push(row.unpack_first());
670 }
671
672 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
673 output.push((e.into(), Diff::ONE));
674 }
675 },
676 )
677 .as_collection(|_, v| v.clone());
678 (output, validation_errs.concat(&mfp_errs))
679 } else {
680 (output, validation_errs)
681 }
682 }
683
684 fn build_basic_aggregate<S>(
688 &self,
689 input: Collection<S, (Row, Row), Diff>,
690 index: usize,
691 aggr: &AggregateExpr,
692 validating: bool,
693 key_arity: usize,
694 mfp_after: Option<SafeMfpPlan>,
695 fused_unnest_list: bool,
696 ) -> (
697 RowRowArrangement<S>,
698 Option<Collection<S, DataflowError, Diff>>,
699 )
700 where
701 S: Scope<Timestamp = G::Timestamp>,
702 {
703 let AggregateExpr {
704 func,
705 expr: _,
706 distinct,
707 } = aggr.clone();
708
709 let mut partial = input.map(move |(key, row)| {
711 let mut row_builder = SharedRow::get();
712 let value = row.iter().nth(index).unwrap();
713 row_builder.packer().push(value);
714 (key, row_builder.clone())
715 });
716
717 let mut err_output = None;
718
719 if distinct {
721 let pairer = Pairer::new(key_arity);
723 let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
724 if validating {
725 let (oks, errs) = self
726 .build_reduce_inaccumulable_distinct::<_,RowValBuilder<Result<(), String>, _,_>, RowValSpine<Result<(), String>, _, _>>(keyed, None)
727 .as_collection(|k, v| (k.to_row(), v.as_ref().map(|&()| ()).map_err(|m| m.as_str().into())))
728 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>("Demux Errors", move |(key_val, result)| match result {
729 Ok(()) => Ok(pairer.split(&key_val)),
730 Err(m) => Err(EvalError::Internal(m).into()),
731 });
732 err_output = Some(errs);
733 partial = oks;
734 } else {
735 partial = self
736 .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
737 keyed,
738 Some(" [val: empty]"),
739 )
740 .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
741 }
742 }
743
744 let mut datums1 = DatumVec::new();
746 let mut datums2 = DatumVec::new();
747 let mut datums_key_1 = DatumVec::new();
748 let mut datums_key_2 = DatumVec::new();
749 let mfp_after1 = mfp_after.clone();
750 let func2 = func.clone();
751
752 let name = if !fused_unnest_list {
753 "ReduceInaccumulable"
754 } else {
755 "FusedReduceUnnestList"
756 };
757 let arranged = partial
758 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
759 "Arranged {name}"
760 ));
761 let oks = if !fused_unnest_list {
762 arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
763 move |key, source, target| {
764 let iter = source.iter().flat_map(|(v, w)| {
768 let count = usize::try_from(w.into_inner()).unwrap_or(0);
771 std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
772 });
773
774 let temp_storage = RowArena::new();
775 let datum_iter = key.to_datum_iter();
776 let mut datums_local = datums1.borrow();
777 datums_local.extend(datum_iter);
778 let key_len = datums_local.len();
779 datums_local.push(
780 func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
783 iter,
784 &temp_storage,
785 ),
786 );
787
788 if let Some(row) =
789 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
790 {
791 target.push((row, Diff::ONE));
792 }
793 }
794 })
795 } else {
796 arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
797 move |key, source, target| {
798 let iter = source.iter().flat_map(|(v, w)| {
800 let count = usize::try_from(w.into_inner()).unwrap_or(0);
801 std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
802 });
803
804 let temp_storage = RowArena::new();
806 let mut datums_local = datums_key_1.borrow();
807 datums_local.extend(key.to_datum_iter());
808 let key_len = datums_local.len();
809 for datum in func
810 .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
811 iter,
812 &temp_storage,
813 )
814 {
815 datums_local.truncate(key_len);
816 datums_local.push(datum);
817 if let Some(row) = evaluate_mfp_after(
818 &mfp_after1,
819 &mut datums_local,
820 &temp_storage,
821 key_len,
822 ) {
823 target.push((row, Diff::ONE));
824 }
825 }
826 }
827 })
828 };
829
830 let must_validate = validating && err_output.is_none();
834 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
835 if must_validate || mfp_after2.is_some() {
836 let error_logger = self.error_logger();
837
838 let errs = if !fused_unnest_list {
839 arranged
840 .mz_reduce_abelian::<_, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
841 &format!("{name} Error Check"),
842 move |key, source, target| {
843 if must_validate {
847 for (value, count) in source.iter() {
848 if count.is_positive() {
849 continue;
850 }
851 let value = value.to_row();
852 let message = "Non-positive accumulation in ReduceInaccumulable";
853 error_logger
854 .log(message, &format!("value={value:?}, count={count}"));
855 target.push((EvalError::Internal(message.into()).into(), Diff::ONE));
856 return;
857 }
858 }
859
860 let Some(mfp) = &mfp_after2 else { return };
862 let iter = source.iter().flat_map(|&(mut v, ref w)| {
863 let count = usize::try_from(w.into_inner()).unwrap_or(0);
864 std::iter::repeat(v.next().unwrap()).take(count)
867 });
868
869 let temp_storage = RowArena::new();
870 let datum_iter = key.to_datum_iter();
871 let mut datums_local = datums2.borrow();
872 datums_local.extend(datum_iter);
873 datums_local.push(
874 func2.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
875 iter,
876 &temp_storage,
877 ),
878 );
879 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
880 {
881 target.push((e.into(), Diff::ONE));
882 }
883 },
884 )
885 .as_collection(|_, v| v.clone())
886 } else {
887 assert!(!must_validate);
889 let Some(mfp) = mfp_after2 else {
892 unreachable!()
893 };
894 arranged
895 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
896 &format!("{name} Error Check"),
897 move |key, source, target| {
898 let iter = source.iter().flat_map(|&(mut v, ref w)| {
899 let count = usize::try_from(w.into_inner()).unwrap_or(0);
900 std::iter::repeat(v.next().unwrap()).take(count)
903 });
904
905 let temp_storage = RowArena::new();
906 let mut datums_local = datums_key_2.borrow();
907 datums_local.extend(key.to_datum_iter());
908 let key_len = datums_local.len();
909 for datum in func2
910 .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
911 iter,
912 &temp_storage,
913 )
914 {
915 datums_local.truncate(key_len);
916 datums_local.push(datum);
917 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
920 {
921 target.push((e.into(), Diff::ONE));
922 }
923 }
924 },
925 )
926 .as_collection(|_, v| v.clone())
927 };
928
929 if let Some(e) = err_output {
930 err_output = Some(e.concat(&errs));
931 } else {
932 err_output = Some(errs);
933 }
934 }
935 (oks, err_output)
936 }
937
938 fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
939 &self,
940 input: Collection<S, Row, Diff>,
941 name_tag: Option<&str>,
942 ) -> Arranged<S, TraceAgent<Tr>>
943 where
944 S: Scope<Timestamp = G::Timestamp>,
945 Tr: for<'a> Trace<
946 Key<'a> = DatumSeq<'a>,
947 KeyOwn = Row,
948 Time = G::Timestamp,
949 Diff = Diff,
950 ValOwn: Data + MaybeValidatingRow<(), String>,
951 > + 'static,
952 Bu: Builder<
953 Time = G::Timestamp,
954 Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
955 Output = Tr::Batch,
956 >,
957 Arranged<S, TraceAgent<Tr>>: ArrangementSize,
958 {
959 let error_logger = self.error_logger();
960
961 let output_name = format!(
962 "ReduceInaccumulable Distinct{}",
963 name_tag.unwrap_or_default()
964 );
965
966 let input: KeyCollection<_, _, _> = input.into();
967 input
968 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
969 "Arranged ReduceInaccumulable Distinct [val: empty]",
970 )
971 .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
972 if let Some(err) = Tr::ValOwn::into_error() {
973 for (value, count) in source.iter() {
974 if count.is_positive() {
975 continue;
976 }
977
978 let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
979 error_logger.log(message, &format!("value={value:?}, count={count}"));
980 t.push((err(message.to_string()), Diff::ONE));
981 return;
982 }
983 }
984 t.push((Tr::ValOwn::ok(()), Diff::ONE))
985 })
986 }
987
988 fn build_bucketed<S>(
1006 &self,
1007 input: Collection<S, (Row, Row), Diff>,
1008 BucketedPlan {
1009 aggr_funcs,
1010 skips,
1011 buckets,
1012 }: BucketedPlan,
1013 key_arity: usize,
1014 mfp_after: Option<SafeMfpPlan>,
1015 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1016 where
1017 S: Scope<Timestamp = G::Timestamp>,
1018 {
1019 let mut err_output: Option<Collection<S, _, _>> = None;
1020 let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
1021 let input = input.enter(inner);
1022
1023 let first_mod = buckets.get(0).copied().unwrap_or(1);
1025
1026 let mut stage = input.map(move |(key, row)| {
1028 let mut row_builder = SharedRow::get();
1029 let mut row_packer = row_builder.packer();
1030 let mut row_iter = row.iter();
1031 for skip in skips.iter() {
1032 row_packer.push(row_iter.nth(*skip).unwrap());
1033 }
1034 let values = row_builder.clone();
1035
1036 let hash = values.hashed() % first_mod;
1038 let hash_key =
1039 row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
1040 (hash_key, values)
1041 });
1042
1043 for (index, b) in buckets.into_iter().enumerate() {
1045 let input = if index == 0 {
1047 stage
1048 } else {
1049 stage.map(move |(hash_key, values)| {
1050 let mut hash_key_iter = hash_key.iter();
1051 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
1052 let hash_key = SharedRow::pack(
1054 std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
1055 );
1056 (hash_key, values)
1057 })
1058 };
1059
1060 let validating = err_output.is_none();
1064
1065 let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
1066 if let Some(errs) = errs {
1067 err_output = Some(errs.leave_region());
1068 }
1069 stage = oks
1070 }
1071
1072 let partial = stage.map(move |(hash_key, values)| {
1074 let mut hash_key_iter = hash_key.iter();
1075 let _hash = hash_key_iter.next();
1076 (SharedRow::pack(hash_key_iter.take(key_arity)), values)
1077 });
1078
1079 let mut datums1 = DatumVec::new();
1081 let mut datums2 = DatumVec::new();
1082 let mfp_after1 = mfp_after.clone();
1083 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1084 let aggr_funcs2 = aggr_funcs.clone();
1085
1086 let error_logger = self.error_logger();
1089 let arranged = partial
1092 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1093 "Arrange ReduceMinsMaxes",
1094 );
1095 let must_validate = err_output.is_none();
1099 if must_validate || mfp_after2.is_some() {
1100 let errs = arranged
1101 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1102 "ReduceMinsMaxes Error Check",
1103 move |key, source, target| {
1104 if must_validate {
1108 for (val, count) in source.iter() {
1109 if count.is_positive() {
1110 continue;
1111 }
1112 let val = val.to_row();
1113 let message = "Non-positive accumulation in ReduceMinsMaxes";
1114 error_logger
1115 .log(message, &format!("val={val:?}, count={count}"));
1116 target.push((
1117 EvalError::Internal(message.into()).into(),
1118 Diff::ONE,
1119 ));
1120 return;
1121 }
1122 }
1123
1124 let Some(mfp) = &mfp_after2 else { return };
1126 let temp_storage = RowArena::new();
1127 let datum_iter = key.to_datum_iter();
1128 let mut datums_local = datums2.borrow();
1129 datums_local.extend(datum_iter);
1130
1131 let mut source_iters = source
1132 .iter()
1133 .map(|(values, _cnt)| *values)
1134 .collect::<Vec<_>>();
1135 for func in aggr_funcs2.iter() {
1136 let column_iter = (0..source_iters.len())
1137 .map(|i| source_iters[i].next().unwrap());
1138 datums_local.push(func.eval(column_iter, &temp_storage));
1139 }
1140 if let Result::Err(e) =
1141 mfp.evaluate_inner(&mut datums_local, &temp_storage)
1142 {
1143 target.push((e.into(), Diff::ONE));
1144 }
1145 },
1146 )
1147 .as_collection(|_, v| v.clone())
1148 .leave_region();
1149 if let Some(e) = &err_output {
1150 err_output = Some(e.concat(&errs));
1151 } else {
1152 err_output = Some(errs);
1153 }
1154 }
1155 arranged
1156 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1157 "ReduceMinsMaxes",
1158 move |key, source, target| {
1159 let temp_storage = RowArena::new();
1160 let datum_iter = key.to_datum_iter();
1161 let mut datums_local = datums1.borrow();
1162 datums_local.extend(datum_iter);
1163 let key_len = datums_local.len();
1164
1165 let mut source_iters = source
1166 .iter()
1167 .map(|(values, _cnt)| *values)
1168 .collect::<Vec<_>>();
1169 for func in aggr_funcs.iter() {
1170 let column_iter =
1171 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1172 datums_local.push(func.eval(column_iter, &temp_storage));
1173 }
1174
1175 if let Some(row) = evaluate_mfp_after(
1176 &mfp_after1,
1177 &mut datums_local,
1178 &temp_storage,
1179 key_len,
1180 ) {
1181 target.push((row, Diff::ONE));
1182 }
1183 },
1184 )
1185 .leave_region()
1186 });
1187 (
1188 arranged_output,
1189 err_output.expect("expected to validate in one level of the hierarchy"),
1190 )
1191 }
1192
1193 fn build_bucketed_stage<S>(
1200 &self,
1201 aggr_funcs: &Vec<AggregateFunc>,
1202 input: &Collection<S, (Row, Row), Diff>,
1203 validating: bool,
1204 ) -> (
1205 Collection<S, (Row, Row), Diff>,
1206 Option<Collection<S, DataflowError, Diff>>,
1207 )
1208 where
1209 S: Scope<Timestamp = G::Timestamp>,
1210 {
1211 let (input, negated_output, errs) = if validating {
1212 let (input, reduced) = self
1213 .build_bucketed_negated_output::<_, RowValBuilder<_,_,_>, RowValSpine<Result<Row, Row>, _, _>>(
1214 input,
1215 aggr_funcs.clone(),
1216 );
1217 let (oks, errs) = reduced
1218 .as_collection(|k, v| (k.to_row(), v.clone()))
1219 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1220 "Checked Invalid Accumulations",
1221 |(hash_key, result)| match result {
1222 Err(hash_key) => {
1223 let mut hash_key_iter = hash_key.iter();
1224 let _hash = hash_key_iter.next();
1225 let key = SharedRow::pack(hash_key_iter);
1226 let message = format!(
1227 "Invalid data in source, saw non-positive accumulation \
1228 for key {key:?} in hierarchical mins-maxes aggregate"
1229 );
1230 Err(EvalError::Internal(message.into()).into())
1231 }
1232 Ok(values) => Ok((hash_key, values)),
1233 },
1234 );
1235 (input, oks, Some(errs))
1236 } else {
1237 let (input, reduced) = self
1238 .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1239 input,
1240 aggr_funcs.clone(),
1241 );
1242 let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1245 (input, oks, None)
1246 };
1247
1248 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1249 let oks = negated_output.concat(&input);
1250 (oks, errs)
1251 }
1252
1253 fn build_bucketed_negated_output<S, Bu, Tr>(
1257 &self,
1258 input: &Collection<S, (Row, Row), Diff>,
1259 aggrs: Vec<AggregateFunc>,
1260 ) -> (
1261 Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1262 Arranged<S, TraceAgent<Tr>>,
1263 )
1264 where
1265 S: Scope<Timestamp = G::Timestamp>,
1266 Tr: for<'a> Trace<
1267 Key<'a> = DatumSeq<'a>,
1268 KeyOwn = Row,
1269 ValOwn: Data + MaybeValidatingRow<Row, Row>,
1270 Time = G::Timestamp,
1271 Diff = Diff,
1272 > + 'static,
1273 Bu: Builder<
1274 Time = G::Timestamp,
1275 Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1276 Output = Tr::Batch,
1277 >,
1278 Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1279 {
1280 let error_logger = self.error_logger();
1281 let arranged_input = input
1284 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1285 "Arranged MinsMaxesHierarchical input",
1286 );
1287
1288 let reduced = arranged_input.mz_reduce_abelian::<_, Bu, Tr>(
1289 "Reduced Fallibly MinsMaxesHierarchical",
1290 move |key, source, target| {
1291 if let Some(err) = Tr::ValOwn::into_error() {
1292 for (value, count) in source.iter() {
1294 if count.is_positive() {
1295 continue;
1296 }
1297 error_logger.log(
1298 "Non-positive accumulation in MinsMaxesHierarchical",
1299 &format!("key={key:?}, value={value:?}, count={count}"),
1300 );
1301 target.push((err(Tr::owned_key(key)), Diff::ONE));
1304 return;
1305 }
1306 }
1307
1308 let mut row_builder = SharedRow::get();
1309 let mut row_packer = row_builder.packer();
1310
1311 let mut source_iters = source
1312 .iter()
1313 .map(|(values, _cnt)| *values)
1314 .collect::<Vec<_>>();
1315 for func in aggrs.iter() {
1316 let column_iter =
1317 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1318 row_packer.push(func.eval(column_iter, &RowArena::new()));
1319 }
1320 target.reserve(source.len().saturating_add(1));
1326 target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1327 target.extend(source.iter().map(|(values, cnt)| {
1328 let mut cnt = *cnt;
1329 cnt.negate();
1330 (Tr::ValOwn::ok(values.to_row()), cnt)
1331 }));
1332 },
1333 );
1334 (arranged_input, reduced)
1335 }
1336
1337 fn build_monotonic<S>(
1340 &self,
1341 collection: Collection<S, (Row, Row), Diff>,
1342 MonotonicPlan {
1343 aggr_funcs,
1344 skips,
1345 must_consolidate,
1346 }: MonotonicPlan,
1347 mfp_after: Option<SafeMfpPlan>,
1348 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1349 where
1350 S: Scope<Timestamp = G::Timestamp>,
1351 {
1352 let collection = collection
1354 .map(move |(key, row)| {
1355 let mut row_builder = SharedRow::get();
1356 let mut values = Vec::with_capacity(skips.len());
1357 let mut row_iter = row.iter();
1358 for skip in skips.iter() {
1359 values.push(
1360 row_builder.pack_using(std::iter::once(row_iter.nth(*skip).unwrap())),
1361 );
1362 }
1363
1364 (key, values)
1365 })
1366 .consolidate_named_if::<KeyBatcher<_, _, _>>(
1367 must_consolidate,
1368 "Consolidated ReduceMonotonic input",
1369 );
1370
1371 let error_logger = self.error_logger();
1373 let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
1374 error_logger.log(
1375 "Non-monotonic input to ReduceMonotonic",
1376 &format!("data={data:?}, diff={diff}"),
1377 );
1378 let m = "tried to build a monotonic reduction on non-monotonic input".into();
1379 (EvalError::Internal(m).into(), Diff::ONE)
1380 });
1381 let partial = partial.explode_one(move |(key, values)| {
1385 let mut output = Vec::new();
1386 for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
1387 output.push(monoids::get_monoid(row, func).expect(
1388 "hierarchical aggregations are expected to have monoid implementations",
1389 ));
1390 }
1391 (key, output)
1392 });
1393
1394 let mut datums1 = DatumVec::new();
1396 let mut datums2 = DatumVec::new();
1397 let mfp_after1 = mfp_after.clone();
1398 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1399
1400 let partial: KeyCollection<_, _, _> = partial.into();
1401 let arranged = partial
1402 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
1403 "ArrangeMonotonic [val: empty]",
1404 );
1405 let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1406 "ReduceMonotonic",
1407 {
1408 move |key, input, output| {
1409 let temp_storage = RowArena::new();
1410 let datum_iter = key.to_datum_iter();
1411 let mut datums_local = datums1.borrow();
1412 datums_local.extend(datum_iter);
1413 let key_len = datums_local.len();
1414 let accum = &input[0].1;
1415 for monoid in accum.iter() {
1416 datums_local.extend(monoid.finalize().iter());
1417 }
1418
1419 if let Some(row) =
1420 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
1421 {
1422 output.push((row, Diff::ONE));
1423 }
1424 }
1425 },
1426 );
1427
1428 if let Some(mfp) = mfp_after2 {
1433 let mfp_errs = arranged
1434 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1435 "ReduceMonotonic Error Check",
1436 move |key, input, output| {
1437 let temp_storage = RowArena::new();
1438 let datum_iter = key.to_datum_iter();
1439 let mut datums_local = datums2.borrow();
1440 datums_local.extend(datum_iter);
1441 let accum = &input[0].1;
1442 for monoid in accum.iter() {
1443 datums_local.extend(monoid.finalize().iter());
1444 }
1445 if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
1446 {
1447 output.push((e.into(), Diff::ONE));
1448 }
1449 },
1450 )
1451 .as_collection(|_k, v| v.clone());
1452 (output, validation_errs.concat(&mfp_errs))
1453 } else {
1454 (output, validation_errs)
1455 }
1456 }
1457
1458 fn build_accumulable<S>(
1465 &self,
1466 collection: Collection<S, (Row, Row), Diff>,
1467 AccumulablePlan {
1468 full_aggrs,
1469 simple_aggrs,
1470 distinct_aggrs,
1471 }: AccumulablePlan,
1472 key_arity: usize,
1473 mfp_after: Option<SafeMfpPlan>,
1474 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1475 where
1476 S: Scope<Timestamp = G::Timestamp>,
1477 {
1478 if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1480 self.error_logger().soft_panic_or_log(
1481 "Incorrect numbers of aggregates in accummulable reduction rendering",
1482 &format!(
1483 "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1484 full_aggrs.len(),
1485 simple_aggrs.len(),
1486 distinct_aggrs.len(),
1487 ),
1488 );
1489 }
1490
1491 let zero_diffs: (Vec<_>, Diff) = (
1503 full_aggrs
1504 .iter()
1505 .map(|f| accumulable_zero(&f.func))
1506 .collect(),
1507 Diff::ZERO,
1508 );
1509
1510 let mut to_aggregate = Vec::new();
1511 if simple_aggrs.len() > 0 {
1512 let easy_cases = collection.explode_one({
1514 let zero_diffs = zero_diffs.clone();
1515 move |(key, row)| {
1516 let mut diffs = zero_diffs.clone();
1517 let mut row_iter = row.iter().enumerate();
1523 for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
1524 let mut datum = row_iter.next().unwrap();
1525 while datum_index != &datum.0 {
1526 datum = row_iter.next().unwrap();
1527 }
1528 let datum = datum.1;
1529 diffs.0[*accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1530 diffs.1 = Diff::ONE;
1531 }
1532 ((key, ()), diffs)
1533 }
1534 });
1535 to_aggregate.push(easy_cases);
1536 }
1537
1538 for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
1540 let pairer = Pairer::new(key_arity);
1541 let collection = collection
1542 .map(move |(key, row)| {
1543 let value = row.iter().nth(datum_index).unwrap();
1544 (pairer.merge(&key, std::iter::once(value)), ())
1545 })
1546 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1547 "Arranged Accumulable Distinct [val: empty]",
1548 )
1549 .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1550 "Reduced Accumulable Distinct [val: empty]",
1551 move |_k, _s, t| t.push(((), Diff::ONE)),
1552 )
1553 .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1554 .explode_one({
1555 let zero_diffs = zero_diffs.clone();
1556 move |(key, row)| {
1557 let datum = row.iter().next().unwrap();
1558 let mut diffs = zero_diffs.clone();
1559 diffs.0[accumulable_index] = datum_to_accumulator(&aggr.func, datum);
1560 diffs.1 = Diff::ONE;
1561 ((key, ()), diffs)
1562 }
1563 });
1564 to_aggregate.push(collection);
1565 }
1566
1567 let collection = if to_aggregate.len() == 1 {
1569 to_aggregate.remove(0)
1570 } else {
1571 differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
1572 };
1573
1574 let mut datums1 = DatumVec::new();
1576 let mut datums2 = DatumVec::new();
1577 let mfp_after1 = mfp_after.clone();
1578 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1579 let full_aggrs2 = full_aggrs.clone();
1580
1581 let error_logger = self.error_logger();
1582 let err_full_aggrs = full_aggrs.clone();
1583 let (arranged_output, arranged_errs) = collection
1584 .mz_arrange::<RowBatcher<_,_>, RowBuilder<_,_>, RowSpine<_, (Vec<Accum>, Diff)>>("ArrangeAccumulable [val: empty]")
1585 .reduce_pair::<_, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
1586 "ReduceAccumulable",
1587 "AccumulableErrorCheck",
1588 {
1589 move |key, input, output| {
1590 let (ref accums, total) = input[0].1;
1591
1592 let temp_storage = RowArena::new();
1593 let datum_iter = key.to_datum_iter();
1594 let mut datums_local = datums1.borrow();
1595 datums_local.extend(datum_iter);
1596 let key_len = datums_local.len();
1597 for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1598 datums_local.push(finalize_accum(&aggr.func, accum, total));
1599 }
1600
1601 if let Some(row) = evaluate_mfp_after(
1602 &mfp_after1,
1603 &mut datums_local,
1604 &temp_storage,
1605 key_len,
1606 ) {
1607 output.push((row, Diff::ONE));
1608 }
1609 }
1610 },
1611 move |key, input, output| {
1612 let (ref accums, total) = input[0].1;
1613 for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1614 if total == Diff::ZERO && !accum.is_zero() {
1617 error_logger.log(
1618 "Net-zero records with non-zero accumulation in ReduceAccumulable",
1619 &format!("aggr={aggr:?}, accum={accum:?}"),
1620 );
1621 let key = key.to_row();
1622 let message = format!(
1623 "Invalid data in source, saw net-zero records for key {key} \
1624 with non-zero accumulation in accumulable aggregate"
1625 );
1626 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1627 }
1628 match (&aggr.func, &accum) {
1629 (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1630 | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1631 | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1632 if accum.is_negative() {
1633 error_logger.log(
1634 "Invalid negative unsigned aggregation in ReduceAccumulable",
1635 &format!("aggr={aggr:?}, accum={accum:?}"),
1636 );
1637 let key = key.to_row();
1638 let message = format!(
1639 "Invalid data in source, saw negative accumulation with \
1640 unsigned type for key {key}"
1641 );
1642 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1643 }
1644 }
1645 _ => (), }
1647 }
1648
1649 let Some(mfp) = &mfp_after2 else { return };
1651 let temp_storage = RowArena::new();
1652 let datum_iter = key.to_datum_iter();
1653 let mut datums_local = datums2.borrow();
1654 datums_local.extend(datum_iter);
1655 for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1656 datums_local.push(finalize_accum(&aggr.func, accum, total));
1657 }
1658
1659 if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1660 output.push((e.into(), Diff::ONE));
1661 }
1662 },
1663 );
1664 (
1665 arranged_output,
1666 arranged_errs.as_collection(|_key, error| error.clone()),
1667 )
1668 }
1669}
1670
1671fn evaluate_mfp_after<'a, 'b>(
1675 mfp_after: &'a Option<SafeMfpPlan>,
1676 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1677 temp_storage: &'a RowArena,
1678 key_len: usize,
1679) -> Option<Row> {
1680 let mut row_builder = SharedRow::get();
1681 if let Some(mfp) = mfp_after {
1684 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1687 Some(row_builder.pack_using(iter.skip(key_len)))
1690 } else {
1691 None
1692 }
1693 } else {
1694 Some(row_builder.pack_using(&datums_local[key_len..]))
1695 }
1696}
1697
1698fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1699 match aggr_func {
1700 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1701 trues: Diff::ZERO,
1702 falses: Diff::ZERO,
1703 },
1704 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1705 accum: AccumCount::ZERO,
1706 pos_infs: Diff::ZERO,
1707 neg_infs: Diff::ZERO,
1708 nans: Diff::ZERO,
1709 non_nulls: Diff::ZERO,
1710 },
1711 AggregateFunc::SumNumeric => Accum::Numeric {
1712 accum: OrderedDecimal(NumericAgg::zero()),
1713 pos_infs: Diff::ZERO,
1714 neg_infs: Diff::ZERO,
1715 nans: Diff::ZERO,
1716 non_nulls: Diff::ZERO,
1717 },
1718 _ => Accum::SimpleNumber {
1719 accum: AccumCount::ZERO,
1720 non_nulls: Diff::ZERO,
1721 },
1722 }
1723}
1724
/// Scale factor (2^24) applied when converting floats to and from the fixed-point
/// integer representation used by float accumulators.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| {
    let scale: i32 = 1 << 24;
    f64::from(scale)
});
1726
1727fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1728 match aggregate_func {
1729 AggregateFunc::Count => Accum::SimpleNumber {
1730 accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
1732 Diff::ZERO
1733 } else {
1734 Diff::ONE
1735 },
1736 },
1737 AggregateFunc::Any | AggregateFunc::All => match datum {
1738 Datum::True => Accum::Bool {
1739 trues: Diff::ONE,
1740 falses: Diff::ZERO,
1741 },
1742 Datum::Null => Accum::Bool {
1743 trues: Diff::ZERO,
1744 falses: Diff::ZERO,
1745 },
1746 Datum::False => Accum::Bool {
1747 trues: Diff::ZERO,
1748 falses: Diff::ONE,
1749 },
1750 x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1751 },
1752 AggregateFunc::Dummy => match datum {
1753 Datum::Dummy => Accum::SimpleNumber {
1754 accum: AccumCount::ZERO,
1755 non_nulls: Diff::ZERO,
1756 },
1757 x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1758 },
1759 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1760 let n = match datum {
1761 Datum::Float32(n) => f64::from(*n),
1762 Datum::Float64(n) => *n,
1763 Datum::Null => 0f64,
1764 x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1765 };
1766
1767 let nans = Diff::from(n.is_nan());
1768 let pos_infs = Diff::from(n == f64::INFINITY);
1769 let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1770 let non_nulls = Diff::from(datum != Datum::Null);
1771
1772 let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1775 AccumCount::ZERO
1776 } else {
1777 #[allow(clippy::as_conversions)]
1780 { (n * *FLOAT_SCALE) as i128 }.into()
1781 };
1782
1783 Accum::Float {
1784 accum,
1785 pos_infs,
1786 neg_infs,
1787 nans,
1788 non_nulls,
1789 }
1790 }
1791 AggregateFunc::SumNumeric => match datum {
1792 Datum::Numeric(n) => {
1793 let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1794 if n.0.is_negative() {
1795 (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1796 } else {
1797 (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1798 }
1799 } else if n.0.is_nan() {
1800 (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1801 } else {
1802 let mut cx_agg = numeric::cx_agg();
1805 (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1806 };
1807
1808 Accum::Numeric {
1809 accum: OrderedDecimal(accum),
1810 pos_infs,
1811 neg_infs,
1812 nans,
1813 non_nulls: Diff::ONE,
1814 }
1815 }
1816 Datum::Null => Accum::Numeric {
1817 accum: OrderedDecimal(NumericAgg::zero()),
1818 pos_infs: Diff::ZERO,
1819 neg_infs: Diff::ZERO,
1820 nans: Diff::ZERO,
1821 non_nulls: Diff::ZERO,
1822 },
1823 x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1824 },
1825 _ => {
1826 match datum {
1830 Datum::Int16(i) => Accum::SimpleNumber {
1831 accum: i.into(),
1832 non_nulls: Diff::ONE,
1833 },
1834 Datum::Int32(i) => Accum::SimpleNumber {
1835 accum: i.into(),
1836 non_nulls: Diff::ONE,
1837 },
1838 Datum::Int64(i) => Accum::SimpleNumber {
1839 accum: i.into(),
1840 non_nulls: Diff::ONE,
1841 },
1842 Datum::UInt16(u) => Accum::SimpleNumber {
1843 accum: u.into(),
1844 non_nulls: Diff::ONE,
1845 },
1846 Datum::UInt32(u) => Accum::SimpleNumber {
1847 accum: u.into(),
1848 non_nulls: Diff::ONE,
1849 },
1850 Datum::UInt64(u) => Accum::SimpleNumber {
1851 accum: u.into(),
1852 non_nulls: Diff::ONE,
1853 },
1854 Datum::MzTimestamp(t) => Accum::SimpleNumber {
1855 accum: u64::from(t).into(),
1856 non_nulls: Diff::ONE,
1857 },
1858 Datum::Null => Accum::SimpleNumber {
1859 accum: AccumCount::ZERO,
1860 non_nulls: Diff::ZERO,
1861 },
1862 x => panic!("Accumulating non-integer data: {x:?}"),
1863 }
1864 }
1865 }
1866}
1867
1868fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1869 if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1873 Datum::Null
1874 } else {
1875 match (&aggr_func, &accum) {
1876 (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1877 Datum::Int64(non_nulls.into_inner())
1878 }
1879 (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1880 if falses.is_positive() {
1882 Datum::False
1883 } else if *trues == total {
1884 Datum::True
1885 } else {
1886 Datum::Null
1887 }
1888 }
1889 (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1890 if trues.is_positive() {
1892 Datum::True
1893 } else if *falses == total {
1894 Datum::False
1895 } else {
1896 Datum::Null
1897 }
1898 }
1899 (AggregateFunc::Dummy, _) => Datum::Dummy,
1900 (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1902 | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
1903 #[allow(clippy::as_conversions)]
1908 Datum::Int64(accum.into_inner() as i64)
1909 }
1910 (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1911 (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1912 | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1913 if !accum.is_negative() {
1914 #[allow(clippy::as_conversions)]
1920 Datum::UInt64(accum.into_inner() as u64)
1921 } else {
1922 Datum::Null
1926 }
1927 }
1928 (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1929 if !accum.is_negative() {
1930 Datum::from(*accum)
1931 } else {
1932 Datum::Null
1936 }
1937 }
1938 (
1939 AggregateFunc::SumFloat32,
1940 Accum::Float {
1941 accum,
1942 pos_infs,
1943 neg_infs,
1944 nans,
1945 non_nulls: _,
1946 },
1947 ) => {
1948 if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1949 Datum::from(f32::NAN)
1952 } else if pos_infs.is_positive() {
1953 Datum::from(f32::INFINITY)
1954 } else if neg_infs.is_positive() {
1955 Datum::from(f32::NEG_INFINITY)
1956 } else {
1957 #[allow(clippy::as_conversions)]
1959 {
1960 Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
1961 }
1962 }
1963 }
1964 (
1965 AggregateFunc::SumFloat64,
1966 Accum::Float {
1967 accum,
1968 pos_infs,
1969 neg_infs,
1970 nans,
1971 non_nulls: _,
1972 },
1973 ) => {
1974 if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1975 Datum::from(f64::NAN)
1978 } else if pos_infs.is_positive() {
1979 Datum::from(f64::INFINITY)
1980 } else if neg_infs.is_positive() {
1981 Datum::from(f64::NEG_INFINITY)
1982 } else {
1983 #[allow(clippy::as_conversions)]
1985 {
1986 Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
1987 }
1988 }
1989 }
1990 (
1991 AggregateFunc::SumNumeric,
1992 Accum::Numeric {
1993 accum,
1994 pos_infs,
1995 neg_infs,
1996 nans,
1997 non_nulls: _,
1998 },
1999 ) => {
2000 let mut cx_datum = numeric::cx_datum();
2001 let d = cx_datum.to_width(accum.0);
2002 let inf_d = d.is_infinite();
2008 let neg_d = d.is_negative();
2009 let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
2010 let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
2011 if nans.is_positive() || (pos_inf && neg_inf) {
2012 Datum::from(Numeric::nan())
2015 } else if pos_inf {
2016 Datum::from(Numeric::infinity())
2017 } else if neg_inf {
2018 let mut cx = numeric::cx_datum();
2019 let mut d = Numeric::infinity();
2020 cx.neg(&mut d);
2021 Datum::from(d)
2022 } else {
2023 Datum::from(d)
2024 }
2025 }
2026 _ => panic!(
2027 "Unexpected accumulation (aggr={:?}, accum={accum:?})",
2028 aggr_func
2029 ),
2030 }
2031 }
2032}
2033
2034type AccumCount = mz_ore::Overflowing<i128>;
2036
2037#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
2048enum Accum {
2049 Bool {
2051 trues: Diff,
2053 falses: Diff,
2055 },
2056 SimpleNumber {
2058 accum: AccumCount,
2060 non_nulls: Diff,
2062 },
2063 Float {
2065 accum: AccumCount,
2068 pos_infs: Diff,
2070 neg_infs: Diff,
2072 nans: Diff,
2074 non_nulls: Diff,
2076 },
2077 Numeric {
2079 accum: OrderedDecimal<NumericAgg>,
2081 pos_infs: Diff,
2083 neg_infs: Diff,
2085 nans: Diff,
2087 non_nulls: Diff,
2089 },
2090}
2091
2092impl IsZero for Accum {
2093 fn is_zero(&self) -> bool {
2094 match self {
2095 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2096 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2097 Accum::Float {
2098 accum,
2099 pos_infs,
2100 neg_infs,
2101 nans,
2102 non_nulls,
2103 } => {
2104 accum.is_zero()
2105 && pos_infs.is_zero()
2106 && neg_infs.is_zero()
2107 && nans.is_zero()
2108 && non_nulls.is_zero()
2109 }
2110 Accum::Numeric {
2111 accum,
2112 pos_infs,
2113 neg_infs,
2114 nans,
2115 non_nulls,
2116 } => {
2117 accum.0.is_zero()
2118 && pos_infs.is_zero()
2119 && neg_infs.is_zero()
2120 && nans.is_zero()
2121 && non_nulls.is_zero()
2122 }
2123 }
2124 }
2125}
2126
2127impl Semigroup for Accum {
2128 fn plus_equals(&mut self, other: &Accum) {
2129 match (&mut *self, other) {
2130 (
2131 Accum::Bool { trues, falses },
2132 Accum::Bool {
2133 trues: other_trues,
2134 falses: other_falses,
2135 },
2136 ) => {
2137 *trues += other_trues;
2138 *falses += other_falses;
2139 }
2140 (
2141 Accum::SimpleNumber { accum, non_nulls },
2142 Accum::SimpleNumber {
2143 accum: other_accum,
2144 non_nulls: other_non_nulls,
2145 },
2146 ) => {
2147 *accum += other_accum;
2148 *non_nulls += other_non_nulls;
2149 }
2150 (
2151 Accum::Float {
2152 accum,
2153 pos_infs,
2154 neg_infs,
2155 nans,
2156 non_nulls,
2157 },
2158 Accum::Float {
2159 accum: other_accum,
2160 pos_infs: other_pos_infs,
2161 neg_infs: other_neg_infs,
2162 nans: other_nans,
2163 non_nulls: other_non_nulls,
2164 },
2165 ) => {
2166 *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2167 warn!("Float accumulator overflow. Incorrect results possible");
2168 accum.wrapping_add(*other_accum)
2169 });
2170 *pos_infs += other_pos_infs;
2171 *neg_infs += other_neg_infs;
2172 *nans += other_nans;
2173 *non_nulls += other_non_nulls;
2174 }
2175 (
2176 Accum::Numeric {
2177 accum,
2178 pos_infs,
2179 neg_infs,
2180 nans,
2181 non_nulls,
2182 },
2183 Accum::Numeric {
2184 accum: other_accum,
2185 pos_infs: other_pos_infs,
2186 neg_infs: other_neg_infs,
2187 nans: other_nans,
2188 non_nulls: other_non_nulls,
2189 },
2190 ) => {
2191 let mut cx_agg = numeric::cx_agg();
2192 cx_agg.add(&mut accum.0, &other_accum.0);
2193 assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2199 cx_agg.reduce(&mut accum.0);
2218 *pos_infs += other_pos_infs;
2219 *neg_infs += other_neg_infs;
2220 *nans += other_nans;
2221 *non_nulls += other_non_nulls;
2222 }
2223 (l, r) => unreachable!(
2224 "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2225 ),
2226 }
2227 }
2228}
2229
2230impl Multiply<Diff> for Accum {
2231 type Output = Accum;
2232
2233 fn multiply(self, factor: &Diff) -> Accum {
2234 let factor = *factor;
2235 match self {
2236 Accum::Bool { trues, falses } => Accum::Bool {
2237 trues: trues * factor,
2238 falses: falses * factor,
2239 },
2240 Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2241 accum: accum * AccumCount::from(factor),
2242 non_nulls: non_nulls * factor,
2243 },
2244 Accum::Float {
2245 accum,
2246 pos_infs,
2247 neg_infs,
2248 nans,
2249 non_nulls,
2250 } => Accum::Float {
2251 accum: accum
2252 .checked_mul(AccumCount::from(factor))
2253 .unwrap_or_else(|| {
2254 warn!("Float accumulator overflow. Incorrect results possible");
2255 accum.wrapping_mul(AccumCount::from(factor))
2256 }),
2257 pos_infs: pos_infs * factor,
2258 neg_infs: neg_infs * factor,
2259 nans: nans * factor,
2260 non_nulls: non_nulls * factor,
2261 },
2262 Accum::Numeric {
2263 accum,
2264 pos_infs,
2265 neg_infs,
2266 nans,
2267 non_nulls,
2268 } => {
2269 let mut cx = numeric::cx_agg();
2270 let mut f = NumericAgg::from(factor.into_inner());
2271 cx.mul(&mut f, &accum.0);
2275 assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2281 Accum::Numeric {
2282 accum: OrderedDecimal(f),
2283 pos_infs: pos_infs * factor,
2284 neg_infs: neg_infs * factor,
2285 nans: nans * factor,
2286 non_nulls: non_nulls * factor,
2287 }
2288 }
2289 }
2290 }
2291}
2292
2293impl Columnation for Accum {
2294 type InnerRegion = CopyRegion<Self>;
2295}
2296
2297mod monoids {
2299
2300 use differential_dataflow::containers::{Columnation, Region};
2316 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
2317 use mz_expr::AggregateFunc;
2318 use mz_ore::soft_panic_or_log;
2319 use mz_repr::{Datum, Diff, Row};
2320 use serde::{Deserialize, Serialize};
2321
2322 #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
2324 pub enum ReductionMonoid {
2325 Min(Row),
2326 Max(Row),
2327 }
2328
2329 impl ReductionMonoid {
2330 pub fn finalize(&self) -> &Row {
2331 use ReductionMonoid::*;
2332 match self {
2333 Min(row) | Max(row) => row,
2334 }
2335 }
2336 }
2337
2338 impl Clone for ReductionMonoid {
2339 fn clone(&self) -> Self {
2340 use ReductionMonoid::*;
2341 match self {
2342 Min(row) => Min(row.clone()),
2343 Max(row) => Max(row.clone()),
2344 }
2345 }
2346
2347 fn clone_from(&mut self, source: &Self) {
2348 use ReductionMonoid::*;
2349
2350 let mut row = std::mem::take(match self {
2351 Min(row) | Max(row) => row,
2352 });
2353
2354 let source_row = match source {
2355 Min(row) | Max(row) => row,
2356 };
2357
2358 row.clone_from(source_row);
2359
2360 match source {
2361 Min(_) => *self = Min(row),
2362 Max(_) => *self = Max(row),
2363 }
2364 }
2365 }
2366
2367 impl Multiply<Diff> for ReductionMonoid {
2368 type Output = Self;
2369
2370 fn multiply(self, factor: &Diff) -> Self {
2371 assert!(factor.is_positive());
2376 self
2377 }
2378 }
2379
2380 impl Semigroup for ReductionMonoid {
2381 fn plus_equals(&mut self, rhs: &Self) {
2382 match (self, rhs) {
2383 (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
2384 let swap = {
2385 let lhs_val = lhs.unpack_first();
2386 let rhs_val = rhs.unpack_first();
2387 match (lhs_val, rhs_val) {
2389 (_, Datum::Null) => false,
2390 (Datum::Null, _) => true,
2391 (lhs, rhs) => rhs < lhs,
2392 }
2393 };
2394 if swap {
2395 lhs.clone_from(rhs);
2396 }
2397 }
2398 (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
2399 let swap = {
2400 let lhs_val = lhs.unpack_first();
2401 let rhs_val = rhs.unpack_first();
2402 match (lhs_val, rhs_val) {
2404 (_, Datum::Null) => false,
2405 (Datum::Null, _) => true,
2406 (lhs, rhs) => rhs > lhs,
2407 }
2408 };
2409 if swap {
2410 lhs.clone_from(rhs);
2411 }
2412 }
2413 (lhs, rhs) => {
2414 soft_panic_or_log!(
2415 "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
2416 );
2417 }
2418 }
2419 }
2420 }
2421
2422 impl IsZero for ReductionMonoid {
2423 fn is_zero(&self) -> bool {
2424 false
2430 }
2431 }
2432
2433 impl Columnation for ReductionMonoid {
2434 type InnerRegion = ReductionMonoidRegion;
2435 }
2436
2437 #[derive(Default)]
2441 pub struct ReductionMonoidRegion {
2442 inner: <Row as Columnation>::InnerRegion,
2443 }
2444
2445 impl Region for ReductionMonoidRegion {
2446 type Item = ReductionMonoid;
2447
2448 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
2449 use ReductionMonoid::*;
2450 match item {
2451 Min(row) => Min(unsafe { self.inner.copy(row) }),
2452 Max(row) => Max(unsafe { self.inner.copy(row) }),
2453 }
2454 }
2455
2456 fn clear(&mut self) {
2457 self.inner.clear();
2458 }
2459
2460 fn reserve_items<'a, I>(&mut self, items: I)
2461 where
2462 Self: 'a,
2463 I: Iterator<Item = &'a Self::Item> + Clone,
2464 {
2465 self.inner
2466 .reserve_items(items.map(ReductionMonoid::finalize));
2467 }
2468
2469 fn reserve_regions<'a, I>(&mut self, regions: I)
2470 where
2471 Self: 'a,
2472 I: Iterator<Item = &'a Self> + Clone,
2473 {
2474 self.inner.reserve_regions(regions.map(|r| &r.inner));
2475 }
2476
2477 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
2478 self.inner.heap_size(callback);
2479 }
2480 }
2481
2482 pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
2485 match func {
2486 AggregateFunc::MaxNumeric
2487 | AggregateFunc::MaxInt16
2488 | AggregateFunc::MaxInt32
2489 | AggregateFunc::MaxInt64
2490 | AggregateFunc::MaxUInt16
2491 | AggregateFunc::MaxUInt32
2492 | AggregateFunc::MaxUInt64
2493 | AggregateFunc::MaxMzTimestamp
2494 | AggregateFunc::MaxFloat32
2495 | AggregateFunc::MaxFloat64
2496 | AggregateFunc::MaxBool
2497 | AggregateFunc::MaxString
2498 | AggregateFunc::MaxDate
2499 | AggregateFunc::MaxTimestamp
2500 | AggregateFunc::MaxTimestampTz
2501 | AggregateFunc::MaxInterval
2502 | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
2503 AggregateFunc::MinNumeric
2504 | AggregateFunc::MinInt16
2505 | AggregateFunc::MinInt32
2506 | AggregateFunc::MinInt64
2507 | AggregateFunc::MinUInt16
2508 | AggregateFunc::MinUInt32
2509 | AggregateFunc::MinUInt64
2510 | AggregateFunc::MinMzTimestamp
2511 | AggregateFunc::MinFloat32
2512 | AggregateFunc::MinFloat64
2513 | AggregateFunc::MinBool
2514 | AggregateFunc::MinString
2515 | AggregateFunc::MinDate
2516 | AggregateFunc::MinTimestamp
2517 | AggregateFunc::MinTimestampTz
2518 | AggregateFunc::MinInterval
2519 | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
2520 AggregateFunc::SumInt16
2521 | AggregateFunc::SumInt32
2522 | AggregateFunc::SumInt64
2523 | AggregateFunc::SumUInt16
2524 | AggregateFunc::SumUInt32
2525 | AggregateFunc::SumUInt64
2526 | AggregateFunc::SumFloat32
2527 | AggregateFunc::SumFloat64
2528 | AggregateFunc::SumNumeric
2529 | AggregateFunc::Count
2530 | AggregateFunc::Any
2531 | AggregateFunc::All
2532 | AggregateFunc::Dummy
2533 | AggregateFunc::JsonbAgg { .. }
2534 | AggregateFunc::JsonbObjectAgg { .. }
2535 | AggregateFunc::MapAgg { .. }
2536 | AggregateFunc::ArrayConcat { .. }
2537 | AggregateFunc::ListConcat { .. }
2538 | AggregateFunc::StringAgg { .. }
2539 | AggregateFunc::RowNumber { .. }
2540 | AggregateFunc::Rank { .. }
2541 | AggregateFunc::DenseRank { .. }
2542 | AggregateFunc::LagLead { .. }
2543 | AggregateFunc::FirstValue { .. }
2544 | AggregateFunc::LastValue { .. }
2545 | AggregateFunc::WindowAggregate { .. }
2546 | AggregateFunc::FusedValueWindowFunc { .. }
2547 | AggregateFunc::FusedWindowAggregate { .. } => None,
2548 }
2549 }
2550}
2551
2552mod window_agg_helpers {
2553 use crate::render::reduce::*;
2554
    /// A one-by-one aggregator that wraps one of three concrete implementations.
    ///
    /// The `mz_expr::OneByOneAggr` impl for this type picks the variant from the aggregate
    /// function's `ReductionType` and forwards every trait method to the wrapped value.
    pub enum OneByOneAggrImpls {
        /// Implementation used for `ReductionType::Accumulable` aggregates.
        Accumulable(AccumulableOneByOneAggr),
        /// Implementation used for `ReductionType::Hierarchical` aggregates.
        Hierarchical(HierarchicalOneByOneAggr),
        /// Implementation used for `ReductionType::Basic` aggregates.
        Basic(mz_expr::NaiveOneByOneAggr),
    }
2564
2565 impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2566 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2567 match reduction_type(agg) {
2568 ReductionType::Basic => {
2569 OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2570 }
2571 ReductionType::Accumulable => {
2572 OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2573 }
2574 ReductionType::Hierarchical => {
2575 OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2576 }
2577 }
2578 }
2579
2580 fn give(&mut self, d: &Datum) {
2581 match self {
2582 OneByOneAggrImpls::Basic(i) => i.give(d),
2583 OneByOneAggrImpls::Accumulable(i) => i.give(d),
2584 OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2585 }
2586 }
2587
2588 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2589 match self {
2591 OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2592 OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2593 OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2594 }
2595 }
2596 }
2597
    /// One-by-one aggregator for aggregate functions whose state is an `Accum`.
    pub struct AccumulableOneByOneAggr {
        /// The aggregate function being evaluated (cloned from the constructor argument).
        aggr_func: AggregateFunc,
        /// Running accumulator; starts at `accumulable_zero(aggr_func)` and absorbs one
        /// contribution per `give`.
        accum: Accum,
        /// Count of datums given so far; starts at `Diff::ZERO` and grows by `Diff::ONE` per
        /// `give`.
        total: Diff,
    }
2603
2604 impl AccumulableOneByOneAggr {
2605 fn new(aggr_func: &AggregateFunc) -> Self {
2606 AccumulableOneByOneAggr {
2607 aggr_func: aggr_func.clone(),
2608 accum: accumulable_zero(aggr_func),
2609 total: Diff::ZERO,
2610 }
2611 }
2612
2613 fn give(&mut self, d: &Datum) {
2614 self.accum
2615 .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2616 self.total += Diff::ONE;
2617 }
2618
2619 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2620 temp_storage.make_datum(|packer| {
2621 packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2622 })
2623 }
2624 }
2625
    /// One-by-one aggregator for aggregate functions that reduce through a `ReductionMonoid`.
    pub struct HierarchicalOneByOneAggr {
        /// The aggregate function being evaluated (cloned from the constructor argument).
        aggr_func: AggregateFunc,
        /// Running reduction state; seeded in `new` from a row holding `Datum::Null` and merged
        /// with each given datum via `plus_equals`.
        // NOTE(review): this presumably relies on `Datum::Null` acting as an identity for
        // `ReductionMonoid::plus_equals` -- confirm against the monoid implementation.
        monoid: ReductionMonoid,
    }
2632
2633 impl HierarchicalOneByOneAggr {
2634 fn new(aggr_func: &AggregateFunc) -> Self {
2635 let mut row_buf = Row::default();
2636 row_buf.packer().push(Datum::Null);
2637 HierarchicalOneByOneAggr {
2638 aggr_func: aggr_func.clone(),
2639 monoid: get_monoid(row_buf, aggr_func)
2640 .expect("aggr_func should be a hierarchical aggregation function"),
2641 }
2642 }
2643
2644 fn give(&mut self, d: &Datum) {
2645 let mut row_buf = Row::default();
2646 row_buf.packer().push(d);
2647 let m = get_monoid(row_buf, &self.aggr_func)
2648 .expect("aggr_func should be a hierarchical aggregation function");
2649 self.monoid.plus_equals(&m);
2650 }
2651
2652 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2653 temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2654 }
2655 }
2656}