1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
26use differential_dataflow::trace::{Builder, Trace};
27use differential_dataflow::{Data, VecCollection};
28use itertools::Itertools;
29use mz_compute_types::plan::reduce::{
30 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
31 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
32};
33use mz_expr::{
34 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
35};
36use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
37use mz_repr::fixed_length::ToDatumIter;
38use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
39use mz_storage_types::errors::DataflowError;
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use timely::dataflow::Scope;
45use timely::progress::timestamp::Refines;
46use tracing::warn;
47
48use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
49use crate::extensions::reduce::{MzReduce, ReduceExt};
50use crate::render::context::{CollectionBundle, Context};
51use crate::render::errors::MaybeValidatingRow;
52use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
53use crate::render::{ArrangementFlavor, Pairer};
54use crate::row_spine::{
55 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
56};
57use crate::typedefs::{
58 ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
59 RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
60};
61
62impl<G, T> Context<G, T>
63where
64 G: Scope,
65 G::Timestamp: MzTimestamp + Refines<T>,
66 T: MzTimestamp,
67{
    /// Renders a `ReducePlan` against `input`, producing the reduced output as a
    /// `CollectionBundle` keyed by the grouping key.
    ///
    /// * `input_key` - the key by which `input` is already arranged, if any.
    /// * `key_val_plan` - plans that extract the grouping key and the aggregate
    ///   input values from each source row.
    /// * `reduce_plan` - describes which reduction strategy to render.
    /// * `mfp_after` - an optional map/filter/project fused after the reduction;
    ///   it must plan successfully and must not contain temporal predicates.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Plan the fused MFP; both steps are invariants established by the caller.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Collect the set of input columns demanded by either plan,
            // deduplicated and in ascending order.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Map each demanded column to its position in the dense vector of
            // demanded columns, and rewrite both plans to reference those
            // dense positions instead of the original column indexes.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // Largest column index we need to read, plus one (0 if none).
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // `skips` encodes the demanded columns as successive `nth` offsets.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    // Fill `datums_local` with only the demanded columns,
                    // advancing through the row via the precomputed skips.
                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key plan; errors are forwarded as data.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        // The key plan has no predicates, so a row must result.
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Reset to just the demanded input columns (the key plan may
                    // have appended mapped expressions), then evaluate the values.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Split the combined stream into ok and err collections,
            // consolidating as we go.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(&err_input);

            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }
167
168 fn render_reduce_plan<S>(
174 &self,
175 plan: ReducePlan,
176 collection: VecCollection<S, (Row, Row), Diff>,
177 err_input: VecCollection<S, DataflowError, Diff>,
178 key_arity: usize,
179 mfp_after: Option<SafeMfpPlan>,
180 ) -> CollectionBundle<S, T>
181 where
182 S: Scope<Timestamp = G::Timestamp>,
183 {
184 let mut errors = Default::default();
185 let arrangement =
186 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188 CollectionBundle::from_columns(
189 0..key_arity,
190 ArrangementFlavor::Local(
191 arrangement,
192 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193 ),
194 )
195 }
196
197 fn render_reduce_plan_inner<S>(
198 &self,
199 plan: ReducePlan,
200 collection: VecCollection<S, (Row, Row), Diff>,
201 errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
202 key_arity: usize,
203 mfp_after: Option<SafeMfpPlan>,
204 ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
205 where
206 S: Scope<Timestamp = G::Timestamp>,
207 {
208 let arrangement = match plan {
211 ReducePlan::Distinct => {
214 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
215 errors.push(errs);
216 arranged_output
217 }
218 ReducePlan::Accumulable(expr) => {
219 let (arranged_output, errs) =
220 self.build_accumulable(collection, expr, key_arity, mfp_after);
221 errors.push(errs);
222 arranged_output
223 }
224 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
225 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
226 errors.push(errs);
227 output
228 }
229 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
230 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
231 errors.push(errs);
232 output
233 }
234 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
235 expr,
236 fused_unnest_list,
237 })) => {
238 let validating = !fused_unnest_list;
242 let (output, errs) = self.build_basic_aggregate(
243 collection,
244 0,
245 &expr,
246 validating,
247 key_arity,
248 mfp_after,
249 fused_unnest_list,
250 );
251 if validating {
252 errors.push(errs.expect("validation should have occurred as it was requested"));
253 }
254 output
255 }
256 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
257 let (output, errs) =
258 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
259 errors.push(errs);
260 output
261 }
262 };
263 arrangement
264 }
265
    /// Renders a DISTINCT reduction: one empty value row per distinct key.
    ///
    /// Returns the output arrangement plus an error collection containing
    /// non-positive-multiplicity violations and any errors from the fused
    /// `mfp_after`, which is applied to the key datums.
    fn build_distinct<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Each closure needs its own scratch datum vector and MFP copy;
        // `mfp_after2` is only retained if the MFP can actually error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        // Reduce the arranged input twice over: once producing the output
        // rows and once producing the error stream.
        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            )
            .reduce_pair::<
                _,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
                _,
                RowErrBuilder<_, _>,
                RowErrSpine<_, _>,
            >(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    // Apply the MFP to the key; emit an empty row when the
                    // key passes (or when there is no MFP).
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    // Surface at most one error per key for non-positive input
                    // multiplicities, which indicate invalid input data.
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // Re-evaluate the MFP solely to capture its errors; the ok
                    // output was already produced by the first closure.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
344
    /// Renders multiple "basic" (non-accumulable, non-hierarchical) aggregates
    /// by building each one separately and then fusing their outputs per key.
    ///
    /// Exactly one of the per-aggregate renderings performs input validation;
    /// its error stream is combined with any errors from the fused `mfp_after`.
    fn build_basic_aggregates<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // This entry point is intended for two or more aggregates; a single
        // aggregate should use `build_basic_aggregate` directly.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Only the first aggregate (while `err_output` is unset) validates
            // the input; later ones skip validation to avoid duplicate errors.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each aggregate's output with its index so the fused reduce
            // can reassemble the values in aggregate order.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Scratch state and MFP copies for the two reduce closures below;
        // the error-check copy is kept only if the MFP can error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                    "Arranged ReduceFuseBasic input",
                );

        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceFuseBasic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Values arrive ordered by their aggregate index tag, so
                    // pushing the first datum of each yields aggregate order.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );
        // Exactly one per-aggregate rendering was asked to validate.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            // Second reduce over the same arrangement to capture MFP errors.
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
455
    /// Renders a single "basic" aggregate (one that is neither accumulable nor
    /// hierarchical), such as string concatenation or jsonb aggregation.
    ///
    /// * `index` - which value column of the input rows holds this aggregate's
    ///   input datum.
    /// * `validating` - whether this rendering checks for non-positive input
    ///   multiplicities; when true the returned error collection is `Some`.
    /// * `fused_unnest_list` - when true, the aggregate result is unnested into
    ///   one output row per list element instead of a single row.
    fn build_basic_aggregate<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Project out only this aggregate's input column.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        if distinct {
            // For DISTINCT aggregates, deduplicate (key, value) pairs before
            // aggregating. The pairer packs key and value into a single row.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // Validating path: the reduction emits `Result<(), String>` so
                // that invalid multiplicities surface as errors downstream.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        _,
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                // Non-validating path: plain distinct with empty values.
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Independent scratch vectors and clones for each closure below.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // Expand each value to `count` copies of its first datum;
                    // non-positive counts are clamped to zero here (they are
                    // reported by the error-check reduction instead).
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        target.push((row, Diff::ONE));
                    }
                }
            })
        } else {
            // Fused unnest-list: one output row per element of the aggregate's
            // list result, each passed through the fused MFP.
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        // Reset to just the key, then append this element.
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
        };

        // Validation is only required here if it was requested and no earlier
        // stage (the DISTINCT path) already produced an error stream.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // At most one multiplicity error per key.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Re-evaluate the aggregate plus MFP only to
                            // capture errors from the MFP.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // The fused unnest-list path never validates (see the caller),
                // so we only get here because the MFP can error.
                assert!(!must_validate);

                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            // Combine with any error stream from the DISTINCT path.
            if let Some(e) = err_output {
                err_output = Some(e.concat(&errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
734
    /// Arranges `input` and reduces each key to a single unit value, i.e. a
    /// distinct-by-key step used by DISTINCT basic aggregates.
    ///
    /// The output value type `Tr::ValOwn` decides whether this is a validating
    /// reduction: when `into_error` yields a constructor, keys with
    /// non-positive multiplicities produce an error value instead of `ok(())`.
    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
        &self,
        input: VecCollection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            Time = G::Timestamp,
            Diff = Diff,
            ValOwn: Data + MaybeValidatingRow<(), String>,
        > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        // `name_tag` lets callers distinguish the validating and
        // non-validating instantiations in dataflow introspection.
        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                // In the validating instantiation, emit at most one error per
                // key when a non-positive multiplicity is observed.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                // Otherwise: one unit value per key, collapsing duplicates.
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }
784
    /// Renders a bucketed hierarchical reduction (e.g. min/max over
    /// non-monotonic input) as a cascade of stages, one per entry in
    /// `buckets`, each reducing within hash buckets of decreasing size.
    ///
    /// Returns the final arrangement and an error collection; exactly one
    /// stage of the hierarchy performs input validation.
    fn build_bucketed<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<VecCollection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // Modulus for the first (coarsest) bucketing stage.
            let first_mod = buckets.get(0).copied().unwrap_or(1);

            let aggregations = aggr_funcs.len();

            // Prepend a hash datum to each key so rows are spread across
            // buckets; values keep only the aggregated columns.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                row_packer.extend(row.iter().take(aggregations));
                let values = row_builder.clone();

                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            for (index, b) in buckets.into_iter().enumerate() {
                // After the first stage, shrink the bucket space by re-hashing
                // the previous stage's hash modulo the new bucket count.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // Only the first stage without a prior error stream validates.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Strip the leading hash datum to recover the original key.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Scratch state and MFP clones for the final reductions; the
            // error-check clone is kept only if the MFP can error.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            let error_logger = self.error_logger();

            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // If no stage validated, validation must happen here in the final
            // reduction instead.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // At most one multiplicity error per key.
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // Re-evaluate the aggregates plus MFP only to
                            // capture errors from the MFP.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            // Each value row contributes one datum per
                            // aggregate; iterate column-wise across values.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region();
                if let Some(e) = &err_output {
                    err_output = Some(e.concat(&errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Column-wise evaluation: the i-th datum of each value
                        // row feeds the i-th aggregate function.
                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter =
                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region()
        });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
986
    /// Renders one stage of the bucketed hierarchy.
    ///
    /// The reduction produces *negated* per-bucket output (see
    /// `build_bucketed_negated_output`); concatenating it with the stage's
    /// input yields the stage's forward output. When `validating`, keys with
    /// invalid accumulations are demuxed into the returned error collection.
    fn build_bucketed_stage<S>(
        &self,
        aggr_funcs: &Vec<AggregateFunc>,
        input: &VecCollection<S, (Row, Row), Diff>,
        validating: bool,
    ) -> (
        VecCollection<S, (Row, Row), Diff>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let (input, negated_output, errs) = if validating {
            // Validating path: the reduced value is `Result<Row, Row>`, where
            // `Err` carries the offending hash-prefixed key.
            let (input, reduced) = self
                .build_bucketed_negated_output::<
                    _,
                    RowValBuilder<_, _, _>,
                    RowValSpine<Result<Row, Row>, _, _>,
                >(
                    input,
                    aggr_funcs.clone(),
                );
            let (oks, errs) = reduced
                .as_collection(|k, v| (k.to_row(), v.clone()))
                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                    "Checked Invalid Accumulations",
                    |(hash_key, result)| match result {
                        Err(hash_key) => {
                            // Drop the leading hash datum when reporting the key.
                            let mut hash_key_iter = hash_key.iter();
                            let _hash = hash_key_iter.next();
                            let key = SharedRow::pack(hash_key_iter);
                            let message = format!(
                                "Invalid data in source, saw non-positive accumulation \
                                 for key {key:?} in hierarchical mins-maxes aggregate"
                            );
                            Err(EvalError::Internal(message.into()).into())
                        }
                        Ok(values) => Ok((hash_key, values)),
                    },
                );
            (input, oks, Some(errs))
        } else {
            // Non-validating path: plain rows, no error demux needed.
            let (input, reduced) = self
                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    input,
                    aggr_funcs.clone(),
                );
            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, oks, None)
        };

        // The negated reduction cancels the raw input except for each bucket's
        // aggregate, so input + negated output = per-bucket aggregates.
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        let oks = negated_output.concat(&input);
        (oks, errs)
    }
1050
    /// Arranges one bucketing stage's input and reduces it, emitting for each
    /// key the aggregate result at `Diff::MINUS_ONE` together with all input
    /// values at negated multiplicities.
    ///
    /// The caller concatenates this negated output with the stage input, which
    /// cancels the raw values and leaves only the per-bucket aggregate — this
    /// keeps the amount of data retained between stages small.
    ///
    /// The value type `Tr::ValOwn` selects validation: when `into_error`
    /// yields a constructor, non-positive multiplicities produce an error
    /// value carrying the key instead of panicking.
    fn build_bucketed_negated_output<S, Bu, Tr>(
        &self,
        input: &VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateFunc>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
        Arranged<S, TraceAgent<Tr>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = G::Timestamp,
            Diff = Diff,
        > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();
        // The input arrangement is returned as well so the caller can read it
        // back as a collection without re-arranging.
        let arranged_input = input
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged MinsMaxesHierarchical input",
            );

        let reduced = arranged_input.mz_reduce_abelian::<_, Bu, Tr>(
            "Reduced Fallibly MinsMaxesHierarchical",
            move |key, source, target| {
                // Validating instantiation: surface at most one error per key
                // for non-positive multiplicities.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        error_logger.log(
                            "Non-positive accumulation in MinsMaxesHierarchical",
                            &format!("key={key:?}, value={value:?}, count={count}"),
                        );
                        // The error value carries the offending key so the
                        // caller can report it without the hash prefix.
                        target.push((err(Tr::owned_key(key)), Diff::ONE));
                        return;
                    }
                }

                // Evaluate all aggregates column-wise over the bucket's values.
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();

                let mut source_iters = source
                    .iter()
                    .map(|(values, _cnt)| *values)
                    .collect::<Vec<_>>();
                for func in aggrs.iter() {
                    let column_iter =
                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                    row_packer.push(func.eval(column_iter, &RowArena::new()));
                }
                // One output per input value plus the aggregate itself.
                target.reserve(source.len().saturating_add(1));
                // Aggregate at -1; when concatenated with the input this
                // leaves exactly one copy of the aggregate per bucket.
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
                // All input values at negated counts, to cancel the raw input.
                target.extend(source.iter().map(|(values, cnt)| {
                    let mut cnt = *cnt;
                    cnt.negate();
                    (Tr::ValOwn::ok(values.to_row()), cnt)
                }));
            },
        );
        (arranged_input, reduced)
    }
