1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use columnation::{Columnation, CopyRegion};
18use dec::OrderedDecimal;
19use differential_dataflow::Diff as _;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::BatchContainer;
26use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
27use differential_dataflow::trace::{Builder, Trace};
28use differential_dataflow::{Data, VecCollection};
29use itertools::Itertools;
30use mz_compute_types::plan::reduce::{
31 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
38use mz_repr::fixed_length::ToDatumIter;
39use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use tracing::warn;
45
46use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
47use crate::extensions::reduce::{ClearContainer, MzReduce};
48use crate::render::context::{CollectionBundle, Context};
49use crate::render::errors::DataflowErrorSer;
50use crate::render::errors::MaybeValidatingRow;
51use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
52use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
53use crate::row_spine::{
54 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
55};
56use crate::typedefs::{
57 ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58 RowRowSpine, RowSpine, RowValSpine,
59};
60
61impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders a `ReducePlan` into an arranged `CollectionBundle`.
    ///
    /// First evaluates `key_val_plan` over the input to produce a keyed
    /// `(key, val)` collection — routing any evaluation errors into a separate
    /// error stream — and then hands the result to `render_reduce_plan` for
    /// plan-specific rendering. All work happens inside a "Reduce" region.
    ///
    /// * `input_key` - key of an existing arrangement of `input`, if any.
    /// * `key_val_plan` - scalar plans producing the grouping key and values.
    /// * `reduce_plan` - the plan describing which reduction strategy to render.
    /// * `mfp_after` - optional MFP fused after the reduction; must be
    ///   nontemporal (enforced by `expect` below).
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<'scope, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<'scope, T> {
        // Plan the fused MFP; fused Reduce MFPs are required to be nontemporal.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the set of input columns either plan demands,
            // deduplicated and in ascending order.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Remap demanded columns to their dense positions and rewrite both
            // plans to reference the compacted column space.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // One past the largest demanded column; 0 if nothing is demanded.
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // Per-column skip counts used to step the row iterator below.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err) = input
                .enter_region(inner)
                .flat_map::<_, ConsolidatingContainerBuilder<Vec<((Row, Row), T, Diff)>>, _>(
                    input_key.map(|k| (k, None)),
                    max_demand,
                    move |row_datums, time, diff, ok_session, err_session| {
                        let mut row_builder = SharedRow::get();
                        let temp_storage = RowArena::new();

                        // Pull only the demanded datums out of the input row,
                        // skipping over undemanded columns.
                        let mut row_iter = row_datums.drain(..);
                        let mut datums_local = datums.borrow();
                        for skip in skips.iter() {
                            datums_local.push(row_iter.nth(*skip).unwrap());
                        }

                        // Evaluate the key plan; errors go to the error stream.
                        let key = key_plan.evaluate_into(
                            &mut datums_local,
                            &temp_storage,
                            &mut row_builder,
                        );
                        let key = match key {
                            Err(e) => {
                                err_session.give((e.into(), time, diff));
                                return 1;
                            }
                            Ok(Some(key)) => key.clone(),
                            Ok(None) => panic!("Row expected as no predicate was used"),
                        };

                        // Reset to the original demanded datums (the key plan
                        // may have appended) before evaluating the value plan.
                        datums_local.truncate(skips.len());
                        let val = val_plan.evaluate_into(
                            &mut datums_local,
                            &temp_storage,
                            &mut row_builder,
                        );
                        let val = match val {
                            Err(e) => {
                                err_session.give((e.into(), time, diff));
                                return 1;
                            }
                            Ok(Some(val)) => val.clone(),
                            Ok(None) => panic!("Row expected as no predicate was used"),
                        };

                        ok_session.give(((key, val), time, diff));
                        1
                    },
                );

            // Render the chosen reduction plan and leave the region.
            self.render_reduce_plan(
                reduce_plan,
                key_val_input.as_collection(),
                err,
                key_arity,
                mfp_after,
            )
            .leave_region(self.scope)
        })
    }
