1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use columnation::{Columnation, CopyRegion};
18use dec::OrderedDecimal;
19use differential_dataflow::Diff as _;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::BatchContainer;
26use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
27use differential_dataflow::trace::{Builder, Trace};
28use differential_dataflow::{Data, VecCollection};
29use itertools::Itertools;
30use mz_compute_types::plan::reduce::{
31 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
38use mz_repr::fixed_length::ToDatumIter;
39use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
40use mz_storage_types::errors::DataflowError;
41use mz_timely_util::operator::CollectionExt;
42use serde::{Deserialize, Serialize};
43use timely::Container;
44use timely::container::{CapacityContainerBuilder, PushInto};
45use tracing::warn;
46
47use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
48use crate::extensions::reduce::{ClearContainer, MzReduce};
49use crate::render::context::{CollectionBundle, Context};
50use crate::render::errors::MaybeValidatingRow;
51use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
52use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
53use crate::row_spine::{
54 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
55};
56use crate::typedefs::{
57 ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58 RowRowSpine, RowSpine, RowValSpine,
59};
60
61impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders a reduction (`ReducePlan`) over the given input bundle.
    ///
    /// The input is first keyed by `key_val_plan`: each row is reduced to the
    /// columns the key/value plans demand, then key and value expressions are
    /// evaluated (evaluation errors are routed to the error stream). The keyed
    /// collection is handed to `render_reduce_plan` for the actual aggregation.
    /// `mfp_after` is a map/filter/project operator fused onto the reduction's
    /// output; it must be non-temporal.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<'scope, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<'scope, T> {
        // Plan the fused MFP; Reduce-fused MFPs must not contain temporal predicates.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the set of input columns either plan demands.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Remap the demanded columns onto a dense prefix, and permute both
            // plans to reference the remapped positions.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // `skips[i]` is the number of undemanded columns to skip before
            // reaching the `i`-th demanded column.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    // Gather only the demanded columns; `nth(skip)` advances past
                    // the undemanded columns in between.
                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions; errors become error-stream updates.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Drop anything key evaluation appended beyond the demanded
                    // columns before evaluating the value expressions.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demux oks and errors, consolidating as we build the containers.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(err_input);

            // Render the reduce plan and return the bundle to the outer scope.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region(self.scope)
        })
    }
161
162 fn render_reduce_plan<'s>(
168 &self,
169 plan: ReducePlan,
170 collection: VecCollection<'s, T, (Row, Row), Diff>,
171 err_input: VecCollection<'s, T, DataflowError, Diff>,
172 key_arity: usize,
173 mfp_after: Option<SafeMfpPlan>,
174 ) -> CollectionBundle<'s, T> {
175 let mut errors = Default::default();
176 let arrangement =
177 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
178 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
179 CollectionBundle::from_columns(
180 0..key_arity,
181 ArrangementFlavor::Local(
182 arrangement,
183 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
184 ),
185 )
186 }
187
188 fn render_reduce_plan_inner<'s>(
189 &self,
190 plan: ReducePlan,
191 collection: VecCollection<'s, T, (Row, Row), Diff>,
192 errors: &mut Vec<VecCollection<'s, T, DataflowError, Diff>>,
193 key_arity: usize,
194 mfp_after: Option<SafeMfpPlan>,
195 ) -> Arranged<'s, RowRowAgent<T, Diff>> {
196 let arrangement = match plan {
199 ReducePlan::Distinct => {
202 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
203 errors.push(errs);
204 arranged_output
205 }
206 ReducePlan::Accumulable(expr) => {
207 let (arranged_output, errs) =
208 self.build_accumulable(collection, expr, key_arity, mfp_after);
209 errors.push(errs);
210 arranged_output
211 }
212 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
213 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
214 errors.push(errs);
215 output
216 }
217 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
218 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
219 errors.push(errs);
220 output
221 }
222 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
223 expr,
224 fused_unnest_list,
225 })) => {
226 let validating = !fused_unnest_list;
230 let (output, errs) = self.build_basic_aggregate(
231 collection,
232 0,
233 &expr,
234 validating,
235 key_arity,
236 mfp_after,
237 fused_unnest_list,
238 );
239 if validating {
240 errors.push(errs.expect("validation should have occurred as it was requested"));
241 }
242 output
243 }
244 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
245 let (output, errs) =
246 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
247 errors.push(errs);
248 output
249 }
250 };
251 arrangement
252 }
253
    /// Build the dataflow to compute the set of distinct keys.
    ///
    /// Returns the arranged distinct output and a stream of errors (invalid
    /// multiplicities and errors from evaluating the fused MFP).
    fn build_distinct<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        let error_logger = self.error_logger();

        // Per-closure datum buffers; the error path only needs the MFP when it
        // can actually produce an error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "DistinctBy",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // The key carries all output columns, so the value is empty.
                    // Emit it only if the fused MFP (if any) accepts the key; MFP
                    // errors are surfaced by the sibling error operator below.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        output.push((Row::default(), Diff::ONE));
                    }
                },
            );
        let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
            "DistinctByErrorCheck",
            move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                // Any non-positive multiplicity indicates invalid input data.
                for (_, count) in input.iter() {
                    if count.is_positive() {
                        continue;
                    }
                    let message = "Non-positive multiplicity in DistinctBy";
                    error_logger.log(message, &format!("row={key:?}, count={count}"));
                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    return;
                }
                // Additionally report errors from evaluating the fused MFP.
                let Some(mfp) = &mfp_after2 else { return };
                let temp_storage = RowArena::new();
                let datum_iter = key.to_datum_iter();
                let mut datums_local = datums2.borrow();
                datums_local.extend(datum_iter);

                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                    output.push((e.into(), Diff::ONE));
                }
            },
        );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
