1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
26use differential_dataflow::trace::{Builder, Trace};
27use differential_dataflow::{Data, VecCollection};
28use itertools::Itertools;
29use mz_compute_types::plan::reduce::{
30 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
31 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
32};
33use mz_expr::{
34 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
35};
36use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
37use mz_repr::fixed_length::ToDatumIter;
38use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
39use mz_storage_types::errors::DataflowError;
40use mz_timely_util::operator::CollectionExt;
41use serde::{Deserialize, Serialize};
42use timely::Container;
43use timely::container::{CapacityContainerBuilder, PushInto};
44use timely::dataflow::Scope;
45use timely::progress::timestamp::Refines;
46use tracing::warn;
47
48use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
49use crate::extensions::reduce::{MzReduce, ReduceExt};
50use crate::render::context::{CollectionBundle, Context};
51use crate::render::errors::MaybeValidatingRow;
52use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
53use crate::render::{ArrangementFlavor, Pairer};
54use crate::row_spine::{
55 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
56};
57use crate::typedefs::{
58 ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
59 RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
60};
61
62impl<G, T> Context<G, T>
63where
64 G: Scope,
65 G::Timestamp: MzTimestamp + Refines<T>,
66 T: MzTimestamp,
67{
    /// Renders a reduction dataflow from the given `ReducePlan`.
    ///
    /// Extracts the grouping key and aggregate input values from each input
    /// row via `key_val_plan`, demultiplexes evaluation errors, and hands the
    /// keyed collection to `render_reduce_plan` for the plan-specific
    /// rendering.
    ///
    /// * `input_key` - key of an existing input arrangement to use, if any.
    /// * `mfp_after` - an MFP fused into the reduction, applied to its
    ///   output; planned here and required to be non-temporal.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Plan the fused MFP. Fused Reduce MFPs are guaranteed non-temporal.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Determine the set of input columns demanded by either plan.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Remap the demanded columns to dense positions and rewrite both
            // plans to reference the remapped columns.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // `skips` encodes how many datums to step over between successive
            // demanded columns when scanning an input row.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    // Gather only the demanded datums into `datums_local`.
                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions; errors become error
                    // updates on the output.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Evaluation above may have appended scratch datums, so
                    // restore the demanded prefix before evaluating values.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Demultiplex (key, value) pairs from errors.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            err = err.concat(err_input);

            // Hand off to the plan-specific rendering.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }
167
168 fn render_reduce_plan<S>(
174 &self,
175 plan: ReducePlan,
176 collection: VecCollection<S, (Row, Row), Diff>,
177 err_input: VecCollection<S, DataflowError, Diff>,
178 key_arity: usize,
179 mfp_after: Option<SafeMfpPlan>,
180 ) -> CollectionBundle<S, T>
181 where
182 S: Scope<Timestamp = G::Timestamp>,
183 {
184 let mut errors = Default::default();
185 let arrangement =
186 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
187 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
188 CollectionBundle::from_columns(
189 0..key_arity,
190 ArrangementFlavor::Local(
191 arrangement,
192 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
193 ),
194 )
195 }
196
197 fn render_reduce_plan_inner<S>(
198 &self,
199 plan: ReducePlan,
200 collection: VecCollection<S, (Row, Row), Diff>,
201 errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
202 key_arity: usize,
203 mfp_after: Option<SafeMfpPlan>,
204 ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
205 where
206 S: Scope<Timestamp = G::Timestamp>,
207 {
208 let arrangement = match plan {
211 ReducePlan::Distinct => {
214 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
215 errors.push(errs);
216 arranged_output
217 }
218 ReducePlan::Accumulable(expr) => {
219 let (arranged_output, errs) =
220 self.build_accumulable(collection, expr, key_arity, mfp_after);
221 errors.push(errs);
222 arranged_output
223 }
224 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
225 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
226 errors.push(errs);
227 output
228 }
229 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
230 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
231 errors.push(errs);
232 output
233 }
234 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
235 expr,
236 fused_unnest_list,
237 })) => {
238 let validating = !fused_unnest_list;
242 let (output, errs) = self.build_basic_aggregate(
243 collection,
244 0,
245 &expr,
246 validating,
247 key_arity,
248 mfp_after,
249 fused_unnest_list,
250 );
251 if validating {
252 errors.push(errs.expect("validation should have occurred as it was requested"));
253 }
254 output
255 }
256 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
257 let (output, errs) =
258 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
259 errors.push(errs);
260 output
261 }
262 };
263 arrangement
264 }
265
    /// Renders a `DISTINCT` reduction: arranges the input by key and emits a
    /// single (empty-valued) output per key.
    ///
    /// Returns the arranged output and an error stream that reports
    /// non-positive input multiplicities as well as any errors raised while
    /// evaluating the fused `mfp_after`.
    fn build_distinct<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // Each closure below gets its own scratch buffer and MFP copy; the
        // error-checking closure only needs the MFP when it can error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let (output, errors) = collection
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged DistinctBy",
            )
            .reduce_pair::<
                _,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
                _,
                RowErrBuilder<_, _>,
                RowErrSpine<_, _>,
            >(
                "DistinctBy",
                "DistinctByErrorCheck",
                move |key, _input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(key.to_datum_iter());

                    // Emit the key (with an empty value row) iff the fused
                    // MFP keeps it; MFP errors are surfaced by the second
                    // closure below.
                    if mfp_after1
                        .as_ref()
                        .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                        .unwrap_or(Ok(true))
                        == Ok(true)
                    {
                        output.push((Row::default(), Diff::ONE));
                    }
                },
                move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
                    // Report any key with a non-positive multiplicity (at
                    // most one error per key).
                    for (_, count) in input.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        let message = "Non-positive multiplicity in DistinctBy";
                        error_logger.log(message, &format!("row={key:?}, count={count}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }
                    // Re-evaluate the MFP solely to surface its errors.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (output, errors.as_collection(|_k, v| v.clone()))
    }
344
    /// Renders a reduction with multiple "basic" aggregations (ones that are
    /// neither accumulable nor hierarchical).
    ///
    /// Each aggregate is rendered independently via `build_basic_aggregate`
    /// and the per-aggregate results are fused back together per key. At
    /// least one per-aggregate rendering validates input multiplicities, so
    /// an error stream is always returned.
    fn build_basic_aggregates<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateExpr>,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // This path exists to fuse *multiple* aggregates; log if misused.
        if aggrs.len() <= 1 {
            self.error_logger().soft_panic_or_log(
                "Too few aggregations when building basic aggregates",
                &format!("len={}", aggrs.len()),
            )
        }
        let mut err_output = None;
        let mut to_collect = Vec::new();
        for (index, aggr) in aggrs.into_iter().enumerate() {
            // Only the first aggregate validates multiplicities; the fused
            // MFP is applied once after fusing, hence `None` here.
            let (result, errs) = self.build_basic_aggregate(
                input.clone(),
                index,
                &aggr,
                err_output.is_none(),
                key_arity,
                None,
                false,
            );
            if errs.is_some() {
                err_output = errs
            }
            // Tag each result with its aggregate index so the fusing
            // reduction sees values in aggregate order.
            to_collect
                .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
        }

        // Separate buffers/MFP copies per closure; the error-check closure
        // only needs the MFP when it can actually error.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let arranged =
            differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
                .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
                    "Arranged ReduceFuseBasic input",
                );

        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();

                    // Append each aggregate's single result datum after the
                    // key datums.
                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        // Exactly one of the renderings above was asked to validate.
        let validation_errs = err_output.expect("expected to validate in at least one aggregate");
        if let Some(mfp) = mfp_after2 {
            // A second reduction re-runs the MFP solely to surface its
            // evaluation errors.
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceFuseBasic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);

                        for ((_, row), _) in input.iter() {
                            datums_local.push(row.unpack_first());
                        }

                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