170
171 fn render_reduce_plan<'s>(
177 &self,
178 plan: ReducePlan,
179 collection: VecCollection<'s, T, (Row, Row), Diff>,
180 err_input: VecCollection<'s, T, DataflowErrorSer, Diff>,
181 key_arity: usize,
182 mfp_after: Option<SafeMfpPlan>,
183 ) -> CollectionBundle<'s, T> {
184 let mut errors = Default::default();
185 let arrangement =
186 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188 CollectionBundle::from_columns(
189 0..key_arity,
190 ArrangementFlavor::Local(
191 arrangement,
192 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193 ),
194 )
195 }
196
197 fn render_reduce_plan_inner<'s>(
198 &self,
199 plan: ReducePlan,
200 collection: VecCollection<'s, T, (Row, Row), Diff>,
201 errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
202 key_arity: usize,
203 mfp_after: Option<SafeMfpPlan>,
204 ) -> Arranged<'s, RowRowAgent<T, Diff>> {
205 let arrangement = match plan {
208 ReducePlan::Distinct => {
211 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
212 errors.push(errs);
213 arranged_output
214 }
215 ReducePlan::Accumulable(expr) => {
216 let (arranged_output, errs) =
217 self.build_accumulable(collection, expr, key_arity, mfp_after);
218 errors.push(errs);
219 arranged_output
220 }
221 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
222 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
223 errors.push(errs);
224 output
225 }
226 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
227 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
228 errors.push(errs);
229 output
230 }
231 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
232 expr,
233 fused_unnest_list,
234 })) => {
235 let validating = !fused_unnest_list;
239 let (output, errs) = self.build_basic_aggregate(
240 collection,
241 0,
242 &expr,
243 validating,
244 key_arity,
245 mfp_after,
246 fused_unnest_list,
247 );
248 if validating {
249 errors.push(errs.expect("validation should have occurred as it was requested"));
250 }
251 output
252 }
253 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
254 let (output, errs) =
255 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
256 errors.push(errs);
257 output
258 }
259 };
260 arrangement
261 }
262
    /// Renders a DISTINCT reduction: one output record per distinct key.
    ///
    /// Produces the arranged distinct output and a collection of errors,
    /// which covers both non-positive input multiplicities and errors from
    /// evaluating the fused `mfp_after` (checked in a separate reduce so the
    /// happy path need not handle them).
    fn build_distinct<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let error_logger = self.error_logger();

        // Two clones of the fused MFP: one for the ok path, one (retained only
        // if it can error) for the error-check path.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "DistinctBy",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Emit the key (with an empty value) only if the fused MFP
                    // (when present) evaluates to true; errors are handled in
                    // the sibling error-check reduce below.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        output.push((Row::default(), Diff::ONE));
                    }
                },
            );
        let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
            "DistinctByErrorCheck",
            move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowErrorSer, _)>| {
                // Report any non-positive multiplicity as an internal error.
                for (_, count) in input.iter() {
                    if count.is_positive() {
                        continue;
                    }
                    let message = "Non-positive multiplicity in DistinctBy";
                    error_logger.log(message, &format!("row={key:?}, count={count}"));
                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    return;
                }
                // Re-evaluate the fused MFP (only kept if it could error) and
                // surface any evaluation error.
                let Some(mfp) = &mfp_after2 else { return };
                let temp_storage = RowArena::new();
                let datum_iter = key.to_datum_iter();
                let mut datums_local = datums2.borrow();
                datums_local.extend(datum_iter);

                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                    output.push((e.into(), Diff::ONE));
                }
            },
        );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