326
    /// Build the dataflow to compute and arrange multiple non-accumulable,
    /// non-hierarchical aggregations on `input`.
    ///
    /// Each aggregate is rendered as its own reduction via
    /// `build_basic_aggregate`; the per-aggregate results are then fused into a
    /// single arrangement presenting all results in plan order. Exactly one of
    /// the aggregates performs input validation.
    fn build_basic_aggregates<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        // This function is only meant to fuse *multiple* basic aggregates; a
        // single aggregate should use `build_basic_aggregate` directly.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Only the first aggregate that returns errors validates the input
            // (`err_output.is_none()` below); the fused MFP is applied after
            // stitching, so `None` is passed here.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each result with its aggregate index so results can be
            // reassembled in plan order after concatenation.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Per-closure datum buffers; the error path only needs the MFP if it
        // can actually error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                "Arranged ReduceFuseBasic input",
            );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Values are `(index, row)` pairs; the arrangement presents
                    // them in value order, which is tuple order and hence plan
                    // order, so pushing each row's first datum reassembles the
                    // fused output row.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // Errors consist of the validating aggregate's errors plus, when the
        // fused MFP can error, errors from evaluating it on the fused rows.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        // Rebuild the fused row exactly as above and report any
                        // MFP evaluation error.
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
435
    /// Build the dataflow to compute a single basic (non-accumulable,
    /// non-hierarchical) aggregation.
    ///
    /// Applies DISTINCT preprocessing when the aggregate requests it. When
    /// `validating`, returns `Some` error stream reporting non-positive input
    /// multiplicities; `fused_unnest_list` selects the variant that expands the
    /// aggregate result via `eval_with_unnest_list`.
    fn build_basic_aggregate<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<'s, T>,
        Option<VecCollection<'s, T, DataflowError, Diff>>,
    ) {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Project out the aggregand: the `index`-th datum of the value row.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // For DISTINCT aggregates, deduplicate (key, value) pairs first.
        if distinct {
            // Merge key and value into a single row so we can arrange by both,
            // and later split them back apart.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // The validating path tracks `Result<(), String>` values so that
                // non-positive multiplicities surface as errors instead of
                // silently producing wrong output.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                // Non-validating path: plain distinct with empty values.
                partial = self
                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Per-closure datum buffers; `func2` is for the error-check closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        // Expand each value to its multiplicity, since the
                        // aggregate function consumes a flat stream of datums.
                        // Non-positive counts are clamped to 0 here; they are
                        // reported by the error-check operator below.
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        datums_local.push(
                            func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            ),
                        );

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                })
        } else {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let mut datums_local = datums_key_1.borrow();
                        datums_local.extend(key.to_datum_iter());
                        let key_len = datums_local.len();
                        // The unnest-list variant yields one output row per list
                        // element; re-truncate to the key for each element.
                        for datum in func
                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            )
                        {
                            datums_local.truncate(key_len);
                            datums_local.push(datum);
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        }
                    }
                })
        };

        // Error path: validate multiplicities (unless the DISTINCT stage already
        // did) and report errors from the fused MFP when it can error.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Any non-positive multiplicity indicates invalid input.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Then re-evaluate the aggregate and report MFP errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // The fused-unnest-list variant never validates (callers pass
                // `validating = false` for it), so only MFP errors remain.
                assert!(!must_validate);

                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            // Evaluate the MFP once per unnested output row.
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
717
    /// Build a dataflow fragment that deduplicates its input rows, emitting one
    /// output per key.
    ///
    /// When `Tr`'s owned value type is validating (`into_error()` is `Some`),
    /// non-positive input multiplicities produce an error value for the key
    /// instead of an output. `name_tag` is appended to the operator name.
    fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
        &self,
        input: VecCollection<'s, T, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<'s, TraceAgent<Tr>>
    where
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyContainer: BatchContainer<Owned = Row>,
            Time = T,
            Diff = Diff,
            ValOwn: Data + MaybeValidatingRow<(), String>,
        > + 'static,
        Bu: Builder<
            Time = T,
            Input: Container
                + InternalMerge
                + ClearContainer
                + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                // In validating mode, report a non-positive multiplicity as an
                // error for this key and emit nothing else.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                // Each key contributes exactly one output: this is the distinct.
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }
769
    /// Build the dataflow to compute hierarchical (min/max-style) aggregations
    /// via a sequence of bucketed reduction stages.
    ///
    /// Keys are prefixed with a hash modulo a decreasing sequence of bucket
    /// counts, so that each stage reduces small groups and later stages merge
    /// them, keeping per-update work bounded. A final `ReduceMinsMaxes` stage
    /// strips the hash and produces the arranged output; exactly one stage
    /// validates input multiplicities.
    fn build_bucketed<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
        let outer_scope = input.scope();
        let arranged_output = outer_scope
            .clone()
            .region_named("ReduceHierarchical", |inner| {
                let input = input.enter(inner);

                let first_mod = buckets.get(0).copied().unwrap_or(1);
                let aggregations = aggr_funcs.len();

                // Keep only the aggregand columns as the value, and prefix the
                // key with the first-stage bucket hash.
                let mut stage = input.map(move |(key, row)| {
                    let mut row_builder = SharedRow::get();
                    let mut row_packer = row_builder.packer();
                    row_packer.extend(row.iter().take(aggregations));
                    let values = row_builder.clone();

                    let hash = values.hashed() % first_mod;
                    let hash_key =
                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                    (hash_key, values)
                });

                // One reduction stage per bucket count; later stages re-bucket
                // the previous stage's output with a coarser modulus.
                for (index, b) in buckets.into_iter().enumerate() {
                    let input = if index == 0 {
                        stage
                    } else {
                        stage.map(move |(hash_key, values)| {
                            // Replace the leading hash datum with its value
                            // modulo the new (smaller) bucket count.
                            let mut hash_key_iter = hash_key.iter();
                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                            let hash_key = SharedRow::pack(
                                std::iter::once(Datum::from(hash))
                                    .chain(hash_key_iter.take(key_arity)),
                            );
                            (hash_key, values)
                        })
                    };

                    // Only the first stage validates input multiplicities.
                    let validating = err_output.is_none();

                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                    if let Some(errs) = errs {
                        err_output = Some(errs.leave_region(outer_scope));
                    }
                    stage = oks
                }

                // Discard the hash and return to the format of the input data.
                let partial = stage.map(move |(hash_key, values)| {
                    let mut hash_key_iter = hash_key.iter();
                    let _hash = hash_key_iter.next();
                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
                });

                // Per-closure datum buffers; the error path only needs the MFP
                // if it can error.
                let mut datums1 = DatumVec::new();
                let mut datums2 = DatumVec::new();
                let mfp_after1 = mfp_after.clone();
                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
                let aggr_funcs2 = aggr_funcs.clone();

                // Arrange the final stage's output for the last reduction.
                let error_logger = self.error_logger();
                let arranged = partial
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        "Arrange ReduceMinsMaxes",
                    );
                // If no earlier stage validated, the final reduction must.
                let must_validate = err_output.is_none();
                if must_validate || mfp_after2.is_some() {
                    let errs = arranged
                        .clone()
                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                            "ReduceMinsMaxes Error Check",
                            move |key, source, target| {
                                // Report any non-positive multiplicity as invalid data.
                                if must_validate {
                                    for (val, count) in source.iter() {
                                        if count.is_positive() {
                                            continue;
                                        }
                                        let val = val.to_row();
                                        let message =
                                            "Non-positive accumulation in ReduceMinsMaxes";
                                        error_logger
                                            .log(message, &format!("val={val:?}, count={count}"));
                                        target.push((
                                            EvalError::Internal(message.into()).into(),
                                            Diff::ONE,
                                        ));
                                        return;
                                    }
                                }

                                // Then report errors from the fused MFP, if any.
                                let Some(mfp) = &mfp_after2 else { return };
                                let temp_storage = RowArena::new();
                                let datum_iter = key.to_datum_iter();
                                let mut datums_local = datums2.borrow();
                                datums_local.extend(datum_iter);

                                // Walk all value rows in lockstep, feeding the
                                // i-th column of every row to the i-th aggregate.
                                let mut source_iters = source
                                    .iter()
                                    .map(|(values, _cnt)| *values)
                                    .collect::<Vec<_>>();
                                for func in aggr_funcs2.iter() {
                                    let column_iter = (0..source_iters.len())
                                        .map(|i| source_iters[i].next().unwrap());
                                    datums_local.push(func.eval(column_iter, &temp_storage));
                                }
                                if let Result::Err(e) =
                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            },
                        )
                        .as_collection(|_, v| v.clone())
                        .leave_region(outer_scope);
                    if let Some(e) = err_output.take() {
                        err_output = Some(e.concat(errs));
                    } else {
                        err_output = Some(errs);
                    }
                }
                arranged
                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        "ReduceMinsMaxes",
                        move |key, source, target| {
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums1.borrow();
                            datums_local.extend(datum_iter);
                            let key_len = datums_local.len();

                            // Evaluate each aggregate over the i-th column of
                            // all value rows, iterated in lockstep.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }

                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        },
                    )
                    .leave_region(outer_scope)
            });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