454
    /// Renders a single "basic" aggregation (neither accumulable nor
    /// hierarchical), reducing the datum at position `index` of each value
    /// row with `aggr`.
    ///
    /// When `validating`, non-positive input multiplicities are reported on
    /// the returned error stream; the stream is `Some` when validation, a
    /// validating `DISTINCT` pre-pass, or a fallible fused MFP produced one.
    /// With `fused_unnest_list`, the aggregate's list result is emitted one
    /// output row per element instead of as a single row.
    fn build_basic_aggregate<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Project the aggregate's input datum into a single-datum row.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        // For DISTINCT aggregates, deduplicate (key, value) pairs first.
        if distinct {
            // Merge key and value into one row to use as the distinct key.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // The validating instantiation carries `Result<(), String>`
                // values; `Err` marks invalid accumulations, demuxed below.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        _,
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Separate scratch buffers and clones for each closure below.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        // Expand each value to `count` copies for the
                        // aggregate function; non-positive counts expand to
                        // none here and are reported by the error check.
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        datums_local.push(
                            func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            ),
                        );

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                })
        } else {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        let iter = source.iter().flat_map(|(v, w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let mut datums_local = datums_key_1.borrow();
                        datums_local.extend(key.to_datum_iter());
                        let key_len = datums_local.len();
                        // Emit one output row per element of the unnested
                        // list, each prefixed by the key datums.
                        for datum in func
                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            )
                        {
                            datums_local.truncate(key_len);
                            datums_local.push(datum);
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        }
                    }
                })
        };

        // A second reduction surfaces validation and/or MFP errors when
        // validation wasn't already done by the DISTINCT pre-pass or the
        // fused MFP can produce errors.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            // Report non-positive multiplicities, at most
                            // once per key.
                            if must_validate {
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Re-evaluate the aggregate and the fused MFP
                            // solely to surface MFP evaluation errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // Fused unnest-list renderings never validate here (see the
                // caller), so only the MFP can have brought us into this
                // branch.
                assert!(!must_validate);

                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            if let Some(e) = err_output {
                err_output = Some(e.concat(errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
740
    /// Builds the deduplication pre-pass used by `build_basic_aggregate` for
    /// `DISTINCT` aggregations: arranges the merged key-value rows and emits
    /// one output per distinct row.
    ///
    /// The output value type `Tr::ValOwn` is a `MaybeValidatingRow`: when it
    /// can represent an error (`into_error()` is `Some`), keys with
    /// non-positive multiplicities produce error values instead of being
    /// dropped silently.
    fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
        &self,
        input: VecCollection<S, Row, Diff>,
        name_tag: Option<&str>,
    ) -> Arranged<S, TraceAgent<Tr>>
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            Time = G::Timestamp,
            Diff = Diff,
            ValOwn: Data + MaybeValidatingRow<(), String>,
        > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();

        // `name_tag` distinguishes operator names between instantiations.
        let output_name = format!(
            "ReduceInaccumulable Distinct{}",
            name_tag.unwrap_or_default()
        );

        let input: KeyCollection<_, _, _> = input.into();
        input
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged ReduceInaccumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
                // In the validating instantiation, report keys whose input
                // has a non-positive multiplicity (one error per key).
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }

                        let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
                        error_logger.log(message, &format!("value={value:?}, count={count}"));
                        t.push((err(message.to_string()), Diff::ONE));
                        return;
                    }
                }
                // Otherwise emit a single unit value for the key.
                t.push((Tr::ValOwn::ok(()), Diff::ONE))
            })
    }
