1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
26use differential_dataflow::trace::{Builder, Trace};
27use differential_dataflow::{Data, VecCollection};
28use itertools::Itertools;
29use mz_compute_types::plan::reduce::{
30 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
31 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
32};
33use mz_expr::{
34 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
35};
36use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
37use mz_repr::fixed_length::ToDatumIter;
38use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
39use mz_storage_types::errors::DataflowError;
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use timely::dataflow::Scope;
45use timely::progress::timestamp::Refines;
46use tracing::warn;
47
48use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
49use crate::extensions::reduce::{MzReduce, ReduceExt};
50use crate::render::context::{CollectionBundle, Context};
51use crate::render::errors::MaybeValidatingRow;
52use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
53use crate::render::{ArrangementFlavor, Pairer};
54use crate::row_spine::{
55 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
56};
57use crate::typedefs::{
58 ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
59 RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
60};
61
62impl<G, T> Context<G, T>
63where
64 G: Scope,
65 G::Timestamp: MzTimestamp + Refines<T>,
66 T: MzTimestamp,
67{
    /// Renders a `Reduce` operator: extracts grouping keys and aggregation
    /// inputs from each input row, then dispatches to the plan-specific
    /// rendering path.
    ///
    /// * `input_key` - key expressions of an existing input arrangement, if any.
    /// * `key_val_plan` - MFPs producing the grouping key and aggregate values.
    /// * `reduce_plan` - the reduction strategy to render.
    /// * `mfp_after` - an optional MFP fused onto the reduction output; it must
    ///   plan successfully and be non-temporal (both enforced by `expect`).
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Plan the fused MFP up front; failures here are planning bugs.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Columns demanded by either the key or the value plan, deduplicated.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Remap demanded columns onto a dense prefix and rewrite both plans
            // to read from that compacted layout.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // Decode input rows only up to the last demanded column.
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    // Copy only the demanded datums into the working buffer,
                    // skipping over undemanded columns.
                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key plan; evaluation errors flow to the
                    // error output instead of halting the dataflow.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Discard any datums appended during key evaluation before
                    // evaluating the value plan over the same buffer.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Split the combined stream into ok and error collections,
            // consolidating updates as they are produced.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(err_input);

            // Render the chosen reduction plan and surface it outside the region.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }
167
168 fn render_reduce_plan<S>(
174 &self,
175 plan: ReducePlan,
176 collection: VecCollection<S, (Row, Row), Diff>,
177 err_input: VecCollection<S, DataflowError, Diff>,
178 key_arity: usize,
179 mfp_after: Option<SafeMfpPlan>,
180 ) -> CollectionBundle<S, T>
181 where
182 S: Scope<Timestamp = G::Timestamp>,
183 {
184 let mut errors = Default::default();
185 let arrangement =
186 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188 CollectionBundle::from_columns(
189 0..key_arity,
190 ArrangementFlavor::Local(
191 arrangement,
192 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193 ),
194 )
195 }
196
197 fn render_reduce_plan_inner<S>(
198 &self,
199 plan: ReducePlan,
200 collection: VecCollection<S, (Row, Row), Diff>,
201 errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
202 key_arity: usize,
203 mfp_after: Option<SafeMfpPlan>,
204 ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
205 where
206 S: Scope<Timestamp = G::Timestamp>,
207 {
208 let arrangement = match plan {
211 ReducePlan::Distinct => {
214 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
215 errors.push(errs);
216 arranged_output
217 }
218 ReducePlan::Accumulable(expr) => {
219 let (arranged_output, errs) =
220 self.build_accumulable(collection, expr, key_arity, mfp_after);
221 errors.push(errs);
222 arranged_output
223 }
224 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
225 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
226 errors.push(errs);
227 output
228 }
229 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
230 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
231 errors.push(errs);
232 output
233 }
234 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
235 expr,
236 fused_unnest_list,
237 })) => {
238 let validating = !fused_unnest_list;
242 let (output, errs) = self.build_basic_aggregate(
243 collection,
244 0,
245 &expr,
246 validating,
247 key_arity,
248 mfp_after,
249 fused_unnest_list,
250 );
251 if validating {
252 errors.push(errs.expect("validation should have occurred as it was requested"));
253 }
254 output
255 }
256 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
257 let (output, errs) =
258 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
259 errors.push(errs);
260 output
261 }
262 };
263 arrangement
264 }
265
    /// Renders a DISTINCT reduction: one (empty-valued) output row per key,
    /// paired with an error-check reduction that reports non-positive input
    /// multiplicities and errors from the fused MFP.
    ///
    /// Returns the arranged distinct output and the error collection.
    fn build_distinct<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Each reduce closure below captures (moves) its own datum buffer and
        // MFP copy, so two of each are prepared here.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // The error path only needs the MFP when it can actually produce errors.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            )
            .reduce_pair::<
                _,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
                _,
                RowErrBuilder<_, _>,
                RowErrSpine<_, _>,
            >(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Emit the key (an empty value row; the key carries the data)
                    // only when the fused MFP, if present, evaluates to true.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    // Report at most one error per key for non-positive counts.
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // Re-evaluate the MFP solely to surface evaluation errors.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
344
    /// Renders multiple "basic" (non-accumulable, non-hierarchical) aggregates
    /// by building each one independently and then fusing their results per key.
    ///
    /// Exactly one of the per-aggregate builds performs multiplicity
    /// validation (the first, since later ones pass `err_output.is_none()`),
    /// so `err_output` is expected to be populated after the loop.
    fn build_basic_aggregates<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // This path is meant for two or more aggregates; a single aggregate
        // should take the `BasicPlan::Single` path instead.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Only validate in the first build that has not yet produced errors;
            // `mfp_after` is deferred to the fused reduction below (hence `None`).
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each aggregate's output with its index so the fused reduce
            // can reassemble values in aggregate order.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Separate buffers/MFP copies for the output and error closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                    "Arranged ReduceFuseBasic input",
                );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Values arrive ordered by their aggregate index tag; append
                    // each aggregate's single datum after the key.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        // If the fused MFP can error, run it once more purely to collect errors.
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
454
    /// Renders a single "basic" aggregate over the value column at `index`.
    ///
    /// * `validating` - when true, a parallel error-check reduction reports
    ///   non-positive accumulations; the returned error collection is `Some`.
    /// * `fused_unnest_list` - when true, the aggregate feeds a fused
    ///   `unnest_list`, emitting one output row per produced list element.
    ///
    /// Returns the arranged output and, if validation or MFP error checking
    /// occurred, the corresponding error collection.
    fn build_basic_aggregate<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Project each input row down to the single datum this aggregate reads.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // DISTINCT aggregates first deduplicate (key, value) pairs.
        if distinct {
            // Pack key and value into one row so distinctness is per pair.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // Validating variant: values are `Result`s so non-positive
                // multiplicities can be demuxed into the error stream.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        _,
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                // Non-validating variant: values are empty, only distinctness
                // of the packed key matters.
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Separate buffers/clones for each of the (up to four) reduce closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        // Expand each value by its multiplicity (non-positive
                        // counts are clamped to zero; validation reports them).
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        datums_local.push(
                            func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            ),
                        );

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                })
        } else {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let mut datums_local = datums_key_1.borrow();
                        datums_local.extend(key.to_datum_iter());
                        let key_len = datums_local.len();
                        // One output row per element of the unnested list; the
                        // key prefix is re-used by truncating back to `key_len`.
                        for datum in func
                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            )
                        {
                            datums_local.truncate(key_len);
                            datums_local.push(datum);
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        }
                    }
                })
        };

        // A second reduction is needed if we still owe validation (the DISTINCT
        // path may have already produced errors) or the MFP can error.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // At most one validation error per key.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Re-run the aggregation solely to surface MFP errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // The fused-unnest-list path never requests validation (see
                // `render_reduce_plan_inner`); only MFP errors are checked.
                assert!(!must_validate);
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
740
    /// Builds the deduplication step for DISTINCT inaccumulable aggregates:
    /// arranges the packed (key, value) rows and reduces each to a single
    /// output value.
    ///
    /// The trace's owned value type decides the behavior via
    /// `MaybeValidatingRow`: when it can represent an error
    /// (`into_error()` is `Some`), non-positive multiplicities are logged and
    /// emitted as errors; otherwise only the ok value is produced.
    /// `name_tag` is appended to the operator name for diagnostics.
    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
        &self,
        input: VecCollection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            Time = G::Timestamp,
            Diff = Diff,
            ValOwn: Data + MaybeValidatingRow<(), String>,
        > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Input: Container
                + InternalMerge
                + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                // Only validate when the value type can carry an error.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                // All counts positive (or no validation): emit the single ok value.
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }
792
    /// Renders a bucketed hierarchical reduction (e.g. min/max over
    /// non-monotonic input) as a tower of stages.
    ///
    /// Rows are first assigned to buckets by hashing their aggregate values;
    /// each stage reduces within a bucket and re-buckets with a smaller
    /// modulus, so updates touch only a logarithmic number of partial states.
    /// At most one stage performs multiplicity validation; its errors are
    /// surfaced alongside any fused-MFP errors.
    fn build_bucketed<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<VecCollection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // Modulus for the coarsest (first) bucketing level.
            let first_mod = buckets.get(0).copied().unwrap_or(1);
            let aggregations = aggr_funcs.len();

            // Prefix each key with a hash bucket of the aggregate values.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                row_packer.extend(row.iter().take(aggregations));
                let values = row_builder.clone();

                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            for (index, b) in buckets.into_iter().enumerate() {
                // Stage 0 keeps its initial hash; later stages coarsen the
                // previous stage's hash by the new modulus.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // Validate only until some stage has produced an error stream.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Strip the hash prefix before the final per-key reduction.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Separate buffers/clones for the output and error-check closures.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            let error_logger = self.error_logger();
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // If no earlier stage validated, validate at the final reduction.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .clone()
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // At most one validation error per key.
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // Re-evaluate aggregates just to surface MFP errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            // Each source value holds one datum per aggregate;
                            // advance all iterators in lockstep, one column at
                            // a time, so each `func` sees its own column.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region();
                if let Some(e) = err_output.take() {
                    err_output = Some(e.concat(errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Column-wise evaluation: aggregate i consumes column i
                        // from every surviving value row.
                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter =
                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region()
        });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
995
996 fn build_bucketed_stage<S>(
1003 &self,
1004 aggr_funcs: &Vec<AggregateFunc>,
1005 input: VecCollection<S, (Row, Row), Diff>,
1006 validating: bool,
1007 ) -> (
1008 VecCollection<S, (Row, Row), Diff>,
1009 Option<VecCollection<S, DataflowError, Diff>>,
1010 )
1011 where
1012 S: Scope<Timestamp = G::Timestamp>,
1013 {
1014 let (input, negated_output, errs) = if validating {
1015 let (input, reduced) = self
1016 .build_bucketed_negated_output::<
1017 _,
1018 RowValBuilder<_, _, _>,
1019 RowValSpine<Result<Row, Row>, _, _>,
1020 >(
1021 input.clone(),
1022 aggr_funcs.clone(),
1023 );
1024 let (oks, errs) = reduced
1025 .as_collection(|k, v| (k.to_row(), v.clone()))
1026 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1027 "Checked Invalid Accumulations",
1028 |(hash_key, result)| match result {
1029 Err(hash_key) => {
1030 let mut hash_key_iter = hash_key.iter();
1031 let _hash = hash_key_iter.next();
1032 let key = SharedRow::pack(hash_key_iter);
1033 let message = format!(
1034 "Invalid data in source, saw non-positive accumulation \
1035 for key {key:?} in hierarchical mins-maxes aggregate"
1036 );
1037 Err(EvalError::Internal(message.into()).into())
1038 }
1039 Ok(values) => Ok((hash_key, values)),
1040 },
1041 );
1042 (input, oks, Some(errs))
1043 } else {
1044 let (input, reduced) = self
1045 .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1046 input,
1047 aggr_funcs.clone(),
1048 );
1049 let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1052 (input, oks, None)
1053 };
1054
1055 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1056 let oks = negated_output.concat(input);
1057 (oks, errs)
1058 }
1059
    /// Arranges `input` and produces a reduction whose output is the per-bucket
    /// aggregate at `+1` together with the *negation* of all input updates, so
    /// that concatenating with the original input leaves only the aggregate.
    ///
    /// When the trace's owned value can represent an error
    /// (`MaybeValidatingRow::into_error()` is `Some`), non-positive input
    /// accumulations are reported as `Err(key)` values instead.
    /// Returns both the input arrangement and the reduced arrangement.
    fn build_bucketed_negated_output<S, Bu, Tr>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateFunc>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
        Arranged<S, TraceAgent<Tr>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = G::Timestamp,
            Diff = Diff,
        > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Input: Container
                + InternalMerge
                + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();
        let arranged_input = input
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged MinsMaxesHierarchical input",
            );

        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
            "Reduced Fallibly MinsMaxesHierarchical",
            move |key, source, target| {
                // Validation path: report the first non-positive count per key.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        error_logger.log(
                            "Non-positive accumulation in MinsMaxesHierarchical",
                            &format!("key={key:?}, value={value:?}, count={count}"),
                        );
                        target.push((err(Tr::owned_key(key)), Diff::ONE));
                        return;
                    }
                }

                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();

                // Column-wise evaluation: aggregate i consumes the i-th datum
                // of every input value row.
                let mut source_iters = source
                    .iter()
                    .map(|(values, _cnt)| *values)
                    .collect::<Vec<_>>();
                for func in aggrs.iter() {
                    let column_iter =
                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                    row_packer.push(func.eval(column_iter, &RowArena::new()));
                }
                // One output: the aggregate, plus a negation of every input
                // update, so downstream concat-with-input cancels the inputs.
                target.reserve(source.len().saturating_add(1));
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
                target.extend(source.iter().map(|(values, cnt)| {
                    let mut cnt = *cnt;
                    cnt.negate();
                    (Tr::ValOwn::ok(values.to_row()), cnt)
                }));
            },
        );
        (arranged_input, reduced)
    }