977
    /// Build one stage of the bucketed hierarchical reduction.
    ///
    /// Wraps `build_bucketed_negated_output` and, when `validating`, converts
    /// its `Err`-tagged outputs into a stream of `DataflowError`s. The stage
    /// output is the reduction output concatenated with the stage input; the
    /// reduction is constructed so this combination is intended to leave, per
    /// key, exactly one copy of the aggregate values.
    fn build_bucketed_stage<'s>(
        &self,
        aggr_funcs: &Vec<AggregateFunc>,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        validating: bool,
    ) -> (
        VecCollection<'s, T, (Row, Row), Diff>,
        Option<VecCollection<'s, T, DataflowError, Diff>>,
    ) {
        let (input, negated_output, errs) = if validating {
            // Validating path: values are `Result<Row, Row>`, where `Err`
            // carries the offending hash-prefixed key.
            let (input, reduced) = self
                .build_bucketed_negated_output::<
                    RowValBuilder<_, _, _>,
                    RowValSpine<Result<Row, Row>, _, _>,
                >(
                    input.clone(),
                    aggr_funcs.clone(),
                );
            let (oks, errs) = reduced
                .as_collection(|k, v| (k.to_row(), v.clone()))
                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                    "Checked Invalid Accumulations",
                    |(hash_key, result)| match result {
                        Err(hash_key) => {
                            // Strip the leading bucket hash before reporting the key.
                            let mut hash_key_iter = hash_key.iter();
                            let _hash = hash_key_iter.next();
                            let key = SharedRow::pack(hash_key_iter);
                            let message = format!(
                                "Invalid data in source, saw non-positive accumulation \
                                for key {key:?} in hierarchical mins-maxes aggregate"
                            );
                            Err(EvalError::Internal(message.into()).into())
                        }
                        Ok(values) => Ok((hash_key, values)),
                    },
                );
            (input, oks, Some(errs))
        } else {
            // Non-validating path: plain `Row` values, no error demux needed.
            let (input, reduced) = self
                .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    input,
                    aggr_funcs.clone(),
                );
            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
            (input, oks, None)
        };

        // Combine the reduction output with the (arranged) input; see
        // `build_bucketed_negated_output` for how the multiplicities are
        // arranged so that this concatenation yields the stage's aggregates.
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        let oks = negated_output.concat(input);
        (oks, errs)
    }