335
    /// Renders multiple basic aggregations over the same input and fuses their
    /// results into a single output row per key.
    ///
    /// Each aggregate is rendered individually via `build_basic_aggregate`
    /// (with validation requested only until one aggregate has produced an
    /// error stream), then all partial results are concatenated, arranged by
    /// key, and reduced into one row. The fused `mfp_after` is applied to the
    /// combined row; a separate error-check reduce surfaces MFP errors.
    fn build_basic_aggregates<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        // This fused path expects more than one aggregate; log otherwise.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Request validation only while no aggregate has produced errors.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each partial result with its aggregate index so the fused
            // reduce below sees values in aggregate order.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Two MFP clones: ok path, and error-check path (kept only if fallible).
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Append each aggregate's (single-datum) result in order.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // At least one per-aggregate rendering must have validated.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            // Re-run the fused MFP solely to capture its evaluation errors.
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
444
    /// Renders a single basic (inaccumulable) aggregation.
    ///
    /// Extracts the aggregate's input column at `index` from each value row,
    /// optionally applies DISTINCT (with or without validation of input
    /// multiplicities), arranges the partial collection, and reduces it with
    /// the aggregate function. When `fused_unnest_list` is set, the aggregate
    /// is evaluated via `eval_with_unnest_list`, emitting one output row per
    /// produced datum instead of a single row.
    ///
    /// Returns the arranged output and, when validation was performed or the
    /// fused MFP can error, a stream of errors (`None` otherwise).
    fn build_basic_aggregate<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<'s, T>,
        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
    ) {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Keep only the aggregate's input column as the value.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        if distinct {
            // Deduplicate (key, value) pairs before aggregating. The pairer
            // merges key and value into one row and splits them back after.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // Validating variant: the distinct reduce reports non-positive
                // multiplicities as `Err(String)` values, demuxed here into a
                // separate error stream.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                // Non-validating variant: plain distinct with empty values.
                partial = self
                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Separate DatumVecs / clones for each closure the state is moved into.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        // Expand each value to `count` repetitions of its first
                        // datum; non-positive counts are clamped to 0 here and
                        // reported by the error-check reduce below.
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        datums_local.push(
                            func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            ),
                        );

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                })
        } else {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let mut datums_local = datums_key_1.borrow();
                        datums_local.extend(key.to_datum_iter());
                        let key_len = datums_local.len();
                        // Fused unnest-list: one output row per produced datum,
                        // resetting the key prefix for each.
                        for datum in func
                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            )
                        {
                            datums_local.truncate(key_len);
                            datums_local.push(datum);
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        }
                    }
                })
        };

        // Validate here only if requested and DISTINCT validation didn't
        // already produce an error stream.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Report non-positive accumulations as errors.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Re-evaluate aggregate + fused MFP to surface MFP
                            // evaluation errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // Fused unnest-list never validates here (see `must_validate`);
                // this branch exists only for MFP error checking.
                assert!(!must_validate);

                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            // Merge with any errors from DISTINCT validation.
            if let Some(e) = err_output {
                err_output = Some(e.concat(errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
726
    /// Builds the distinct step used by inaccumulable (basic) aggregations.
    ///
    /// Arranges the merged key-value rows and reduces each group to a single
    /// record. When the value type can encode errors (`Tr::ValOwn::into_error()`
    /// returns `Some`), non-positive multiplicities are reported as error
    /// values; otherwise they are silently ignored and a plain ok value is
    /// emitted.
    ///
    /// * `name_tag` - optional suffix appended to the operator name.
    fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
        &self,
        input: VecCollection<'s, T, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<'s, TraceAgent<Tr>>
    where
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyContainer: BatchContainer<Owned = Row>,
            Time = T,
            Diff = Diff,
            ValOwn: Data + MaybeValidatingRow<(), String>,
        > + 'static,
        Bu: Builder<
            Time = T,
            Input: Container
                + InternalMerge
                + ClearContainer
                + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                // In the validating instantiation, emit an error value for the
                // first non-positive count and stop.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                // One ok record per key implements the distinct.
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }
778
    /// Renders a bucketed hierarchical reduction (e.g. min/max over possibly
    /// retractable inputs).
    ///
    /// Prepends a hash bucket to each key and repeatedly reduces through the
    /// shrinking `buckets` moduli via `build_bucketed_stage`, so each stage
    /// only combines a bounded number of inputs. The first stage that still
    /// lacks an error stream validates input multiplicities. A final
    /// "ReduceMinsMaxes" reduce computes the aggregate values per original
    /// key, with a sibling error-check reduce for validation and fused-MFP
    /// errors. All stages run inside a "ReduceHierarchical" region.
    fn build_bucketed<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
        let outer_scope = input.scope();
        let arranged_output = outer_scope
            .clone()
            .region_named("ReduceHierarchical", |inner| {
                let input = input.enter(inner);

                // Modulus for the first bucketing stage; 1 (single bucket) if
                // no buckets were planned.
                let first_mod = buckets.get(0).copied().unwrap_or(1);
                let aggregations = aggr_funcs.len();

                // Re-key as (hash(values) % first_mod, original key) and keep
                // only the aggregated columns as values.
                let mut stage = input.map(move |(key, row)| {
                    let mut row_builder = SharedRow::get();
                    let mut row_packer = row_builder.packer();
                    row_packer.extend(row.iter().take(aggregations));
                    let values = row_builder.clone();

                    let hash = values.hashed() % first_mod;
                    let hash_key =
                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                    (hash_key, values)
                });

                for (index, b) in buckets.into_iter().enumerate() {
                    // After the first stage, re-bucket by taking the previous
                    // hash modulo the (smaller) current modulus.
                    let input = if index == 0 {
                        stage
                    } else {
                        stage.map(move |(hash_key, values)| {
                            let mut hash_key_iter = hash_key.iter();
                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                            let hash_key = SharedRow::pack(
                                std::iter::once(Datum::from(hash))
                                    .chain(hash_key_iter.take(key_arity)),
                            );
                            (hash_key, values)
                        })
                    };

                    // Validate in the first stage without an error stream yet.
                    let validating = err_output.is_none();

                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                    if let Some(errs) = errs {
                        err_output = Some(errs.leave_region(outer_scope));
                    }
                    stage = oks
                }

                // Discard the leading hash datum, restoring the original key.
                let partial = stage.map(move |(hash_key, values)| {
                    let mut hash_key_iter = hash_key.iter();
                    let _hash = hash_key_iter.next();
                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
                });

                // Per-closure state; MFP error path kept only if it can error.
                let mut datums1 = DatumVec::new();
                let mut datums2 = DatumVec::new();
                let mfp_after1 = mfp_after.clone();
                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
                let aggr_funcs2 = aggr_funcs.clone();

                let error_logger = self.error_logger();
                let arranged = partial
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        "Arrange ReduceMinsMaxes",
                    );
                // If no stage validated (e.g. empty `buckets`), validate here.
                let must_validate = err_output.is_none();
                if must_validate || mfp_after2.is_some() {
                    let errs = arranged
                        .clone()
                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                            "ReduceMinsMaxes Error Check",
                            move |key, source, target| {
                                // Report non-positive accumulations as errors.
                                if must_validate {
                                    for (val, count) in source.iter() {
                                        if count.is_positive() {
                                            continue;
                                        }
                                        let val = val.to_row();
                                        let message =
                                            "Non-positive accumulation in ReduceMinsMaxes";
                                        error_logger
                                            .log(message, &format!("val={val:?}, count={count}"));
                                        target.push((
                                            EvalError::Internal(message.into()).into(),
                                            Diff::ONE,
                                        ));
                                        return;
                                    }
                                }

                                // Recompute aggregates and re-run the fused MFP
                                // to surface its evaluation errors.
                                let Some(mfp) = &mfp_after2 else { return };
                                let temp_storage = RowArena::new();
                                let datum_iter = key.to_datum_iter();
                                let mut datums_local = datums2.borrow();
                                datums_local.extend(datum_iter);

                                // Column-wise iteration: the i-th aggregate
                                // consumes the i-th datum of every value row.
                                let mut source_iters = source
                                    .iter()
                                    .map(|(values, _cnt)| *values)
                                    .collect::<Vec<_>>();
                                for func in aggr_funcs2.iter() {
                                    let column_iter = (0..source_iters.len())
                                        .map(|i| source_iters[i].next().unwrap());
                                    datums_local.push(func.eval(column_iter, &temp_storage));
                                }
                                if let Result::Err(e) =
                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            },
                        )
                        .as_collection(|_, v| v.clone())
                        .leave_region(outer_scope);
                    if let Some(e) = err_output.take() {
                        err_output = Some(e.concat(errs));
                    } else {
                        err_output = Some(errs);
                    }
                }
                arranged
                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        "ReduceMinsMaxes",
                        move |key, source, target| {
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums1.borrow();
                            datums_local.extend(datum_iter);
                            let key_len = datums_local.len();

                            // Same column-wise aggregation as the error check.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }

                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        },
                    )
                    .leave_region(outer_scope)
            });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