1145
    /// Renders a hierarchical reduction over monotonic (append-only) input by
    /// converting each value into a monoid and letting differential merge the
    /// monoids; non-monotonic updates are reported as errors.
    fn build_monotonic<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let aggregations = aggr_funcs.len();
        let collection = collection
            .map(move |(key, row)| {
                // Split the value row into one single-datum row per aggregate.
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            // Consolidate first if the plan could not prove physical monotonicity.
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        // Retractions violate the monotonicity assumption: log and error.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move each value into the difference component as a monoid so that
        // differential's consolidation performs the aggregation.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        // Separate buffers/MFP copies for the output and error-check closures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The accumulated monoids live in the (single) diff entry.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });

        // If the fused MFP can error, re-evaluate it once more purely to
        // collect those errors into a separate stream.
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1264
    /// Renders an accumulable reduction: aggregations (counts, sums, any/all)
    /// that can be maintained by summing per-record `Accum` values in the
    /// difference field of a differential collection.
    ///
    /// * `collection` carries `(key, row)` pairs, where `row` holds one datum
    ///   per aggregate input.
    /// * `simple_aggrs` are folded directly; `distinct_aggrs` are first
    ///   deduplicated per `(key, value)` before being folded.
    /// * `key_arity` is the number of key columns, used to pair keys with
    ///   distinct values.
    /// * `mfp_after` is an optional post-reduction map/filter/project.
    ///
    /// Returns the arranged output and a collection of dataflow errors
    /// (soft-panic conditions and any errors raised by `mfp_after`).
    fn build_accumulable<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut collection_scope = collection.scope();

        // Sanity-check the plan: every full aggregate must be covered by
        // exactly one of the simple or distinct lists. We log rather than
        // abort; downstream `zip_eq` calls will surface a hard failure.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // The additive identity for the full vector of aggregates: one zero
        // accumulator per aggregate, plus a zero record count.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        // Each entry of `to_aggregate` is a collection of
        // ((key, ()), (accumulators, count)) updates to be summed together.
        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            let collection = collection.clone();
            // Non-distinct aggregates: move each relevant datum into the diff
            // field in a single pass over the row.
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    let mut row_iter = row.iter().enumerate();
                    // `simple_aggrs` is iterated in increasing datum order, so
                    // a single forward scan of the row suffices.
                    for (datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // DISTINCT aggregates: reduce each (key, value) pair to multiplicity
        // one before accumulating, so duplicates contribute only once.
        for (datum_index, aggr) in distinct_aggrs.into_iter() {
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .clone()
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    // Pack (key, value) as the new key with an empty value.
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    // Collapse any multiplicity to a single occurrence.
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // Avoid the concatenation operator when a single input suffices.
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(&mut collection_scope, to_aggregate)
        };

        // Separate DatumVecs/MFP clones for the two reduction closures below.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only run the error-side MFP evaluation when it can actually error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        // `reduce_pair` renders the output and error-check reductions off the
        // same arrangement.
        let (arranged_output, arranged_errs) = collection
            .mz_arrange::<
                RowBatcher<_, _>,
                RowBuilder<_, _>,
                RowSpine<_, (Vec<Accum>, Diff)>,
            >("ArrangeAccumulable [val: empty]")
            .reduce_pair::<
                _,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
                _,
                RowErrBuilder<_, _>,
                RowErrSpine<_, _>,
            >(
                "ReduceAccumulable",
                "AccumulableErrorCheck",
                {
                    // Output logic: finalize each accumulator into a datum and
                    // apply the optional post-reduction MFP.
                    move |key, input, output| {
                        let (ref accums, total) = input[0].1;

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                            datums_local.push(finalize_accum(&aggr.func, accum, total));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            output.push((row, Diff::ONE));
                        }
                    }
                },
                // Error logic: detect invalid accumulation states and surface
                // errors from the post-reduction MFP.
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // A net-zero record count with a non-zero accumulation
                        // indicates inconsistent input data.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Unsigned sums must never accumulate to a negative
                        // value; that can only come from invalid retractions.
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                        "Invalid negative unsigned aggregation in ReduceAccumulable",
                                        &format!("aggr={aggr:?}, accum={accum:?}"),
                                    );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    let err =
                                        EvalError::Internal(message.into());
                                    output.push((err.into(), Diff::ONE));
                                }
                            }
                            _ => (),
                        }
                    }

                    // Re-evaluate the MFP (when fallible) to emit its errors.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1493}