1037
1038 fn build_bucketed_negated_output<'s, Bu, Tr>(
1042 &self,
1043 input: VecCollection<'s, T, (Row, Row), Diff>,
1044 aggrs: Vec<AggregateFunc>,
1045 ) -> (
1046 Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1047 Arranged<'s, TraceAgent<Tr>>,
1048 )
1049 where
1050 Tr: for<'a> Trace<
1051 Key<'a> = DatumSeq<'a>,
1052 KeyContainer: BatchContainer<Owned = Row>,
1053 ValOwn: Data + MaybeValidatingRow<Row, Row>,
1054 Time = T,
1055 Diff = Diff,
1056 > + 'static,
1057 Bu: Builder<
1058 Time = T,
1059 Input: Container
1060 + InternalMerge
1061 + ClearContainer
1062 + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1063 Output = Tr::Batch,
1064 >,
1065 Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1066 {
1067 let error_logger = self.error_logger();
1068 let arranged_input = input
1071 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1072 "Arranged MinsMaxesHierarchical input",
1073 );
1074
1075 let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1076 "Reduced Fallibly MinsMaxesHierarchical",
1077 move |key, source, target| {
1078 if let Some(err) = Tr::ValOwn::into_error() {
1079 for (value, count) in source.iter() {
1081 if count.is_positive() {
1082 continue;
1083 }
1084 error_logger.log(
1085 "Non-positive accumulation in MinsMaxesHierarchical",
1086 &format!("key={key:?}, value={value:?}, count={count}"),
1087 );
1088 target.push((
1091 err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1092 Diff::ONE,
1093 ));
1094 return;
1095 }
1096 }
1097
1098 let mut row_builder = SharedRow::get();
1099 let mut row_packer = row_builder.packer();
1100
1101 let mut source_iters = source
1102 .iter()
1103 .map(|(values, _cnt)| *values)
1104 .collect::<Vec<_>>();
1105 for func in aggrs.iter() {
1106 let column_iter =
1107 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1108 row_packer.push(func.eval(column_iter, &RowArena::new()));
1109 }
1110 target.reserve(source.len().saturating_add(1));
1116 target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1117 target.extend(source.iter().map(|(values, cnt)| {
1118 let mut cnt = *cnt;
1119 cnt.negate();
1120 (Tr::ValOwn::ok(values.to_row()), cnt)
1121 }));
1122 },
1123 );
1124 (arranged_input, reduced)
1125 }
1126
    /// Build the dataflow to compute hierarchical aggregations on inputs known
    /// to be monotonic (append-only), by accumulating monoids in the diff field
    /// instead of building a bucketed hierarchy.
    fn build_monotonic<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        let aggregations = aggr_funcs.len();
        // Gather the aggregand columns into a vec of single-datum rows, one per
        // aggregation, in plan order.
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        // Enforce monotonicity: any retraction is replaced by an error update.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the values into the diff field as reduction monoids, so the
        // arrangement itself accumulates the aggregates.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        // Per-closure datum buffers; the error path only needs the MFP if it
        // can actually error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The accumulated monoids live in the diff component;
                    // finalize each one into its output datum(s).
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });

        // When the fused MFP can error, a second reduction surfaces those errors.
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1245
    /// Builds the dataflow fragment for a set of accumulable aggregations
    /// (counts, sums, any/all) over the keyed `collection`.
    ///
    /// Each input datum is converted into an [`Accum`] and moved into the
    /// update's difference component (via `explode_one`), so aggregation
    /// happens by in-place accumulation of differences. A final reduce
    /// promotes the accumulated values back to data and finalizes each
    /// aggregate.
    ///
    /// * `collection` — input as `(key row, values row)` pairs.
    /// * `AccumulablePlan` — destructured into the full list of aggregates,
    ///   the "simple" ones (applied directly), and the "distinct" ones
    ///   (which need a per-key distinct pass first).
    /// * `key_arity` — number of key columns; used to pair keys with values
    ///   for the distinct pass.
    /// * `mfp_after` — optional map/filter/project applied after aggregation.
    ///
    /// Returns the arranged output and a collection of errors (invalid
    /// accumulations and MFP evaluation errors).
    fn build_accumulable<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        let collection_scope = collection.scope();

        // Sanity-check the plan: every aggregate must be classified as
        // exactly one of simple or distinct. Soft-panic so production keeps
        // running on a malformed plan.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // The identity accumulator: one zero `Accum` per aggregate, plus a
        // zero total count of contributing records.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            let collection = collection.clone();
            // For simple aggregates, one pass over the values row suffices:
            // extract each aggregated datum and convert it into its
            // accumulator contribution.
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    let mut row_iter = row.iter().enumerate();
                    // Advance a single shared iterator to each aggregate's
                    // datum index. NOTE(review): assumes `simple_aggrs` is
                    // sorted by ascending datum index — confirm upstream.
                    for (datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // Each DISTINCT aggregate needs its own deduplication pass: pair the
        // key with the aggregated value, reduce to distinct (key, value)
        // pairs, then accumulate as above.
        for (datum_index, aggr) in distinct_aggrs.into_iter() {
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .clone()
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    // Emitting a single unit record per (key, value) pair
                    // implements DISTINCT.
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // Merge the partial streams; skip the concatenate when there is only
        // one.
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
        };

        // Per-closure scratch buffers and captured state; the output and
        // error closures each need their own copies.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only run the error-checking MFP pass when the MFP can actually
        // produce errors.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        let arranged = collection
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, (Vec<Accum>, Diff)>>(
                "ArrangeAccumulable [val: empty]",
            );
        // Output path: finalize each accumulator into a datum and apply the
        // post-reduction MFP.
        let arranged_output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // Error path: detect invalid accumulations (net-zero record counts
        // with non-zero accumulations, negative unsigned sums) and surface
        // MFP evaluation errors.
        let arranged_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "AccumulableErrorCheck",
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // A zero total with a non-zero accumulation means the
                        // source retracted records it never introduced.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Unsigned sums must never accumulate to a negative
                        // value; if they do, the input data was invalid.
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                        "Invalid negative unsigned aggregation in ReduceAccumulable",
                                        &format!("aggr={aggr:?}, accum={accum:?}"),
                                    );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    let err = EvalError::Internal(message.into());
                                    output.push((err.into(), Diff::ONE));
                                }
                            }
                            _ => (), }
                    }

                    // Re-evaluate the MFP solely to surface its errors; the
                    // success path emits rows in the reduce above.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1463}