986
987 fn build_bucketed_stage<'s>(
994 &self,
995 aggr_funcs: &Vec<AggregateFunc>,
996 input: VecCollection<'s, T, (Row, Row), Diff>,
997 validating: bool,
998 ) -> (
999 VecCollection<'s, T, (Row, Row), Diff>,
1000 Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
1001 ) {
1002 let (input, negated_output, errs) = if validating {
1003 let (input, reduced) = self
1004 .build_bucketed_negated_output::<
1005 RowValBuilder<_, _, _>,
1006 RowValSpine<Result<Row, Row>, _, _>,
1007 >(
1008 input.clone(),
1009 aggr_funcs.clone(),
1010 );
1011 let (oks, errs) = reduced
1012 .as_collection(|k, v| (k.to_row(), v.clone()))
1013 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1014 "Checked Invalid Accumulations",
1015 |(hash_key, result)| match result {
1016 Err(hash_key) => {
1017 let mut hash_key_iter = hash_key.iter();
1018 let _hash = hash_key_iter.next();
1019 let key = SharedRow::pack(hash_key_iter);
1020 let message = format!(
1021 "Invalid data in source, saw non-positive accumulation \
1022 for key {key:?} in hierarchical mins-maxes aggregate"
1023 );
1024 Err(EvalError::Internal(message.into()).into())
1025 }
1026 Ok(values) => Ok((hash_key, values)),
1027 },
1028 );
1029 (input, oks, Some(errs))
1030 } else {
1031 let (input, reduced) = self
1032 .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1033 input,
1034 aggr_funcs.clone(),
1035 );
1036 let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1039 (input, oks, None)
1040 };
1041
1042 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1043 let oks = negated_output.concat(input);
1044 (oks, errs)
1045 }
1046
    /// Arranges `input` and reduces each bucket to its aggregate values, but
    /// emits everything *negated*: the aggregate row with diff `-1` and every
    /// input record with its diff negated.
    ///
    /// Concatenated with the original (positive) input by the caller, this
    /// leaves exactly one positively-signed aggregate record per bucket —
    /// avoiding a full second copy of the input in a new arrangement.
    ///
    /// When the trace's value type can represent errors
    /// (`Tr::ValOwn::into_error()` is `Some`), non-positive accumulations are
    /// reported as error values carrying the offending key.
    ///
    /// Returns both the input arrangement and the reduced (negated) output.
    fn build_bucketed_negated_output<'s, Bu, Tr>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        aggrs: Vec<AggregateFunc>,
    ) -> (
        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
        Arranged<'s, TraceAgent<Tr>>,
    )
    where
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyContainer: BatchContainer<Owned = Row>,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = T,
            Diff = Diff,
        > + 'static,
        Bu: Builder<
            Time = T,
            Input: Container
                + InternalMerge
                + ClearContainer
                + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();
        let arranged_input = input
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged MinsMaxesHierarchical input",
            );

        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
            "Reduced Fallibly MinsMaxesHierarchical",
            move |key, source, target| {
                // Validating instantiation: emit an error value (carrying the
                // key) for the first non-positive count and stop.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        error_logger.log(
                            "Non-positive accumulation in MinsMaxesHierarchical",
                            &format!("key={key:?}, value={value:?}, count={count}"),
                        );
                        target.push((
                            err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
                            Diff::ONE,
                        ));
                        return;
                    }
                }

                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();

                // Column-wise aggregation: the i-th function consumes the i-th
                // datum of every value row in this bucket.
                let mut source_iters = source
                    .iter()
                    .map(|(values, _cnt)| *values)
                    .collect::<Vec<_>>();
                for func in aggrs.iter() {
                    let column_iter =
                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                    row_packer.push(func.eval(column_iter, &RowArena::new()));
                }
                // Emit the aggregate negated, plus every input negated; the
                // caller re-adds the original input to cancel it out.
                target.reserve(source.len().saturating_add(1));
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
                target.extend(source.iter().map(|(values, cnt)| {
                    let mut cnt = *cnt;
                    cnt.negate();
                    (Tr::ValOwn::ok(values.to_row()), cnt)
                }));
            },
        );
        (arranged_input, reduced)
    }