790
    /// Renders a bucketed hierarchical reduction (e.g. min/max over input
    /// that may see retractions).
    ///
    /// The input is pushed through a sequence of reduction stages, one per
    /// entry in `buckets`: each stage groups rows by `hash % bucket`
    /// prepended to the key, so later stages (and the final reduction) see
    /// progressively fewer rows per group. Exactly one level validates input
    /// multiplicities, so an error stream is always produced.
    fn build_bucketed<S>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let mut err_output: Option<VecCollection<S, _, _>> = None;
        let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            let first_mod = buckets.get(0).copied().unwrap_or(1);
            let aggregations = aggr_funcs.len();

            // Pair each row with `(hash % first_mod, key...)` so the first
            // stage reduces within hash buckets.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                row_packer.extend(row.iter().take(aggregations));
                let values = row_builder.clone();

                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            // Chain one reduction stage per bucket, coarsening the hash
            // prefix before each stage after the first.
            for (index, b) in buckets.into_iter().enumerate() {
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // Only the first stage validates multiplicities.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region());
                }
                stage = oks
            }

            // Discard the hash prefix before the final reduction.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            // Separate buffers/clones for the two reduction closures below.
            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            let error_logger = self.error_logger();
            let arranged = partial
                .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "Arrange ReduceMinsMaxes",
                );
            // If no stage validated (empty `buckets`), the final reduction
            // must; also re-run the fused MFP here if it can error.
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .clone()
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            // Report non-positive multiplicities, at most
                            // once per key.
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message = "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // Re-evaluate aggregates and MFP solely to
                            // surface MFP evaluation errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            // Each value row carries one datum per aggregate;
                            // advance all row iterators in lockstep so the
                            // i-th aggregate sees the i-th column.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region();
                if let Some(e) = err_output.take() {
                    err_output = Some(e.concat(errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Evaluate each aggregate over its column of the
                        // value rows (iterators advanced in lockstep).
                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter =
                                (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region()
        });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
993
    /// Renders one stage of a bucketed hierarchical reduction.
    ///
    /// The stage output is the concatenation of the (negated) reduction
    /// output produced by `build_bucketed_negated_output` with the stage
    /// input. When `validating`, invalid (non-positive) accumulations are
    /// demuxed onto the returned error stream.
    fn build_bucketed_stage<S>(
        &self,
        aggr_funcs: &Vec<AggregateFunc>,
        input: VecCollection<S, (Row, Row), Diff>,
        validating: bool,
    ) -> (
        VecCollection<S, (Row, Row), Diff>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let (input, negated_output, errs) = if validating {
            // The validating instantiation wraps values in `Result`, with
            // `Err(hash_key)` marking an invalid accumulation.
            let (input, reduced) = self
                .build_bucketed_negated_output::<
                    _,
                    RowValBuilder<_, _, _>,
                    RowValSpine<Result<Row, Row>, _, _>,
                >(
                    input.clone(),
                    aggr_funcs.clone(),
                );
            let (oks, errs) = reduced
                .as_collection(|k, v| (k.to_row(), v.clone()))
                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                    "Checked Invalid Accumulations",
                    |(hash_key, result)| match result {
                        Err(hash_key) => {
                            // Strip the hash prefix so the error reports the
                            // user-visible key.
                            let mut hash_key_iter = hash_key.iter();
                            let _hash = hash_key_iter.next();
                            let key = SharedRow::pack(hash_key_iter);
                            let message = format!(
                                "Invalid data in source, saw non-positive accumulation \
                                for key {key:?} in hierarchical mins-maxes aggregate"
                            );
                            Err(EvalError::Internal(message.into()).into())
                        }
                        Ok(values) => Ok((hash_key, values)),
                    },
                );
            (input, oks, Some(errs))
        } else {
            let (input, reduced) = self
                .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    input,
                    aggr_funcs.clone(),
                );
            let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
            (input, oks, None)
        };

        // Combine the negated reduction output with the arranged input.
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        let oks = negated_output.concat(input);
        (oks, errs)
    }