1464
1465fn evaluate_mfp_after<'a, 'b>(
1469 mfp_after: &'a Option<SafeMfpPlan>,
1470 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1471 temp_storage: &'a RowArena,
1472 key_len: usize,
1473) -> Option<Row> {
1474 let mut row_builder = SharedRow::get();
1475 if let Some(mfp) = mfp_after {
1478 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1481 Some(row_builder.pack_using(iter.skip(key_len)))
1484 } else {
1485 None
1486 }
1487 } else {
1488 Some(row_builder.pack_using(&datums_local[key_len..]))
1489 }
1490}
1491
1492fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1493 match aggr_func {
1494 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1495 trues: Diff::ZERO,
1496 falses: Diff::ZERO,
1497 },
1498 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1499 accum: AccumCount::ZERO,
1500 pos_infs: Diff::ZERO,
1501 neg_infs: Diff::ZERO,
1502 nans: Diff::ZERO,
1503 non_nulls: Diff::ZERO,
1504 },
1505 AggregateFunc::SumNumeric => Accum::Numeric {
1506 accum: OrderedDecimal(NumericAgg::zero()),
1507 pos_infs: Diff::ZERO,
1508 neg_infs: Diff::ZERO,
1509 nans: Diff::ZERO,
1510 non_nulls: Diff::ZERO,
1511 },
1512 _ => Accum::SimpleNumber {
1513 accum: AccumCount::ZERO,
1514 non_nulls: Diff::ZERO,
1515 },
1516 }
1517}
1518
/// Scale factor applied to floats before accumulating them as fixed-point
/// integers: multiplying by 2^24 preserves 24 fractional bits in the `i128`
/// accumulator (see `datum_to_accumulator` / `finalize_accum`).
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1520
/// Converts a single input `datum` into the [`Accum`] contribution it makes
/// under `aggregate_func`, so that aggregation proceeds by summing these
/// contributions as differential differences.
///
/// Panics on datum types that are invalid for the given aggregate; the
/// planner is expected to have type-checked the inputs.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // Count only tracks how many non-NULL inputs were seen; the numeric
        // accumulator stays at zero.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        // `any`/`all` count trues and falses; NULL contributes to neither.
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // NULL contributes zero; f32 is widened to f64 first.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            // Specials are tracked by dedicated counters rather than folded
            // into the fixed-point accumulator.
            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Keep the fixed-point accumulator at zero for specials so they
            // cannot corrupt the sum of the finite values.
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                // Fixed-point encoding: scale by 2^24 and truncate to i128.
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // As with floats, specials (±inf, NaN) are counted
                // separately and leave the numeric accumulator at zero.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Widen the datum into the aggregation context's wider
                    // representation before accumulating.
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        // All remaining accumulable aggregates operate on integer-like
        // datums, which fit losslessly into the i128 accumulator.
        _ => {
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                // Timestamps accumulate via their u64 representation.
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1661
/// Converts a finished accumulation `accum` (with `total` contributing
/// records) into the final output [`Datum`] for `aggr_func`.
///
/// Panics when `aggr_func` and the `accum` variant do not match, which
/// indicates a bug in accumulator construction.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // Records were observed (total > 0) but nothing accumulated — every
    // counter including `non_nulls` is zero, i.e. all inputs were NULL. All
    // aggregates except COUNT yield NULL on all-NULL input.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // `all` is false if any false was seen, true if every record
                // was true, and NULL otherwise (some NULLs, no falses).
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // Mirror of `all`: true if any true, false if every record
                // was false, NULL otherwise.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // NOTE(review): the i128 -> i64 cast assumes sums of 16/32-bit
                // values cannot exceed i64 range in practice — confirm the
                // bound on record multiplicities that guarantees this.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Negative unsigned accumulation: invalid input data. The
                    // error-check reduce emits the error; output NULL here.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // Same invalid-data case as the narrower unsigned sums.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // IEEE semantics: any NaN, or both +inf and -inf, is NaN;
                // a single-signed infinity wins; otherwise decode the
                // fixed-point accumulator.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Same special-value logic as SumFloat32, without the final
                // narrowing cast.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow the wide aggregation value to datum width; the
                // narrowing itself can overflow to infinity, which we fold
                // into the explicit infinity counters.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1827
/// Integer type used for accumulations: an `i128` whose overflow behavior is
/// defined by `mz_ore::Overflowing` (see the checked/wrapping arithmetic in
/// the `Semigroup`/`Multiply` impls for `Accum`).
type AccumCount = mz_ore::Overflowing<i128>;
1830
/// Accumulator state for the accumulable aggregations.
///
/// Each variant carries enough counters to reconstruct the aggregate — and to
/// detect special cases such as all-NULL input or float specials — from sums
/// alone, so that updates merge commutatively as differential differences.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// Accumulates boolean values, for `any`/`all`.
    Bool {
        /// The number of `true` values observed.
        trues: Diff,
        /// The number of `false` values observed.
        falses: Diff,
    },
    /// Accumulates integer-like values (counts, integer sums, timestamps).
    SimpleNumber {
        /// The sum of all non-NULL values observed.
        accum: AccumCount,
        /// The number of non-NULL values observed.
        non_nulls: Diff,
    },
    /// Accumulates float values.
    Float {
        /// Sum of finite values, scaled by `FLOAT_SCALE` into a fixed-point
        /// representation (see `datum_to_accumulator`).
        accum: AccumCount,
        /// The number of `+inf` values observed.
        pos_infs: Diff,
        /// The number of `-inf` values observed.
        neg_infs: Diff,
        /// The number of `NaN` values observed.
        nans: Diff,
        /// The number of non-NULL values observed.
        non_nulls: Diff,
    },
    /// Accumulates arbitrary-precision decimal values.
    Numeric {
        /// Sum of finite values, kept in the wide aggregation width.
        accum: OrderedDecimal<NumericAgg>,
        /// The number of `+inf` values observed.
        pos_infs: Diff,
        /// The number of `-inf` values observed.
        neg_infs: Diff,
        /// The number of `NaN` values observed.
        nans: Diff,
        /// The number of non-NULL values observed.
        non_nulls: Diff,
    },
}
1895
1896impl IsZero for Accum {
1897 fn is_zero(&self) -> bool {
1898 match self {
1899 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1900 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1901 Accum::Float {
1902 accum,
1903 pos_infs,
1904 neg_infs,
1905 nans,
1906 non_nulls,
1907 } => {
1908 accum.is_zero()
1909 && pos_infs.is_zero()
1910 && neg_infs.is_zero()
1911 && nans.is_zero()
1912 && non_nulls.is_zero()
1913 }
1914 Accum::Numeric {
1915 accum,
1916 pos_infs,
1917 neg_infs,
1918 nans,
1919 non_nulls,
1920 } => {
1921 accum.0.is_zero()
1922 && pos_infs.is_zero()
1923 && neg_infs.is_zero()
1924 && nans.is_zero()
1925 && non_nulls.is_zero()
1926 }
1927 }
1928 }
1929}
1930
impl Semigroup for Accum {
    /// Merges `other` into `self` component-wise. Both sides must be the
    /// same variant; a mismatch indicates a bug in accumulator construction
    /// and is unreachable.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Overflow of the fixed-point float accumulator is treated as
                // survivable: warn and wrap rather than abort the dataflow.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // A "rounded" status means the aggregation context ran out of
                // precision, which we treat as accumulator overflow.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Canonicalize the coefficient so repeated merges don't grow
                // it unnecessarily. NOTE(review): relies on `dec`'s
                // `Context::reduce` semantics — confirm.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2033
impl Multiply<Diff> for Accum {
    type Output = Accum;