1135
    /// Renders a hierarchical reduction over monotonic (append-only) input.
    ///
    /// Because the input never retracts, each aggregate can be maintained as a
    /// `ReductionMonoid` accumulated via `explode_one`, avoiding the bucketed
    /// hierarchy. Inputs are optionally consolidated first, checked for
    /// monotonicity (violations become errors), folded into per-aggregate
    /// monoids, and finalized in a single reduce. Fused-MFP errors are
    /// surfaced by a separate error-check reduce.
    fn build_monotonic<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let aggregations = aggr_funcs.len();
        // Split each input row into one single-datum row per aggregate.
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        // Guard the monotonicity assumption: any retraction is logged and
        // turned into an error record.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the values into the difference component as monoids, so
        // differential accumulates the aggregates for us.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        // Per-closure state; MFP error path kept only if it can error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The accumulated monoids live in the diff of the single
                    // input record; finalize each into its output datum.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });

        if let Some(mfp) = mfp_after2 {
            // Re-run the fused MFP solely to capture its evaluation errors.
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1254
    /// Renders an accumulable reduction: aggregates (sums, counts, any/all,
    /// ...) that are maintained by adding and subtracting `Accum` values.
    ///
    /// "Simple" aggregates are folded directly into diffs via `explode_one`;
    /// DISTINCT aggregates are first deduplicated with an arrange/reduce pair
    /// keyed on the packed (key, value). The per-strategy partial
    /// accumulations are concatenated, arranged by key, and finalized into
    /// output rows.
    ///
    /// Returns the output arrangement plus a collection of errors: invariant
    /// violations detected during finalization and any errors raised by
    /// evaluating `mfp_after`.
    fn build_accumulable<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let collection_scope = collection.scope();

        // Plan sanity check: every full aggregate must be classified as
        // exactly one of simple or distinct.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // The additive identity for the combined diff: one zero accumulator
        // per aggregate, plus a zero record count.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        // Partial accumulations, one collection per rendering strategy.
        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            let collection = collection.clone();
            // Simple aggregates: turn each input row directly into a diff
            // carrying one accumulator per aggregate.
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    let mut row_iter = row.iter().enumerate();
                    // NOTE(review): this forward-only scan over the row
                    // assumes `simple_aggrs` is ordered by `datum_index` —
                    // confirm against the plan construction.
                    for (datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                        // Record that one input record contributed.
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // DISTINCT aggregates: deduplicate (key, value) pairs before
        // accumulating, one arrange/reduce pair per aggregate.
        for (datum_index, aggr) in distinct_aggrs.into_iter() {
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .clone()
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    // Pack key and value together so the reduce below sees
                    // each distinct pair exactly once.
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // Merge the partial accumulations into one collection, avoiding the
        // concatenate operator when there is only a single input.
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
        };

        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only run the separate error-checking reduce when the MFP can
        // actually produce evaluation errors.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        let arranged = collection
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, (Vec<Accum>, Diff)>>(
                "ArrangeAccumulable [val: empty]",
            );
        // Finalization: for each key, turn the accumulators into output
        // datums and apply `mfp_after`.
        let arranged_output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // Error path: surface invariant violations (net-zero records with a
        // non-zero accumulation, negative unsigned sums) and MFP errors.
        let arranged_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "AccumulableErrorCheck",
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // A zero record count alongside a non-zero accumulator
                        // means the input retracted records it never added.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Unsigned sums must never accumulate to a negative
                        // value.
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                        "Invalid negative unsigned aggregation in ReduceAccumulable",
                                        &format!("aggr={aggr:?}, accum={accum:?}"),
                                    );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                        unsigned type for key {key}"
                                    );
                                    let err = EvalError::Internal(message.into());
                                    output.push((err.into(), Diff::ONE));
                                }
                            }
                            _ => (),
                        }
                    }

                    // Re-evaluate the MFP solely to capture any errors it
                    // raises; the success path is handled above.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1472}