1494
1495fn evaluate_mfp_after<'a, 'b>(
1499 mfp_after: &'a Option<SafeMfpPlan>,
1500 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1501 temp_storage: &'a RowArena,
1502 key_len: usize,
1503) -> Option<Row> {
1504 let mut row_builder = SharedRow::get();
1505 if let Some(mfp) = mfp_after {
1508 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1511 Some(row_builder.pack_using(iter.skip(key_len)))
1514 } else {
1515 None
1516 }
1517 } else {
1518 Some(row_builder.pack_using(&datums_local[key_len..]))
1519 }
1520}
1521
1522fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1523 match aggr_func {
1524 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1525 trues: Diff::ZERO,
1526 falses: Diff::ZERO,
1527 },
1528 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1529 accum: AccumCount::ZERO,
1530 pos_infs: Diff::ZERO,
1531 neg_infs: Diff::ZERO,
1532 nans: Diff::ZERO,
1533 non_nulls: Diff::ZERO,
1534 },
1535 AggregateFunc::SumNumeric => Accum::Numeric {
1536 accum: OrderedDecimal(NumericAgg::zero()),
1537 pos_infs: Diff::ZERO,
1538 neg_infs: Diff::ZERO,
1539 nans: Diff::ZERO,
1540 non_nulls: Diff::ZERO,
1541 },
1542 _ => Accum::SimpleNumber {
1543 accum: AccumCount::ZERO,
1544 non_nulls: Diff::ZERO,
1545 },
1546 }
1547}
1548
/// Scale factor (2^24) used to convert floats into fixed-point `i128`
/// accumulators: values are multiplied by this before accumulation and
/// divided by it on finalization (see `datum_to_accumulator`/`finalize_accum`).
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1550
/// Converts a single input `datum` into the `Accum` contribution for one
/// record of the given aggregate function. Summing these contributions (via
/// `Semigroup`) accumulates the aggregate.
///
/// # Panics
///
/// Panics when `datum` has a type that the aggregate function cannot accept.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // COUNT: only track whether the datum was non-null.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        // ANY/ALL: count true and false observations; nulls count as neither.
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Both float widths accumulate as f64; nulls contribute zero.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            // Special values are tracked by count rather than in the sum.
            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Finite values are scaled into a fixed-point i128 so that
            // accumulation is exact addition; NaN/inf contribute zero here.
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Track inf/NaN separately; finite values widen into the
                // aggregation context's precision.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        // Remaining cases: integer-like sums over types that widen into i128.
        _ => {
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1691
/// Converts a final `Accum` back into an output `Datum` for the given
/// aggregate function. `total` is the net count of records accumulated.
///
/// # Panics
///
/// Panics when the aggregate function and accumulator variant do not match.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // A zero accumulator with records present means every input was null
    // (or otherwise contributed nothing); all aggregates except COUNT
    // report NULL in that case.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            // COUNT reports the number of non-null inputs.
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            // ALL: false if any false seen; true only if every record was
            // true; otherwise (some nulls) NULL.
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            // ANY: the dual of ALL.
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // Narrow integer sums report as Int64; the i128 accumulator is
            // truncated via `as` here.
            // NOTE(review): this assumes the sum fits in i64 — confirm overflow
            // handling happens upstream.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            // Unsigned sums: a negative accumulation is invalid (reported as
            // an error elsewhere); emit NULL rather than a bogus value.
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    Datum::Null
                }
            }
            // Float sums: reconstruct NaN/±inf from the special-value counts;
            // otherwise undo the fixed-point scaling.
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // Any NaN, or mixing +inf with -inf, yields NaN.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow from the aggregation width to the datum width; this
                // can itself produce an infinity, which merges with the
                // explicitly-tracked infinity counts.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    // Construct -infinity via context negation.
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1857
/// Wide (i128), overflow-aware integer type used for accumulator sums.
type AccumCount = mz_ore::Overflowing<i128>;
1860
/// Accumulation state for one accumulable aggregate, carried in the diff
/// field of a differential collection and combined via `Semigroup`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// State for ANY/ALL: counts of observed trues and falses.
    Bool {
        // Net count of `true` inputs.
        trues: Diff,
        // Net count of `false` inputs.
        falses: Diff,
    },
    /// State for COUNT and integer-like sums.
    SimpleNumber {
        // The running (wide-integer) sum.
        accum: AccumCount,
        // Net count of non-null inputs.
        non_nulls: Diff,
    },
    /// State for float sums: a fixed-point sum plus special-value counts.
    Float {
        // Fixed-point sum of finite inputs (scaled by `FLOAT_SCALE`).
        accum: AccumCount,
        // Net count of +infinity inputs.
        pos_infs: Diff,
        // Net count of -infinity inputs.
        neg_infs: Diff,
        // Net count of NaN inputs.
        nans: Diff,
        // Net count of non-null inputs.
        non_nulls: Diff,
    },
    /// State for numeric (decimal) sums.
    Numeric {
        // Wide decimal sum of finite inputs.
        accum: OrderedDecimal<NumericAgg>,
        // Net count of +infinity inputs.
        pos_infs: Diff,
        // Net count of -infinity inputs.
        neg_infs: Diff,
        // Net count of NaN inputs.
        nans: Diff,
        // Net count of non-null inputs.
        non_nulls: Diff,
    },
}
1925
1926impl IsZero for Accum {
1927 fn is_zero(&self) -> bool {
1928 match self {
1929 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1930 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1931 Accum::Float {
1932 accum,
1933 pos_infs,
1934 neg_infs,
1935 nans,
1936 non_nulls,
1937 } => {
1938 accum.is_zero()
1939 && pos_infs.is_zero()
1940 && neg_infs.is_zero()
1941 && nans.is_zero()
1942 && non_nulls.is_zero()
1943 }
1944 Accum::Numeric {
1945 accum,
1946 pos_infs,
1947 neg_infs,
1948 nans,
1949 non_nulls,
1950 } => {
1951 accum.0.is_zero()
1952 && pos_infs.is_zero()
1953 && neg_infs.is_zero()
1954 && nans.is_zero()
1955 && non_nulls.is_zero()
1956 }
1957 }
1958 }
1959}
1960
impl Semigroup for Accum {
    /// Combines two accumulators of the same variant by component-wise
    /// addition. Mismatched variants are a logic error and unreachable.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Fixed-point float sums may overflow i128; warn and wrap
                // rather than panic, accepting possibly-incorrect results.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // Rounding in the (extra-wide) aggregation context means the
                // sum exceeded its precision, which we treat as overflow.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce to a canonical representation so equal values
                // compare equal (required for consolidation).
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2063
impl Multiply<Diff> for Accum {
    type Output = Accum;