    /// Scales every component of the accumulator by `factor`, as required
    /// when a record appears with multiplicity other than one.
    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // As in `plus_equals`, float accumulator overflow warns and
                // wraps instead of aborting the dataflow.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                // Multiply in place; `f` holds the product afterwards.
                cx.mul(&mut f, &accum.0);
                // Rounding here means the product exceeded the aggregation
                // context's precision — treat as overflow.
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2096
// `Accum` derives `Copy` (no heap-owned data), so a bitwise-copy region
// suffices for columnar storage.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2100
/// Monoids used by hierarchical (min/max) reductions, which merge values
/// in place rather than accumulating differences.
mod monoids {

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid wrapping a single-datum row, merged by retaining the minimum
    /// or maximum datum. NULL loses to any non-NULL value in either
    /// direction, matching SQL MIN/MAX semantics.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current extreme value.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        // Hand-written so `clone_from` can reuse the destination row's
        // existing allocation instead of building a fresh `Row`.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take the row out of `self` so we can mutate it in place.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            // Copy the source contents into the reused buffer.
            row.clone_from(source_row);

            // Reinstall the row under the source's variant.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Min/max are idempotent: a positive multiplicity leaves the
            // monoid unchanged. Negative multiplicities would require
            // retraction, which a monoid cannot express — hence the assert.
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // NULL never beats a non-NULL value, so it only
                        // survives when both sides are NULL.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Same NULL handling as Min, with the comparison
                        // direction flipped.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    // Mixing Min with Max indicates a planning bug; log
                    // loudly but keep the dataflow running.
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // Min/max have no identity element, so no monoid value is ever
            // "zero"; reporting `true` would let consolidation discard live
            // values.
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnation region for [`ReductionMonoid`], delegating storage of the
    /// contained row to `Row`'s own region.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        // The safety contract is inherited from `Region::copy`; we only
        // forward the contained row into the inner row region.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the monoid appropriate for `func`, or returns `None`
    /// when `func` is not a hierarchical (min/max) aggregation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2355
/// Helpers that reuse the reduction machinery for one-by-one (row-at-a-time)
/// aggregation, as needed by window aggregates.
mod window_agg_helpers {
    use crate::render::reduce::*;

    /// Dispatches one-by-one aggregation to the implementation matching the
    /// aggregate's reduction type.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            // `reverse` is forwarded only to the basic implementation; the
            // accumulable and hierarchical ones ignore it. NOTE(review):
            // presumably because those are order-insensitive — confirm.
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one aggregation on top of the accumulable machinery: each datum
    /// is folded into an [`Accum`], finalized on demand.
    pub struct AccumulableOneByOneAggr {
        // The aggregate being computed.
        aggr_func: AggregateFunc,
        // Running accumulator, starting from the aggregate's zero.
        accum: Accum,
        // Number of datums observed so far.
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        /// Folds one datum into the accumulator.
        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        /// Finalizes the current accumulation into a datum allocated on
        /// `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one aggregation on top of the hierarchical (min/max) monoids.
    pub struct HierarchicalOneByOneAggr {
        // The aggregate being computed.
        aggr_func: AggregateFunc,
        // Holds the current extreme value. Starts as NULL, which loses to
        // any non-NULL datum under the monoid's merge rules.
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        /// Merges one datum into the running min/max.
        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        /// Returns the current extreme value, allocated on `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}