1473
1474fn evaluate_mfp_after<'a, 'b>(
1478 mfp_after: &'a Option<SafeMfpPlan>,
1479 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1480 temp_storage: &'a RowArena,
1481 key_len: usize,
1482) -> Option<Row> {
1483 let mut row_builder = SharedRow::get();
1484 if let Some(mfp) = mfp_after {
1487 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1490 Some(row_builder.pack_using(iter.skip(key_len)))
1493 } else {
1494 None
1495 }
1496 } else {
1497 Some(row_builder.pack_using(&datums_local[key_len..]))
1498 }
1499}
1500
1501fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1502 match aggr_func {
1503 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1504 trues: Diff::ZERO,
1505 falses: Diff::ZERO,
1506 },
1507 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1508 accum: AccumCount::ZERO,
1509 pos_infs: Diff::ZERO,
1510 neg_infs: Diff::ZERO,
1511 nans: Diff::ZERO,
1512 non_nulls: Diff::ZERO,
1513 },
1514 AggregateFunc::SumNumeric => Accum::Numeric {
1515 accum: OrderedDecimal(NumericAgg::zero()),
1516 pos_infs: Diff::ZERO,
1517 neg_infs: Diff::ZERO,
1518 nans: Diff::ZERO,
1519 non_nulls: Diff::ZERO,
1520 },
1521 _ => Accum::SimpleNumber {
1522 accum: AccumCount::ZERO,
1523 non_nulls: Diff::ZERO,
1524 },
1525 }
1526}
1527
/// Scale factor (2^24) for representing float sums as fixed-point integers in
/// `Accum::Float`: values are multiplied by this in `datum_to_accumulator`
/// and divided back out in `finalize_accum`.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1529
/// Converts one input `Datum` into the `Accum` contribution for
/// `aggregate_func`.
///
/// NULL inputs contribute all-zero accumulators (including a zero
/// `non_nulls` count), so they are effectively ignored by the aggregate.
/// Panics on datums whose type does not match the aggregate function.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // COUNT only needs to know whether the input was non-null.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO,
            non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            // Special values are tracked in dedicated counters rather than
            // folded into the sum.
            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Finite values are stored as fixed-point integers scaled by
            // `FLOAT_SCALE`; special values contribute zero to the sum.
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Like floats above: infinities and NaN are counted
                // separately and contribute a zero sum.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Widen the finite value to the aggregation precision.
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        // Remaining accumulable aggregates are integer-like sums over a
        // 128-bit accumulator.
        _ => {
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1670
/// Converts a final `Accum` into the output datum for `aggr_func`.
///
/// `total` is the net count of records that contributed to the accumulator.
/// If records exist but the accumulator is entirely zero (e.g. every input
/// was NULL), all aggregates except `Count` report `Datum::Null`.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            // COUNT reports the number of non-null inputs; COUNT over only
            // NULLs is 0, not NULL (hence the exemption above).
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            // Three-valued logic: ALL is false if any false was seen, true
            // only if every record was true, otherwise NULL.
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            // ANY mirrors ALL with true/false swapped.
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // Narrow the 128-bit sum to the i64 result type.
            // NOTE(review): the `as` cast wraps if the sum exceeds i64 range
            // — presumably bounded elsewhere for 16/32-bit inputs; confirm.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            // Unsigned sums: a negative accumulation is invalid input (the
            // error-check dataflow reports it); produce NULL here.
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Special values dominate: any NaN (or infinities of both
                // signs) yields NaN; otherwise a single-signed infinity wins.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Decode the fixed-point sum back into a float.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Same special-value handling as SumFloat32 above.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow from aggregation precision to datum precision; the
                // narrowing itself can overflow to infinity, which is folded
                // into the infinity bookkeeping below.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1836
/// The 128-bit integer type backing `Accum` sums; see the `Semigroup` and
/// `Multiply` impls for `Accum` for how overflow is handled.
type AccumCount = mz_ore::Overflowing<i128>;
1839
/// Accumulator state for accumulable aggregations. Values are combined by
/// componentwise addition (see the `Semigroup` impl), so retractions are
/// handled by adding negated contributions.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// State for `Any`/`All`: counts of true and false inputs.
    Bool {
        /// Net count of `Datum::True` inputs.
        trues: Diff,
        /// Net count of `Datum::False` inputs.
        falses: Diff,
    },
    /// State for integer-like sums and `Count`.
    SimpleNumber {
        /// The 128-bit running sum.
        accum: AccumCount,
        /// Net count of non-null inputs.
        non_nulls: Diff,
    },
    /// State for float sums: a fixed-point sum of finite values plus
    /// separate counters for special values.
    Float {
        /// Sum of finite values, scaled by `FLOAT_SCALE` into fixed point.
        accum: AccumCount,
        /// Net count of positive-infinity inputs.
        pos_infs: Diff,
        /// Net count of negative-infinity inputs.
        neg_infs: Diff,
        /// Net count of NaN inputs.
        nans: Diff,
        /// Net count of non-null inputs.
        non_nulls: Diff,
    },
    /// State for `numeric` sums, kept at aggregation precision.
    Numeric {
        /// Sum of finite values in the wide aggregation format.
        accum: OrderedDecimal<NumericAgg>,
        /// Net count of positive-infinity inputs.
        pos_infs: Diff,
        /// Net count of negative-infinity inputs.
        neg_infs: Diff,
        /// Net count of NaN inputs.
        nans: Diff,
        /// Net count of non-null inputs.
        non_nulls: Diff,
    },
}
1904
1905impl IsZero for Accum {
1906 fn is_zero(&self) -> bool {
1907 match self {
1908 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1909 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1910 Accum::Float {
1911 accum,
1912 pos_infs,
1913 neg_infs,
1914 nans,
1915 non_nulls,
1916 } => {
1917 accum.is_zero()
1918 && pos_infs.is_zero()
1919 && neg_infs.is_zero()
1920 && nans.is_zero()
1921 && non_nulls.is_zero()
1922 }
1923 Accum::Numeric {
1924 accum,
1925 pos_infs,
1926 neg_infs,
1927 nans,
1928 non_nulls,
1929 } => {
1930 accum.0.is_zero()
1931 && pos_infs.is_zero()
1932 && neg_infs.is_zero()
1933 && nans.is_zero()
1934 && non_nulls.is_zero()
1935 }
1936 }
1937 }
1938}
1939
impl Semigroup for Accum {
    /// Componentwise addition of accumulators. Both sides must be the same
    /// variant, since an aggregate's variant is fixed by its function.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Overflow of the fixed-point sum is treated as survivable:
                // warn and wrap rather than crash the dataflow.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // Rounding would mean the aggregation context's precision was
                // exceeded, which we treat as a broken invariant.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Normalize the sum's representation so equal values stay
                // comparable. (NOTE(review): presumably `reduce` trims
                // trailing zeros per the `dec` crate — confirm.)
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2042
impl Multiply<Diff> for Accum {
    type Output = Accum;

    /// Scales the accumulator by a multiplicity `factor`, as when one
    /// accumulator stands in for `factor` identical records.
    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                // Widen the factor to the 128-bit accumulator type.
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // As in `plus_equals`: warn and wrap on overflow instead of
                // panicking.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                // Computes f = factor * accum in the aggregation context.
                cx.mul(&mut f, &accum.0);
                // Rounding would mean the aggregation context's precision was
                // exceeded, which we treat as a broken invariant.
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2105
impl Columnation for Accum {
    // `Accum` derives `Copy` and owns no heap data, so a plain memcpy region
    // suffices.
    type InnerRegion = CopyRegion<Self>;
}
2109
/// Monoid implementations for hierarchical (min/max) aggregations.
mod monoids {

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// Running state of a min or max aggregation: a single `Row` holding the
    /// current extreme value.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current aggregate value.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        // Hand-written so `clone_from` can reuse the destination row's
        // allocation instead of allocating a fresh `Row`.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out so we can write into it regardless of which
            // variant `self` currently is.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Adopt the variant of `source`.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Min/max are idempotent: any positive multiplicity leaves the
            // value unchanged. Non-positive multiplicities are not meaningful
            // for a monoid without retraction, hence the assert.
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // SQL MIN ignores NULLs: a NULL never beats a
                        // non-NULL value.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        // `clone_from` to reuse the existing allocation.
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // SQL MAX ignores NULLs as well.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // A min/max monoid carries a value and has no additive identity;
            // always returning `false` ensures consolidation never drops it.
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnation region for `ReductionMonoid`, delegating storage of the
    /// wrapped `Row` to a `Row` region.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        // SAFETY: delegates to the inner `Row` region's `copy` under the
        // same contract the caller must uphold for this region.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // `finalize` exposes the wrapped row of either variant.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the appropriate monoid for `func`, or `None` if `func`
    /// is not a hierarchical (min/max) aggregation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2364
/// Implementations of `mz_expr::OneByOneAggr` built on this module's
/// accumulable and hierarchical aggregation machinery, so window-function
/// evaluation can reuse the same aggregate semantics.
mod window_agg_helpers {
    use crate::render::reduce::*;

    /// Dispatch enum over the available one-by-one aggregation strategies,
    /// selected by the aggregate's reduction type.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            // Pick the implementation matching the aggregate's reduction
            // type.
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one evaluation of accumulable aggregates: datums are folded
    /// into an `Accum`, finalized on demand.
    pub struct AccumulableOneByOneAggr {
        aggr_func: AggregateFunc,
        // Running accumulator, starting from the function's zero.
        accum: Accum,
        // Number of datums given so far; needed by `finalize_accum`.
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        /// Folds one more datum into the accumulator.
        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        /// Finalizes the accumulator into a datum owned by `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one evaluation of hierarchical (min/max) aggregates via
    /// `ReductionMonoid`.
    pub struct HierarchicalOneByOneAggr {
        aggr_func: AggregateFunc,
        // Current monoid state; seeded with a NULL datum, which min/max
        // semantics ignore (see `ReductionMonoid::plus_equals`).
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        /// Merges one more datum into the monoid state.
        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        /// Finalizes the monoid into a datum owned by `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}