1134
    /// Renders a hierarchical reduction over input known to be monotonic
    /// (no retractions), using per-aggregate monoids instead of bucketing.
    ///
    /// When `must_consolidate` is set, the input is consolidated first; inputs
    /// that nevertheless retract are reported via `ensure_monotonic` errors.
    fn build_monotonic<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let aggregations = aggr_funcs.len();
        // Split each input row into one single-datum row per aggregate.
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        // Guard the monotonicity assumption: retractions become errors.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the values into the difference component as monoids, so the
        // aggregation happens during consolidation/arrangement.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        // Scratch state and MFP clones for the reductions below; the
        // error-check clone is kept only if the MFP can error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceMonotonic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The accumulated monoids hold the finished aggregates.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );

        // Second reduction over the same arrangement solely to capture errors
        // from the fused MFP, if it can error at all.
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1254
1255 fn build_accumulable<S>(
1262 &self,
1263 collection: VecCollection<S, (Row, Row), Diff>,
1264 AccumulablePlan {
1265 full_aggrs,
1266 simple_aggrs,
1267 distinct_aggrs,
1268 }: AccumulablePlan,
1269 key_arity: usize,
1270 mfp_after: Option<SafeMfpPlan>,
1271 ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
1272 where
1273 S: Scope<Timestamp = G::Timestamp>,
1274 {
1275 if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1277 self.error_logger().soft_panic_or_log(
1278 "Incorrect numbers of aggregates in accummulable reduction rendering",
1279 &format!(
1280 "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1281 full_aggrs.len(),
1282 simple_aggrs.len(),
1283 distinct_aggrs.len(),
1284 ),
1285 );
1286 }
1287
1288 let zero_diffs: (Vec<_>, Diff) = (
1300 full_aggrs
1301 .iter()
1302 .map(|f| accumulable_zero(&f.func))
1303 .collect(),
1304 Diff::ZERO,
1305 );
1306
1307 let mut to_aggregate = Vec::new();
1308 if simple_aggrs.len() > 0 {
1309 let easy_cases = collection.explode_one({
1311 let zero_diffs = zero_diffs.clone();
1312 move |(key, row)| {
1313 let mut diffs = zero_diffs.clone();
1314 let mut row_iter = row.iter().enumerate();
1320 for (datum_index, aggr) in simple_aggrs.iter() {
1321 let mut datum = row_iter.next().unwrap();
1322 while datum_index != &datum.0 {
1323 datum = row_iter.next().unwrap();
1324 }
1325 let datum = datum.1;
1326 diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1327 diffs.1 = Diff::ONE;
1328 }
1329 ((key, ()), diffs)
1330 }
1331 });
1332 to_aggregate.push(easy_cases);
1333 }
1334
1335 for (datum_index, aggr) in distinct_aggrs.into_iter() {
1337 let pairer = Pairer::new(key_arity);
1338 let collection = collection
1339 .map(move |(key, row)| {
1340 let value = row.iter().nth(datum_index).unwrap();
1341 (pairer.merge(&key, std::iter::once(value)), ())
1342 })
1343 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1344 "Arranged Accumulable Distinct [val: empty]",
1345 )
1346 .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1347 "Reduced Accumulable Distinct [val: empty]",
1348 move |_k, _s, t| t.push(((), Diff::ONE)),
1349 )
1350 .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1351 .explode_one({
1352 let zero_diffs = zero_diffs.clone();
1353 move |(key, row)| {
1354 let datum = row.iter().next().unwrap();
1355 let mut diffs = zero_diffs.clone();
1356 diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1357 diffs.1 = Diff::ONE;
1358 ((key, ()), diffs)
1359 }
1360 });
1361 to_aggregate.push(collection);
1362 }
1363
1364 let collection = if to_aggregate.len() == 1 {
1366 to_aggregate.remove(0)
1367 } else {
1368 differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
1369 };
1370
1371 let mut datums1 = DatumVec::new();
1373 let mut datums2 = DatumVec::new();
1374 let mfp_after1 = mfp_after.clone();
1375 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1376 let full_aggrs2 = full_aggrs.clone();
1377
1378 let error_logger = self.error_logger();
1379 let err_full_aggrs = full_aggrs.clone();
1380 let (arranged_output, arranged_errs) = collection
1381 .mz_arrange::<
1382 RowBatcher<_, _>,
1383 RowBuilder<_, _>,
1384 RowSpine<_, (Vec<Accum>, Diff)>,
1385 >("ArrangeAccumulable [val: empty]")
1386 .reduce_pair::<
1387 _,
1388 RowRowBuilder<_, _>,
1389 RowRowSpine<_, _>,
1390 _,
1391 RowErrBuilder<_, _>,
1392 RowErrSpine<_, _>,
1393 >(
1394 "ReduceAccumulable",
1395 "AccumulableErrorCheck",
1396 {
1397 move |key, input, output| {
1398 let (ref accums, total) = input[0].1;
1399
1400 let temp_storage = RowArena::new();
1401 let datum_iter = key.to_datum_iter();
1402 let mut datums_local = datums1.borrow();
1403 datums_local.extend(datum_iter);
1404 let key_len = datums_local.len();
1405 for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1406 datums_local.push(finalize_accum(&aggr.func, accum, total));
1407 }
1408
1409 if let Some(row) = evaluate_mfp_after(
1410 &mfp_after1,
1411 &mut datums_local,
1412 &temp_storage,
1413 key_len,
1414 ) {
1415 output.push((row, Diff::ONE));
1416 }
1417 }
1418 },
1419 move |key, input, output| {
1420 let (ref accums, total) = input[0].1;
1421 for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1422 if total == Diff::ZERO && !accum.is_zero() {
1425 error_logger.log(
1426 "Net-zero records with non-zero accumulation in ReduceAccumulable",
1427 &format!("aggr={aggr:?}, accum={accum:?}"),
1428 );
1429 let key = key.to_row();
1430 let message = format!(
1431 "Invalid data in source, saw net-zero records for key {key} \
1432 with non-zero accumulation in accumulable aggregate"
1433 );
1434 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1435 }
1436 match (&aggr.func, &accum) {
1437 (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1438 | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1439 | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1440 if accum.is_negative() {
1441 error_logger.log(
1442 "Invalid negative unsigned aggregation in ReduceAccumulable",
1443 &format!("aggr={aggr:?}, accum={accum:?}"),
1444 );
1445 let key = key.to_row();
1446 let message = format!(
1447 "Invalid data in source, saw negative accumulation with \
1448 unsigned type for key {key}"
1449 );
1450 let err =
1451 EvalError::Internal(message.into());
1452 output.push((err.into(), Diff::ONE));
1453 }
1454 }
1455 _ => (), }
1457 }
1458
1459 let Some(mfp) = &mfp_after2 else { return };
1461 let temp_storage = RowArena::new();
1462 let datum_iter = key.to_datum_iter();
1463 let mut datums_local = datums2.borrow();
1464 datums_local.extend(datum_iter);
1465 for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1466 datums_local.push(finalize_accum(&aggr.func, accum, total));
1467 }
1468
1469 if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1470 output.push((e.into(), Diff::ONE));
1471 }
1472 },
1473 );
1474 (
1475 arranged_output,
1476 arranged_errs.as_collection(|_key, error| error.clone()),
1477 )
1478 }
1479}
1480
1481fn evaluate_mfp_after<'a, 'b>(
1485 mfp_after: &'a Option<SafeMfpPlan>,
1486 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1487 temp_storage: &'a RowArena,
1488 key_len: usize,
1489) -> Option<Row> {
1490 let mut row_builder = SharedRow::get();
1491 if let Some(mfp) = mfp_after {
1494 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1497 Some(row_builder.pack_using(iter.skip(key_len)))
1500 } else {
1501 None
1502 }
1503 } else {
1504 Some(row_builder.pack_using(&datums_local[key_len..]))
1505 }
1506}
1507
1508fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1509 match aggr_func {
1510 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1511 trues: Diff::ZERO,
1512 falses: Diff::ZERO,
1513 },
1514 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1515 accum: AccumCount::ZERO,
1516 pos_infs: Diff::ZERO,
1517 neg_infs: Diff::ZERO,
1518 nans: Diff::ZERO,
1519 non_nulls: Diff::ZERO,
1520 },
1521 AggregateFunc::SumNumeric => Accum::Numeric {
1522 accum: OrderedDecimal(NumericAgg::zero()),
1523 pos_infs: Diff::ZERO,
1524 neg_infs: Diff::ZERO,
1525 nans: Diff::ZERO,
1526 non_nulls: Diff::ZERO,
1527 },
1528 _ => Accum::SimpleNumber {
1529 accum: AccumCount::ZERO,
1530 non_nulls: Diff::ZERO,
1531 },
1532 }
1533}
1534
/// Scale factor (2^24) applied to float values before accumulating them as
/// integers, i.e. floats are summed in fixed-point with 24 fractional bits.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1536
/// Convert a single input datum into the `Accum` difference that represents its
/// contribution to the given accumulable aggregate function.
///
/// NULL inputs contribute zero to the accumulation (with `non_nulls` left at
/// zero, so finalization can distinguish "all NULLs" from a zero-valued sum).
///
/// # Panics
///
/// Panics when the datum's type does not match what the aggregate function
/// expects; upstream planning is responsible for ensuring this cannot happen.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // COUNT only tracks how many non-null inputs were seen.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Float sums are accumulated in fixed point: finite values are scaled
            // by `FLOAT_SCALE` and stored as integers, while NaN and +/-infinity
            // are tracked in dedicated counters.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Non-finite values contribute nothing to the fixed-point sum; their
            // presence is recorded only via the counters above.
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Like floats: NaN and infinities are tracked out-of-band so the
                // decimal accumulation itself stays finite.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Widen the datum to the aggregation precision.
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        _ => {
            // All remaining accumulable functions are integer-like sums; the value
            // is widened into the 128-bit accumulator.
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1677
/// Produce the final output datum for an accumulable aggregate from its
/// accumulated state `accum` and the net count `total` of accumulated records.
///
/// # Panics
///
/// Panics when `aggr_func` is paired with an `Accum` variant it never produces;
/// `datum_to_accumulator` guarantees matching variants for valid plans.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // A positive record count with an all-zero accumulation (including
    // `non_nulls == 0`) means every input was NULL; all aggregates except
    // COUNT return NULL in that case (COUNT returns 0 below).
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            // ALL: false if any input was false; true only if every record was
            // true (no NULLs); otherwise NULL.
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            // ANY mirrors ALL with the roles of true and false swapped.
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // Narrow the i128 accumulator to the i64 SQL result type;
                // assumes the accumulated sum fits in i64 — TODO confirm bound.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                // Negative unsigned accumulations indicate invalid input; the
                // error-check reduction reports them, and the output is NULL.
                if !accum.is_negative() {
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // NaN inputs, or a mix of +inf and -inf, yield NaN; a single
                // infinity sign dominates; otherwise undo the fixed-point scaling.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Same non-finite handling as SumFloat32, without the f32 narrowing.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow from aggregation width to datum width; that narrowing
                // itself can overflow to an infinity, which is folded into the
                // explicit infinity counters before deciding the result.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1843
/// 128-bit integer used for accumulations; `mz_ore::Overflowing` exposes
/// explicit checked/wrapping arithmetic so overflow handling is deliberate.
type AccumCount = mz_ore::Overflowing<i128>;
1846
/// Accumulated state for one accumulable aggregate function, maintained as a
/// differential-dataflow difference so updates combine via `Semigroup`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// State for `Any`/`All`: counts of true and false inputs.
    Bool {
        trues: Diff,
        falses: Diff,
    },
    /// State for integer-like sums and COUNT.
    SimpleNumber {
        /// Running sum, widened to 128 bits.
        accum: AccumCount,
        /// Number of non-NULL inputs seen.
        non_nulls: Diff,
    },
    /// State for float sums, kept in fixed point (scaled by `FLOAT_SCALE`).
    Float {
        /// Fixed-point sum of the finite inputs.
        accum: AccumCount,
        /// Count of +infinity inputs.
        pos_infs: Diff,
        /// Count of -infinity inputs.
        neg_infs: Diff,
        /// Count of NaN inputs.
        nans: Diff,
        /// Number of non-NULL inputs seen.
        non_nulls: Diff,
    },
    /// State for `SumNumeric`, with non-finite values tracked out-of-band.
    Numeric {
        /// Decimal sum of the finite inputs, at aggregation precision.
        accum: OrderedDecimal<NumericAgg>,
        /// Count of +infinity inputs.
        pos_infs: Diff,
        /// Count of -infinity inputs.
        neg_infs: Diff,
        /// Count of NaN inputs.
        nans: Diff,
        /// Number of non-NULL inputs seen.
        non_nulls: Diff,
    },
}
1911
1912impl IsZero for Accum {
1913 fn is_zero(&self) -> bool {
1914 match self {
1915 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1916 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1917 Accum::Float {
1918 accum,
1919 pos_infs,
1920 neg_infs,
1921 nans,
1922 non_nulls,
1923 } => {
1924 accum.is_zero()
1925 && pos_infs.is_zero()
1926 && neg_infs.is_zero()
1927 && nans.is_zero()
1928 && non_nulls.is_zero()
1929 }
1930 Accum::Numeric {
1931 accum,
1932 pos_infs,
1933 neg_infs,
1934 nans,
1935 non_nulls,
1936 } => {
1937 accum.0.is_zero()
1938 && pos_infs.is_zero()
1939 && neg_infs.is_zero()
1940 && nans.is_zero()
1941 && non_nulls.is_zero()
1942 }
1943 }
1944 }
1945}
1946
impl Semigroup for Accum {
    /// Combine two accumulators of the same variant by adding their components.
    ///
    /// Panics (via `unreachable!`) on mismatched variants, since a single
    /// aggregate only ever produces one variant.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // On i128 overflow, warn and wrap rather than panic, so the
                // dataflow keeps running (with possibly incorrect results).
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // At aggregation precision, a rounded result is treated as
                // overflow of the accumulator and is considered unreachable.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Normalize (trim trailing zeroes) so equal values compare equal.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2049
impl Multiply<Diff> for Accum {
    type Output = Accum;

    /// Scale the accumulator by an update multiplicity `factor`, multiplying
    /// every counter and accumulation component.
    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // Mirror `plus_equals`: on overflow, warn and wrap rather than
                // panic, accepting possibly incorrect results.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                // Compute factor * accum at aggregation precision; rounding at
                // this width is treated as overflow and considered unreachable.
                cx.mul(&mut f, &accum.0);
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2112
/// `Accum` is `Copy` and owns no heap data, so a plain copy region suffices.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2116
mod monoids {
    //! Monoids used to accumulate hierarchical (min/max) aggregations in place.

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// The current extreme value of a min/max aggregation, stored as a
    /// single-datum `Row`.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Extract the current extreme value, regardless of direction.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        /// Clone that reuses `self`'s existing `Row` allocation where possible.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out so we can both reuse its allocation and change
            // our own variant to match `source`'s.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// Multiplication by a positive factor is a no-op for min/max state;
        /// negative or zero factors are rejected, since monotonic inputs only
        /// ever carry insertions.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        /// Fold `rhs` into `self` by keeping the more extreme value.
        ///
        /// `Datum::Null` loses to any non-NULL value in both directions,
        /// matching SQL min/max semantics where NULLs are ignored.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                // Mismatched variants indicate a planning bug; log rather than
                // crash the cluster.
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        /// Never reports zero: min/max state has no additive inverse, so values
        /// must never be dropped during consolidation.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnation region for `ReductionMonoid`, delegating storage of the
    /// contained `Row` to `Row`'s own region.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            // SAFETY: delegates to the inner `Row` region, which upholds the
            // same contract our caller guarantees to us.
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wrap `row` in the monoid matching `func`'s direction: `Max(..)` for max
    /// functions, `Min(..)` for min functions, and `None` for every aggregate
    /// that is not hierarchical.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2371
mod window_agg_helpers {
    //! One-by-one aggregation helpers that reuse the reduce rendering's
    //! accumulation machinery for window aggregates.

    use crate::render::reduce::*;

    /// One-by-one aggregator implementations, one per reduction type.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        /// Pick the implementation based on how `agg` is classified by
        /// `reduction_type`.
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        /// Feed one input datum into the aggregation.
        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        /// Produce the aggregate over everything given so far, allocated in
        /// `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one aggregation over an accumulable function: keeps the running
    /// `Accum` plus the number of datums given.
    pub struct AccumulableOneByOneAggr {
        aggr_func: AggregateFunc,
        accum: Accum,
        // Net count of records given, needed by `finalize_accum`.
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        /// Fold one datum into the accumulator.
        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one aggregation over a hierarchical (min/max) function, backed by
    /// a `ReductionMonoid`.
    pub struct HierarchicalOneByOneAggr {
        aggr_func: AggregateFunc,
        // Current extreme; seeded with NULL so the empty aggregation finalizes
        // to NULL (NULL loses to any non-NULL in the monoid's ordering).
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        /// Fold one datum into the monoid. Note this packs a fresh single-datum
        /// `Row` per call.
        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}
2476}