1057
    /// Arranges the input of a bucketed stage and reduces it, emitting per
    /// key the reduced row with multiplicity -1 together with every input
    /// value at negated multiplicity.
    ///
    /// Returns both the arranged input (reused by the caller) and the
    /// arranged reduction output. When `Tr::ValOwn` can encode an error,
    /// non-positive accumulations yield `err(key)` values instead.
    ///
    /// NOTE(review): the caller concatenates this output with the input
    /// collection; confirm the sign conventions against `build_bucketed_stage`
    /// and downstream consolidation before relying on them.
    fn build_bucketed_negated_output<S, Bu, Tr>(
        &self,
        input: VecCollection<S, (Row, Row), Diff>,
        aggrs: Vec<AggregateFunc>,
    ) -> (
        Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
        Arranged<S, TraceAgent<Tr>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = G::Timestamp,
            Diff = Diff,
        > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
            Output = Tr::Batch,
        >,
        Arranged<S, TraceAgent<Tr>>: ArrangementSize,
    {
        let error_logger = self.error_logger();
        let arranged_input = input
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "Arranged MinsMaxesHierarchical input",
            );

        let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
            "Reduced Fallibly MinsMaxesHierarchical",
            move |key, source, target| {
                // In the validating instantiation, flag non-positive input
                // multiplicities, using the (hash-prefixed) key as payload.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (value, count) in source.iter() {
                        if count.is_positive() {
                            continue;
                        }
                        error_logger.log(
                            "Non-positive accumulation in MinsMaxesHierarchical",
                            &format!("key={key:?}, value={value:?}, count={count}"),
                        );
                        target.push((err(Tr::owned_key(key)), Diff::ONE));
                        return;
                    }
                }

                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();

                // Evaluate each aggregate over its column of the value rows
                // (iterators advanced in lockstep).
                let mut source_iters = source
                    .iter()
                    .map(|(values, _cnt)| *values)
                    .collect::<Vec<_>>();
                for func in aggrs.iter() {
                    let column_iter =
                        (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
                    row_packer.push(func.eval(column_iter, &RowArena::new()));
                }
                // One output for the reduced row (at -1) plus one negated
                // copy of every input value.
                target.reserve(source.len().saturating_add(1));
                target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
                target.extend(source.iter().map(|(values, cnt)| {
                    let mut cnt = *cnt;
                    cnt.negate();
                    (Tr::ValOwn::ok(values.to_row()), cnt)
                }));
            },
        );
        (arranged_input, reduced)
    }