    /// Scales every component of the accumulator by `factor`, as required by
    /// differential dataflow when a record occurs with multiplicity.
    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // As in `plus_equals`, warn and wrap on i128 overflow rather
                // than panicking.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                // `mul` stores its result in the first argument.
                cx.mul(&mut f, &accum.0);
                // Rounding in the aggregation context indicates precision
                // exhaustion, treated as overflow.
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2126
// `Accum` is `Copy` with no heap pointers, so the byte-copying region suffices.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2130
/// Monoid implementations for hierarchical (min/max) reductions.
mod monoids {

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// Accumulation state for a min/max aggregation: a single-datum `Row`
    /// holding the current extremum, tagged with the direction.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current aggregate value.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        /// Custom `clone_from` that reuses the destination row's allocation
        /// via `Row::clone_from`, instead of allocating a fresh row.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take the existing row out of `self` so we can reuse its buffer.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Re-wrap with the source's variant (the variant may change).
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// Multiplicity does not affect a min/max value, so this is the
        /// identity; negative factors are not meaningful here and assert.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        /// Combines two monoids by keeping the smaller (Min) or larger (Max)
        /// value. `Datum::Null` never wins against a non-null value, matching
        /// SQL's treatment of nulls in MIN/MAX.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            // A null rhs never replaces lhs; a null lhs is
                            // always replaced.
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        // `clone_from` reuses the existing row allocation.
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    // Min/Max mixed within one aggregation is a logic error.
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        /// These monoids carry a value but have no additive identity, so no
        /// instance is ever reported as zero (and thus never consolidated
        /// away).
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnation region for `ReductionMonoid`, delegating row storage to
    /// `Row`'s own region.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        // SAFETY-related contract is inherited from `Region::copy`; we only
        // forward to the inner `Row` region, preserving the variant.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Reserve based on the contained rows, regardless of variant.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the monoid matching `func`: `Max(..)` for max
    /// aggregates, `Min(..)` for min aggregates, and `None` for every
    /// aggregate that has no min/max monoid representation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2385
/// One-by-one aggregator implementations used when evaluating window
/// aggregates, reusing the accumulable and hierarchical machinery above.
mod window_agg_helpers {
    use crate::render::reduce::*;

    /// Dispatches `OneByOneAggr` calls to the implementation matching the
    /// aggregate's reduction type.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one evaluation of an accumulable aggregate: feeds each datum
    /// through `datum_to_accumulator` and finalizes with `finalize_accum`.
    pub struct AccumulableOneByOneAggr {
        aggr_func: AggregateFunc,
        // The running accumulation.
        accum: Accum,
        // Number of datums given so far.
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one evaluation of a hierarchical (min/max) aggregate using
    /// `ReductionMonoid`.
    pub struct HierarchicalOneByOneAggr {
        aggr_func: AggregateFunc,
        // The running extremum.
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            // Seed with a Null row; the monoid's plus_equals replaces a null
            // lhs with any non-null value, so Null acts as the start state.
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        fn give(&mut self, d: &Datum) {
            // Pack the datum into a fresh single-datum row and fold it in.
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}