1141
    /// Renders a hierarchical reduction over monotonic (append-only) input.
    ///
    /// Each value is converted into a `ReductionMonoid` and moved into the
    /// difference component via `explode_one`, so differential's
    /// consolidation performs the aggregation. Retractions in the input are
    /// reported as errors by `ensure_monotonic`.
    fn build_monotonic<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let aggregations = aggr_funcs.len();
        // Split each input row into one single-datum row per aggregate, and
        // consolidate first if the plan requires it.
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(aggregations);
                values.extend(
                    row.iter()
                        .take(aggregations)
                        .map(|v| row_builder.pack_using(std::iter::once(v))),
                );

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        // Guard against retractions, which a monotonic plan cannot handle.
        let error_logger = self.error_logger();
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the values into the difference component as monoids, so that
        // consolidation does the aggregation work.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        // Separate buffers/MFP copies for the two closures below.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The accumulated monoids live in the diff position.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });

        // A second reduction re-runs the fused MFP solely to surface its
        // evaluation errors, when it can produce any.
        if let Some(mfp) = mfp_after2 {
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1260
1261 fn build_accumulable<S>(
1268 &self,
1269 collection: VecCollection<S, (Row, Row), Diff>,
1270 AccumulablePlan {
1271 full_aggrs,
1272 simple_aggrs,
1273 distinct_aggrs,
1274 }: AccumulablePlan,
1275 key_arity: usize,
1276 mfp_after: Option<SafeMfpPlan>,
1277 ) -> (RowRowArrangement<S>, VecCollection<S, DataflowError, Diff>)
1278 where
1279 S: Scope<Timestamp = G::Timestamp>,
1280 {
1281 let mut collection_scope = collection.scope();
1282
1283 if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
1285 self.error_logger().soft_panic_or_log(
1286 "Incorrect numbers of aggregates in accummulable reduction rendering",
1287 &format!(
1288 "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
1289 full_aggrs.len(),
1290 simple_aggrs.len(),
1291 distinct_aggrs.len(),
1292 ),
1293 );
1294 }
1295
1296 let zero_diffs: (Vec<_>, Diff) = (
1308 full_aggrs
1309 .iter()
1310 .map(|f| accumulable_zero(&f.func))
1311 .collect(),
1312 Diff::ZERO,
1313 );
1314
1315 let mut to_aggregate = Vec::new();
1316 if simple_aggrs.len() > 0 {
1317 let collection = collection.clone();
1319 let easy_cases = collection.explode_one({
1320 let zero_diffs = zero_diffs.clone();
1321 move |(key, row)| {
1322 let mut diffs = zero_diffs.clone();
1323 let mut row_iter = row.iter().enumerate();
1329 for (datum_index, aggr) in simple_aggrs.iter() {
1330 let mut datum = row_iter.next().unwrap();
1331 while datum_index != &datum.0 {
1332 datum = row_iter.next().unwrap();
1333 }
1334 let datum = datum.1;
1335 diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
1336 diffs.1 = Diff::ONE;
1337 }
1338 ((key, ()), diffs)
1339 }
1340 });
1341 to_aggregate.push(easy_cases);
1342 }
1343
1344 for (datum_index, aggr) in distinct_aggrs.into_iter() {
1346 let pairer = Pairer::new(key_arity);
1347 let collection = collection
1348 .clone()
1349 .map(move |(key, row)| {
1350 let value = row.iter().nth(datum_index).unwrap();
1351 (pairer.merge(&key, std::iter::once(value)), ())
1352 })
1353 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
1354 "Arranged Accumulable Distinct [val: empty]",
1355 )
1356 .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
1357 "Reduced Accumulable Distinct [val: empty]",
1358 move |_k, _s, t| t.push(((), Diff::ONE)),
1359 )
1360 .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
1361 .explode_one({
1362 let zero_diffs = zero_diffs.clone();
1363 move |(key, row)| {
1364 let datum = row.iter().next().unwrap();
1365 let mut diffs = zero_diffs.clone();
1366 diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
1367 diffs.1 = Diff::ONE;
1368 ((key, ()), diffs)
1369 }
1370 });
1371 to_aggregate.push(collection);
1372 }
1373
1374 let collection = if to_aggregate.len() == 1 {
1376 to_aggregate.remove(0)
1377 } else {
1378 differential_dataflow::collection::concatenate(&mut collection_scope, to_aggregate)
1379 };
1380
1381 let mut datums1 = DatumVec::new();
1383 let mut datums2 = DatumVec::new();
1384 let mfp_after1 = mfp_after.clone();
1385 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1386 let full_aggrs2 = full_aggrs.clone();
1387
1388 let error_logger = self.error_logger();
1389 let err_full_aggrs = full_aggrs.clone();
1390 let (arranged_output, arranged_errs) = collection
1391 .mz_arrange::<
1392 RowBatcher<_, _>,
1393 RowBuilder<_, _>,
1394 RowSpine<_, (Vec<Accum>, Diff)>,
1395 >("ArrangeAccumulable [val: empty]")
1396 .reduce_pair::<
1397 _,
1398 RowRowBuilder<_, _>,
1399 RowRowSpine<_, _>,
1400 _,
1401 RowErrBuilder<_, _>,
1402 RowErrSpine<_, _>,
1403 >(
1404 "ReduceAccumulable",
1405 "AccumulableErrorCheck",
1406 {
1407 move |key, input, output| {
1408 let (ref accums, total) = input[0].1;
1409
1410 let temp_storage = RowArena::new();
1411 let datum_iter = key.to_datum_iter();
1412 let mut datums_local = datums1.borrow();
1413 datums_local.extend(datum_iter);
1414 let key_len = datums_local.len();
1415 for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
1416 datums_local.push(finalize_accum(&aggr.func, accum, total));
1417 }
1418
1419 if let Some(row) = evaluate_mfp_after(
1420 &mfp_after1,
1421 &mut datums_local,
1422 &temp_storage,
1423 key_len,
1424 ) {
1425 output.push((row, Diff::ONE));
1426 }
1427 }
1428 },
1429 move |key, input, output| {
1430 let (ref accums, total) = input[0].1;
1431 for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
1432 if total == Diff::ZERO && !accum.is_zero() {
1435 error_logger.log(
1436 "Net-zero records with non-zero accumulation in ReduceAccumulable",
1437 &format!("aggr={aggr:?}, accum={accum:?}"),
1438 );
1439 let key = key.to_row();
1440 let message = format!(
1441 "Invalid data in source, saw net-zero records for key {key} \
1442 with non-zero accumulation in accumulable aggregate"
1443 );
1444 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
1445 }
1446 match (&aggr.func, &accum) {
1447 (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1448 | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
1449 | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1450 if accum.is_negative() {
1451 error_logger.log(
1452 "Invalid negative unsigned aggregation in ReduceAccumulable",
1453 &format!("aggr={aggr:?}, accum={accum:?}"),
1454 );
1455 let key = key.to_row();
1456 let message = format!(
1457 "Invalid data in source, saw negative accumulation with \
1458 unsigned type for key {key}"
1459 );
1460 let err =
1461 EvalError::Internal(message.into());
1462 output.push((err.into(), Diff::ONE));
1463 }
1464 }
1465 _ => (), }
1467 }
1468
1469 let Some(mfp) = &mfp_after2 else { return };
1471 let temp_storage = RowArena::new();
1472 let datum_iter = key.to_datum_iter();
1473 let mut datums_local = datums2.borrow();
1474 datums_local.extend(datum_iter);
1475 for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
1476 datums_local.push(finalize_accum(&aggr.func, accum, total));
1477 }
1478
1479 if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
1480 output.push((e.into(), Diff::ONE));
1481 }
1482 },
1483 );
1484 (
1485 arranged_output,
1486 arranged_errs.as_collection(|_key, error| error.clone()),
1487 )
1488 }
1489}
1490
1491fn evaluate_mfp_after<'a, 'b>(
1495 mfp_after: &'a Option<SafeMfpPlan>,
1496 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1497 temp_storage: &'a RowArena,
1498 key_len: usize,
1499) -> Option<Row> {
1500 let mut row_builder = SharedRow::get();
1501 if let Some(mfp) = mfp_after {
1504 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1507 Some(row_builder.pack_using(iter.skip(key_len)))
1510 } else {
1511 None
1512 }
1513 } else {
1514 Some(row_builder.pack_using(&datums_local[key_len..]))
1515 }
1516}
1517
1518fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1519 match aggr_func {
1520 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1521 trues: Diff::ZERO,
1522 falses: Diff::ZERO,
1523 },
1524 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1525 accum: AccumCount::ZERO,
1526 pos_infs: Diff::ZERO,
1527 neg_infs: Diff::ZERO,
1528 nans: Diff::ZERO,
1529 non_nulls: Diff::ZERO,
1530 },
1531 AggregateFunc::SumNumeric => Accum::Numeric {
1532 accum: OrderedDecimal(NumericAgg::zero()),
1533 pos_infs: Diff::ZERO,
1534 neg_infs: Diff::ZERO,
1535 nans: Diff::ZERO,
1536 non_nulls: Diff::ZERO,
1537 },
1538 _ => Accum::SimpleNumber {
1539 accum: AccumCount::ZERO,
1540 non_nulls: Diff::ZERO,
1541 },
1542 }
1543}
1544
1545static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1 << 24));
1546
/// Converts a single input datum into the `Accum` contribution it makes for
/// the given accumulable aggregate function.
///
/// Panics when the datum's type does not match what the aggregate expects;
/// nulls are accepted everywhere and contribute "nothing" to the result.
fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
    match aggregate_func {
        // COUNT: only track whether the input was non-null.
        AggregateFunc::Count => Accum::SimpleNumber {
            accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
                Diff::ZERO
            } else {
                Diff::ONE
            },
        },
        // ANY/ALL: count trues and falses separately; nulls count as neither.
        AggregateFunc::Any | AggregateFunc::All => match datum {
            Datum::True => Accum::Bool {
                trues: Diff::ONE,
                falses: Diff::ZERO,
            },
            Datum::Null => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ZERO,
            },
            Datum::False => Accum::Bool {
                trues: Diff::ZERO,
                falses: Diff::ONE,
            },
            x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
        },
        // Dummy aggregates carry no state.
        AggregateFunc::Dummy => match datum {
            Datum::Dummy => Accum::SimpleNumber {
                accum: AccumCount::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
        },
        AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
            // Widen to f64; nulls contribute zero to the sum.
            let n = match datum {
                Datum::Float32(n) => f64::from(*n),
                Datum::Float64(n) => *n,
                Datum::Null => 0f64,
                x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
            };

            // NaNs and infinities are tracked as counters rather than folded
            // into the fixed-point sum.
            let nans = Diff::from(n.is_nan());
            let pos_infs = Diff::from(n == f64::INFINITY);
            let neg_infs = Diff::from(n == f64::NEG_INFINITY);
            let non_nulls = Diff::from(datum != Datum::Null);

            // Finite values are stored scaled by FLOAT_SCALE as fixed-point
            // i128; special values leave the sum untouched.
            let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
                AccumCount::ZERO
            } else {
                #[allow(clippy::as_conversions)]
                { (n * *FLOAT_SCALE) as i128 }.into()
            };

            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            }
        }
        AggregateFunc::SumNumeric => match datum {
            Datum::Numeric(n) => {
                // Like floats: infinities and NaNs are counted separately and
                // contribute zero to the decimal sum.
                let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
                    if n.0.is_negative() {
                        (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
                    } else {
                        (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
                    }
                } else if n.0.is_nan() {
                    (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
                } else {
                    // Widen the datum-precision value to aggregation precision.
                    let mut cx_agg = numeric::cx_agg();
                    (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
                };

                Accum::Numeric {
                    accum: OrderedDecimal(accum),
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: Diff::ONE,
                }
            }
            Datum::Null => Accum::Numeric {
                accum: OrderedDecimal(NumericAgg::zero()),
                pos_infs: Diff::ZERO,
                neg_infs: Diff::ZERO,
                nans: Diff::ZERO,
                non_nulls: Diff::ZERO,
            },
            x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
        },
        // Integer sums (and MzTimestamp): accumulate the value directly into
        // the i128 accumulator; nulls contribute nothing.
        _ => {
            match datum {
                Datum::Int16(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int32(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Int64(i) => Accum::SimpleNumber {
                    accum: i.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt16(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt32(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::UInt64(u) => Accum::SimpleNumber {
                    accum: u.into(),
                    non_nulls: Diff::ONE,
                },
                Datum::MzTimestamp(t) => Accum::SimpleNumber {
                    accum: u64::from(t).into(),
                    non_nulls: Diff::ONE,
                },
                Datum::Null => Accum::SimpleNumber {
                    accum: AccumCount::ZERO,
                    non_nulls: Diff::ZERO,
                },
                x => panic!("Accumulating non-integer data: {x:?}"),
            }
        }
    }
}
1687
/// Renders an accumulator into the aggregate's final output datum, where
/// `total` is the number of records that contributed to the accumulation.
///
/// Panics if the accumulator variant does not match the aggregate function.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // Records exist but the accumulator is entirely zero: every contributing
    // datum was null, so SQL aggregates (other than COUNT) return NULL.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            // COUNT is simply the number of non-null inputs.
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // Any false makes ALL false; all-true (no nulls) makes it
                // true; otherwise nulls were present and the result is NULL.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // Mirror image of ALL above.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            // Narrow integer sums produce an Int64 result.
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            // Unsigned sums produce UInt64; a negative accumulation indicates
            // invalid input (reported elsewhere) and yields NULL here.
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // IEEE-style combination of special values: any NaN, or both
                // signs of infinity, yields NaN; a single sign of infinity
                // dominates; otherwise undo the fixed-point scaling.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Same special-value handling as SumFloat32, without the final
                // narrowing to f32.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow the aggregation-precision sum to datum precision;
                // narrowing itself can overflow to infinity, which is folded
                // into the explicit infinity counters below.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            // Mismatched function/accumulator pairing is a rendering bug.
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
1853
1854type AccumCount = mz_ore::Overflowing<i128>;
1856
/// Accumulation state for an accumulable aggregate function, combined across
/// records by addition (see the `Semigroup` impl below).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// State for `any`/`all`.
    Bool {
        // Number of `true` inputs observed.
        trues: Diff,
        // Number of `false` inputs observed.
        falses: Diff,
    },
    /// State for counts and integer sums.
    SimpleNumber {
        // Sum of the non-null inputs (kept zero for `count`).
        accum: AccumCount,
        // Number of non-null inputs observed.
        non_nulls: Diff,
    },
    /// State for floating-point sums.
    Float {
        // Sum of the finite inputs, scaled by `FLOAT_SCALE` into fixed point.
        accum: AccumCount,
        // Number of positive infinities observed.
        pos_infs: Diff,
        // Number of negative infinities observed.
        neg_infs: Diff,
        // Number of NaNs observed.
        nans: Diff,
        // Number of non-null inputs observed.
        non_nulls: Diff,
    },
    /// State for `numeric` sums.
    Numeric {
        // Sum of the finite inputs at aggregation precision.
        accum: OrderedDecimal<NumericAgg>,
        // Number of positive infinities observed.
        pos_infs: Diff,
        // Number of negative infinities observed.
        neg_infs: Diff,
        // Number of NaNs observed.
        nans: Diff,
        // Number of non-null inputs observed.
        non_nulls: Diff,
    },
}
1921
1922impl IsZero for Accum {
1923 fn is_zero(&self) -> bool {
1924 match self {
1925 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
1926 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
1927 Accum::Float {
1928 accum,
1929 pos_infs,
1930 neg_infs,
1931 nans,
1932 non_nulls,
1933 } => {
1934 accum.is_zero()
1935 && pos_infs.is_zero()
1936 && neg_infs.is_zero()
1937 && nans.is_zero()
1938 && non_nulls.is_zero()
1939 }
1940 Accum::Numeric {
1941 accum,
1942 pos_infs,
1943 neg_infs,
1944 nans,
1945 non_nulls,
1946 } => {
1947 accum.0.is_zero()
1948 && pos_infs.is_zero()
1949 && neg_infs.is_zero()
1950 && nans.is_zero()
1951 && non_nulls.is_zero()
1952 }
1953 }
1954 }
1955}
1956
// Accumulators combine by element-wise addition of their counters and sums.
// Mixing different variants is a rendering bug and is reported loudly.
impl Semigroup for Accum {
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // The fixed-point sum may overflow i128; warn and fall back to
                // wrapping addition rather than panic, accepting possibly
                // incorrect results.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Add at aggregation precision; rounding would mean the wide
                // decimal overflowed, which we treat as unrecoverable.
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce to keep the decimal representation canonical across
                // additions.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2059
// Scaling an accumulator by a multiplicity scales every counter and sum.
impl Multiply<Diff> for Accum {
    type Output = Accum;

    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // As in `plus_equals`: warn and wrap on i128 overflow rather
                // than panic.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                // Multiply at aggregation precision; rounding would mean the
                // wide decimal overflowed, which we treat as unrecoverable.
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                cx.mul(&mut f, &accum.0);
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2122
// `Accum` derives `Copy` and owns no heap data, so a plain copy region
// suffices for columnar storage.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2126
/// Monoids used to compute min/max ("hierarchical") aggregations in place.
mod monoids {

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid wrapping a row, tracking either the minimum or the maximum
    /// value observed so far (combined via `plus_equals`).
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the wrapped row — the current min or max.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    // Hand-written `Clone` so that `clone_from` can reuse the destination
    // row's existing allocation instead of allocating afresh.
    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take ownership of our row, whichever variant holds it.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            // Row::clone_from can reuse `row`'s allocation.
            row.clone_from(source_row);

            // Adopt the source's variant with the freshly filled row.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Min/max are idempotent under `plus_equals`, so combining a value
            // with itself any positive number of times is a no-op; negative
            // factors (retractions) are unsupported, hence the assert.
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Nulls always lose: SQL's min ignores null inputs.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Nulls always lose: SQL's max ignores null inputs.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    // A Min combined with a Max indicates a rendering bug.
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        fn is_zero(&self) -> bool {
            // No element is ever treated as a zero that consolidation could
            // drop; every monoid value is retained.
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnation region for `ReductionMonoid`, delegating the row payload to
    /// `Row`'s own region; the Min/Max discriminant stays inline.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        // SAFETY(review): delegates directly to the inner `Row` region's
        // `copy` and inherits its contract.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Reserve based on the wrapped rows, regardless of variant.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the `ReductionMonoid` matching `func`, or returns `None`
    /// when `func` is not a min/max (hierarchical) aggregation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2381
/// Helpers for one-by-one aggregation, where datums are fed to an aggregate
/// one at a time (used by window-function rendering via `OneByOneAggr`).
mod window_agg_helpers {
    use crate::render::reduce::*;

    /// Dispatch enum over the one-by-one aggregation strategies: the
    /// accumulable and hierarchical implementations below, plus the naive
    /// fallback for basic aggregates.
    pub enum OneByOneAggrImpls {
        Accumulable(AccumulableOneByOneAggr),
        Hierarchical(HierarchicalOneByOneAggr),
        Basic(mz_expr::NaiveOneByOneAggr),
    }

    impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
        // Picks the implementation matching the aggregate's reduction type;
        // `reverse` is only forwarded to the basic implementation.
        fn new(agg: &AggregateFunc, reverse: bool) -> Self {
            match reduction_type(agg) {
                ReductionType::Basic => {
                    OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
                }
                ReductionType::Accumulable => {
                    OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
                }
                ReductionType::Hierarchical => {
                    OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
                }
            }
        }

        // Feeds a single datum into the underlying aggregation.
        fn give(&mut self, d: &Datum) {
            match self {
                OneByOneAggrImpls::Basic(i) => i.give(d),
                OneByOneAggrImpls::Accumulable(i) => i.give(d),
                OneByOneAggrImpls::Hierarchical(i) => i.give(d),
            }
        }

        // Renders the aggregate's current value into `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            match self {
                OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
                OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
            }
        }
    }

    /// One-by-one aggregation backed by the accumulable `Accum` machinery.
    pub struct AccumulableOneByOneAggr {
        // The aggregate function being computed.
        aggr_func: AggregateFunc,
        // Running accumulation of all datums given so far.
        accum: Accum,
        // Number of datums given so far.
        total: Diff,
    }

    impl AccumulableOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            AccumulableOneByOneAggr {
                aggr_func: aggr_func.clone(),
                // Start from the additive identity.
                accum: accumulable_zero(aggr_func),
                total: Diff::ZERO,
            }
        }

        // Folds a single datum into the running accumulation.
        fn give(&mut self, d: &Datum) {
            self.accum
                .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
            self.total += Diff::ONE;
        }

        // Finalizes the accumulation into a datum owned by `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| {
                packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
            })
        }
    }

    /// One-by-one aggregation backed by the min/max `ReductionMonoid`.
    pub struct HierarchicalOneByOneAggr {
        // The aggregate function being computed.
        aggr_func: AggregateFunc,
        // Running min/max over all datums given so far.
        monoid: ReductionMonoid,
    }

    impl HierarchicalOneByOneAggr {
        fn new(aggr_func: &AggregateFunc) -> Self {
            // Seed with null, which loses to any non-null value in the
            // monoid's combination rules.
            let mut row_buf = Row::default();
            row_buf.packer().push(Datum::Null);
            HierarchicalOneByOneAggr {
                aggr_func: aggr_func.clone(),
                monoid: get_monoid(row_buf, aggr_func)
                    .expect("aggr_func should be a hierarchical aggregation function"),
            }
        }

        // Folds a single datum into the running min/max.
        fn give(&mut self, d: &Datum) {
            let mut row_buf = Row::default();
            row_buf.packer().push(d);
            let m = get_monoid(row_buf, &self.aggr_func)
                .expect("aggr_func should be a hierarchical aggregation function");
            self.monoid.plus_equals(&m);
        }

        // Renders the current min/max into a datum owned by `temp_storage`.
        fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
            temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
        }
    }
}