1use std::collections::BTreeMap;
15
16use columnation::{Columnation, CopyRegion};
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
22use differential_dataflow::hashable::Hashable;
23use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
24use differential_dataflow::trace::implementations::BatchContainer;
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Data, VecCollection};
27use itertools::Itertools;
28use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
29use mz_compute_types::plan::ArrangementStrategy;
30use mz_compute_types::plan::reduce::{
31 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_ore::cast::CastLossy;
38use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
39use mz_repr::fixed_length::ToDatumIter;
40use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
41use mz_timely_util::columnation::ColumnationChunker;
42use mz_timely_util::operator::CollectionExt;
43use num_traits::Float;
44use serde::{Deserialize, Serialize};
45use timely::Container;
46use timely::container::{CapacityContainerBuilder, PushInto};
47use tracing::warn;
48
49use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
50use crate::extensions::reduce::{ClearContainer, MzReduce};
51use crate::render::context::{CollectionBundle, Context};
52use crate::render::errors::DataflowErrorSer;
53use crate::render::errors::MaybeValidatingRow;
54use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
55use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
56use crate::typedefs::{
57 ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58 RowRowSpine, RowSpine, RowValSpine,
59};
60use mz_row_spine::{
61 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
62};
63
64impl<'scope, T: RenderTimestamp> Context<'scope, T> {
/// Renders a reduction over `input` and returns the result as an arranged
/// [`CollectionBundle`] keyed by the reduction key.
///
/// All operators are built inside a region named "Reduce". Each input row is split into a
/// `(key, val)` pair of rows by evaluating `key_val_plan`. Evaluation errors are routed to the
/// error stream instead of the ok stream. The pairs are then optionally temporally bucketed and
/// handed to `render_reduce_plan`, which builds the operators for `reduce_plan`.
///
/// `mfp_after`, if present, is planned here. Both `expect`s panic if planning fails or if the MFP
/// has temporal predicates.
///
/// NOTE(review): `input_key` is forwarded to `flat_map` as `(key, None)`; presumably it selects
/// an existing arrangement of `input` to read from — confirm against `CollectionBundle::flat_map`.
pub fn render_reduce(
    &self,
    input_key: Option<Vec<MirScalarExpr>>,
    input: CollectionBundle<'scope, T>,
    key_val_plan: KeyValPlan,
    reduce_plan: ReducePlan,
    mfp_after: Option<MapFilterProject>,
    temporal_bucketing_strategy: ArrangementStrategy,
) -> CollectionBundle<'scope, T>
where
    T: crate::render::MaybeBucketByTime,
{
    // Plan the fused MFP up front; a fused reduce MFP must not contain temporal predicates.
    let mfp_after = mfp_after.map(|m| {
        m.into_plan()
            .expect("MFP planning must succeed")
            .into_nontemporal()
            .expect("Fused Reduce MFPs do not have temporal predicates")
    });

    input.scope().region_named("Reduce", |inner| {
        let KeyValPlan {
            mut key_plan,
            mut val_plan,
        } = key_val_plan;
        // The key row has one column per entry in the key plan's projection.
        let key_arity = key_plan.projection.len();
        let mut datums = DatumVec::new();

        // Collect the input columns referenced by either plan, sorted and deduplicated.
        let mut demand = Vec::new();
        demand.extend(key_plan.demand());
        demand.extend(val_plan.demand());
        demand.sort();
        demand.dedup();

        // Map every demanded column to its dense position. Both plans are rewritten to read
        // from a compacted datum vector that holds only the demanded columns, in order.
        let mut demand_map = BTreeMap::new();
        for column in demand.iter() {
            demand_map.insert(*column, demand_map.len());
        }
        let demand_map_len = demand_map.len();
        key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
        val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
        // One past the largest demanded column (0 if nothing is demanded), passed to `flat_map`.
        let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
        // Sorted column indexes converted into skip counts for `Iterator::nth` below.
        let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

        let (key_val_input, err) = input
            .enter_region(inner)
            .flat_map::<_, ConsolidatingContainerBuilder<Vec<((Row, Row), T, Diff)>>, _>(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff, ok_session, err_session| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    // Gather only the demanded columns, in order, using the precomputed skips.
                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key; an evaluation error goes to the error stream.
                    // NOTE(review): the `1` returned to `flat_map` is not explained by this
                    // code; confirm its meaning against `CollectionBundle::flat_map`.
                    let key = key_plan.evaluate_into(
                        &mut datums_local,
                        &temp_storage,
                        &mut row_builder,
                    );
                    let key = match key {
                        Err(e) => {
                            err_session.give((e.into(), time, diff));
                            return 1;
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Restore `datums_local` to exactly the gathered input datums before
                    // evaluating the value plan.
                    datums_local.truncate(skips.len());
                    let val = val_plan.evaluate_into(
                        &mut datums_local,
                        &temp_storage,
                        &mut row_builder,
                    );
                    let val = match val {
                        Err(e) => {
                            err_session.give((e.into(), time, diff));
                            return 1;
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    ok_session.give(((key, val), time, diff));
                    1
                },
            );

        let key_val_collection = key_val_input.as_collection();
        // Temporal bucketing applies only when the plan asks for it AND the
        // `ENABLE_COMPUTE_TEMPORAL_BUCKETING` dyncfg is on.
        let key_val_collection = if matches!(
            temporal_bucketing_strategy,
            ArrangementStrategy::TemporalBucketing
        ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
        {
            // The configured summary must fit in an `mz_repr::Timestamp`; panics otherwise.
            let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                .get(&self.config_set)
                .try_into()
                .expect("must fit");
            T::maybe_apply_temporal_bucketing(
                key_val_collection.inner,
                self.as_of_frontier.clone(),
                summary,
            )
        } else {
            key_val_collection
        };

        self.render_reduce_plan(reduce_plan, key_val_collection, err, key_arity, mfp_after)
            .leave_region(self.scope)
    })
}
194
195 fn render_reduce_plan<'s>(
201 &self,
202 plan: ReducePlan,
203 collection: VecCollection<'s, T, (Row, Row), Diff>,
204 err_input: VecCollection<'s, T, DataflowErrorSer, Diff>,
205 key_arity: usize,
206 mfp_after: Option<SafeMfpPlan>,
207 ) -> CollectionBundle<'s, T> {
208 let mut errors = Default::default();
209 let arrangement =
210 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
211 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
212 CollectionBundle::from_columns(
213 0..key_arity,
214 ArrangementFlavor::Local(
215 arrangement,
216 errs.mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
217 "Arrange bundle err",
218 ),
219 ),
220 )
221 }
222
223 fn render_reduce_plan_inner<'s>(
224 &self,
225 plan: ReducePlan,
226 collection: VecCollection<'s, T, (Row, Row), Diff>,
227 errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
228 key_arity: usize,
229 mfp_after: Option<SafeMfpPlan>,
230 ) -> Arranged<'s, RowRowAgent<T, Diff>> {
231 let arrangement = match plan {
234 ReducePlan::Distinct => {
237 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
238 errors.push(errs);
239 arranged_output
240 }
241 ReducePlan::Accumulable(expr) => {
242 let (arranged_output, errs) =
243 self.build_accumulable(collection, expr, key_arity, mfp_after);
244 errors.push(errs);
245 arranged_output
246 }
247 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
248 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
249 errors.push(errs);
250 output
251 }
252 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
253 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
254 errors.push(errs);
255 output
256 }
257 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
258 expr,
259 fused_unnest_list,
260 })) => {
261 let validating = !fused_unnest_list;
265 let (output, errs) = self.build_basic_aggregate(
266 collection,
267 0,
268 &expr,
269 validating,
270 key_arity,
271 mfp_after,
272 fused_unnest_list,
273 );
274 if validating {
275 errors.push(errs.expect("validation should have occurred as it was requested"));
276 }
277 output
278 }
279 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
280 let (output, errs) =
281 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
282 errors.push(errs);
283 output
284 }
285 };
286 arrangement
287 }
288
/// Builds a `DISTINCT` reduction over the `(key, val)` `collection`.
///
/// The collection is arranged, and each key is reduced to a single output with `Row::default()`
/// as its value, provided `mfp_after` (if any) evaluated over the key datums returns `Ok(true)`.
///
/// Returns the output arrangement and an error collection. The error collection reports:
/// - keys whose input contains a non-positive multiplicity (logged and emitted as
///   `EvalError::Internal`);
/// - when `mfp_after` could error, the errors of evaluating it over the key datums.
fn build_distinct<'s>(
    &self,
    collection: VecCollection<'s, T, (Row, Row), Diff>,
    mfp_after: Option<SafeMfpPlan>,
) -> (
    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
    VecCollection<'s, T, DataflowErrorSer, Diff>,
) {
    let error_logger = self.error_logger();

    // Separate datum buffers and MFP handles for the ok and error operators, since each
    // closure takes ownership of its own copy.
    let mut datums1 = DatumVec::new();
    let mut datums2 = DatumVec::new();
    let mfp_after1 = mfp_after.clone();
    // The error operator only needs to evaluate the MFP if it can actually produce errors.
    let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

    let arranged = collection
        .mz_arrange::<
            ColumnationChunker<_>,
            RowRowBatcher<_, _>,
            RowRowBuilder<_, _>,
            RowRowSpine<_, _>,
        >(
            "Arranged DistinctBy",
        );
    let output = arranged
        .clone()
        .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "DistinctBy",
            // The input values/multiplicities are ignored: the output depends only on the key.
            move |key, _input, output| {
                let temp_storage = RowArena::new();
                let mut datums_local = datums1.borrow();
                datums_local.extend(key.to_datum_iter());

                // Keep the key only when there is no MFP or the MFP returns `Ok(true)`.
                // Both `Ok(false)` and `Err(_)` drop it; errors are reported by the
                // "DistinctByErrorCheck" operator below.
                if mfp_after1
                    .as_ref()
                    .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
                    .unwrap_or(Ok(true))
                    == Ok(true)
                {
                    output.push((Row::default(), Diff::ONE));
                }
            },
        );
    let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
        "DistinctByErrorCheck",
        move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowErrorSer, _)>| {
            // A non-positive multiplicity indicates invalid input: log it and emit one
            // internal error for this key.
            for (_, count) in input.iter() {
                if count.is_positive() {
                    continue;
                }
                let message = "Non-positive multiplicity in DistinctBy";
                error_logger.log(message, &format!("row={key:?}, count={count}"));
                output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                return;
            }
            let Some(mfp) = &mfp_after2 else { return };
            let temp_storage = RowArena::new();
            let datum_iter = key.to_datum_iter();
            let mut datums_local = datums2.borrow();
            datums_local.extend(datum_iter);

            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                output.push((e.into(), Diff::ONE));
            }
        },
    );
    (output, errors.as_collection(|_k, v| v.clone()))
}
366
/// Builds a `BasicPlan::Multiple` reduction: several basic aggregates over the same input,
/// fused into one output row per key.
///
/// Each aggregate in `aggrs` is rendered separately with `build_basic_aggregate`, without
/// `mfp_after` and without unnest-list fusion. Only the first aggregate is asked to validate
/// the input. The per-aggregate results are tagged with their position in `aggrs`,
/// concatenated, arranged, and combined into a single row per key. `mfp_after` is applied to
/// that combined row.
///
/// Returns the output arrangement and an error collection. The error collection holds the
/// validation errors, plus `mfp_after` evaluation errors when the MFP could error.
///
/// # Panics
///
/// Panics if no aggregate produced a validation error stream (e.g. `aggrs` is empty). Fewer
/// than two aggregates is reported through `soft_panic_or_log` first.
fn build_basic_aggregates<'s>(
    &self,
    input: VecCollection<'s, T, (Row, Row), Diff>,
    aggrs: Vec<AggregateExpr>,
    key_arity: usize,
    mfp_after: Option<SafeMfpPlan>,
) -> (
    RowRowArrangement<'s, T>,
    VecCollection<'s, T, DataflowErrorSer, Diff>,
) {
    // A `Multiple` plan is expected to carry at least two aggregates.
    if aggrs.len() <= 1 {
        self.error_logger().soft_panic_or_log(
            "Too few aggregations when building basic aggregates",
            &format!("len={}", aggrs.len()),
        )
    }
    let mut err_output = None;
    let mut to_collect = Vec::new();
    for (index, aggr) in aggrs.into_iter().enumerate() {
        // Validate only until some aggregate has produced a validation error stream; they all
        // read the same input, so checking it once is enough.
        let (result, errs) = self.build_basic_aggregate(
            input.clone(),
            index,
            &aggr,
            err_output.is_none(),
            key_arity,
            None,
            false,
        );
        if errs.is_some() {
            err_output = errs
        }
        // Tag each aggregate result with its index so the results can be reassembled in order.
        to_collect
            .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
    }

    let mut datums1 = DatumVec::new();
    let mut datums2 = DatumVec::new();
    let mfp_after1 = mfp_after.clone();
    let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

    let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
        .mz_arrange::<
            ColumnationChunker<_>,
            RowValBatcher<_, _, _>,
            RowValBuilder<_, _, _>,
            RowValSpine<_, _, _>,
        >(
            "Arranged ReduceFuseBasic input",
        );

    let output = arranged
        .clone()
        .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
            move |key, input, output| {
                let temp_storage = RowArena::new();
                let datum_iter = key.to_datum_iter();
                let mut datums_local = datums1.borrow();
                datums_local.extend(datum_iter);
                let key_len = datums_local.len();

                // Append the single datum of each aggregate result after the key.
                // NOTE(review): this assumes `input` is ordered by the `usize` tag, which puts
                // the aggregates in `aggrs` order — confirm with the arrangement's value order.
                for ((_, row), _) in input.iter() {
                    datums_local.push(row.unpack_first());
                }

                if let Some(row) =
                    evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                {
                    output.push((row, Diff::ONE));
                }
            }
        });
    let validation_errs = err_output.expect("expected to validate in at least one aggregate");
    // Evaluate the fused MFP a second time, only to surface its errors.
    if let Some(mfp) = mfp_after2 {
        let mfp_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "ReduceFuseBasic Error Check",
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    for ((_, row), _) in input.iter() {
                        datums_local.push(row.unpack_first());
                    }

                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            )
            .as_collection(|_, v| v.clone());
        (output, validation_errs.concat(mfp_errs))
    } else {
        (output, validation_errs)
    }
}
480
/// Builds a single basic (inaccumulable) aggregate `aggr` over column `index` of each value row.
///
/// - If `aggr.distinct`, the `(key, value)` pairs are first deduplicated with
///   `build_reduce_inaccumulable_distinct`. When `validating`, that step also reports
///   non-positive accumulations.
/// - The values are then arranged by key and reduced with `func`:
///   - `eval_with_fast_window_agg` when `!fused_unnest_list` (one output row per key);
///   - `eval_with_unnest_list` when `fused_unnest_list` (one output row per produced datum).
///
///   `mfp_after` is applied to each output row.
/// - An error operator is built when validation is still required (`validating` and the
///   distinct step did not validate) or when `mfp_after` could error.
///
/// Returns the output arrangement and, if any validation or MFP error check was built, the
/// corresponding error collection.
fn build_basic_aggregate<'s>(
    &self,
    input: VecCollection<'s, T, (Row, Row), Diff>,
    index: usize,
    aggr: &AggregateExpr,
    validating: bool,
    key_arity: usize,
    mfp_after: Option<SafeMfpPlan>,
    fused_unnest_list: bool,
) -> (
    RowRowArrangement<'s, T>,
    Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
) {
    let AggregateExpr {
        func,
        expr: _,
        distinct,
    } = aggr.clone();

    // Keep only the `index`-th datum of the value row, packed as a single-datum row.
    let mut partial = input.map(move |(key, row)| {
        let mut row_builder = SharedRow::get();
        let value = row.iter().nth(index).unwrap();
        row_builder.packer().push(value);
        (key, row_builder.clone())
    });

    let mut err_output = None;

    if distinct {
        // Merge key and value into one row so that distinctness applies to the pair; the
        // `pairer` splits it back afterwards using `key_arity`.
        let pairer = Pairer::new(key_arity);
        let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
        if validating {
            // The validating variant yields `Result<(), String>` values; errors are demuxed into
            // a separate error collection.
            let (oks, errs) = self
                .build_reduce_inaccumulable_distinct::<
                    RowValBuilder<Result<(), String>, _, _>,
                    RowValSpine<Result<(), String>, _, _>,
                >(keyed, None)
                .as_collection(|k, v| {
                    (
                        k.to_row(),
                        v.as_ref()
                            .map(|&()| ())
                            .map_err(|m| m.as_str().into()),
                    )
                })
                .map_fallible::<
                    CapacityContainerBuilder<_>,
                    CapacityContainerBuilder<_>,
                    _,
                    _,
                    _,
                >(
                    "Demux Errors",
                    move |(key_val, result)| match result {
                        Ok(()) => Ok(pairer.split(&key_val)),
                        Err(m) => {
                            Err(EvalError::Internal(m).into())
                        }
                    },
                );
            err_output = Some(errs);
            partial = oks;
        } else {
            partial = self
                .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
                    keyed,
                    Some(" [val: empty]"),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
        }
    }

    // Separate buffers for each closure built below; each closure takes ownership of its own.
    let mut datums1 = DatumVec::new();
    let mut datums2 = DatumVec::new();
    let mut datums_key_1 = DatumVec::new();
    let mut datums_key_2 = DatumVec::new();
    let mfp_after1 = mfp_after.clone();
    let func2 = func.clone();

    let name = if !fused_unnest_list {
        "ReduceInaccumulable"
    } else {
        "FusedReduceUnnestList"
    };
    let arranged = partial
        .mz_arrange::<
            ColumnationChunker<_>,
            RowRowBatcher<_, _>,
            RowRowBuilder<_, _>,
            RowRowSpine<_, _>,
        >(&format!(
            "Arranged {name}"
        ));
    let oks = if !fused_unnest_list {
        arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // Expand each value by its multiplicity; non-positive or out-of-range counts
                    // (failing `usize::try_from`) contribute zero copies.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) = evaluate_mfp_after(
                        &mfp_after1,
                        &mut datums_local,
                        &temp_storage,
                        key_len,
                    ) {
                        target.push((row, Diff::ONE));
                    }
                }
            })
    } else {
        arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    // One output row per datum produced by the unnested aggregate: reset the
                    // buffer to just the key before appending each datum.
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
    };

    // Validation is still needed only if it was requested and the distinct step (if any) did
    // not already perform it.
    let must_validate = validating && err_output.is_none();
    let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
    if must_validate || mfp_after2.is_some() {
        let error_logger = self.error_logger();

        let errs = if !fused_unnest_list {
            arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    &format!("{name} Error Check"),
                    move |key, source, target| {
                        if must_validate {
                            // Report the first non-positive accumulation for this key and stop.
                            for (value, count) in source.iter() {
                                if count.is_positive() {
                                    continue;
                                }
                                let value = value.to_row();
                                let message =
                                    "Non-positive accumulation in ReduceInaccumulable";
                                error_logger
                                    .log(message, &format!("value={value:?}, count={count}"));
                                let err = EvalError::Internal(message.into());
                                target.push((err.into(), Diff::ONE));
                                return;
                            }
                        }

                        // Recompute the aggregate only to surface `mfp_after` errors.
                        let Some(mfp) = &mfp_after2 else { return };
                        let iter = source.iter().flat_map(|&(mut v, ref w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        datums_local.push(
                            func2.eval_with_fast_window_agg::<
                                _,
                                window_agg_helpers::OneByOneAggrImpls,
                            >(
                                iter, &temp_storage
                            ),
                        );
                        if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                            target.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_, v| v.clone())
        } else {
            // Fused unnest lists never validate (`validating` is false for them in the plan
            // renderer visible in this file), so only the MFP check can bring us here.
            assert!(!must_validate);
            let Some(mfp) = mfp_after2 else {
                unreachable!()
            };
            arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    &format!("{name} Error Check"),
                    move |key, source, target| {
                        let iter = source.iter().flat_map(|&(mut v, ref w)| {
                            let count = usize::try_from(w.into_inner()).unwrap_or(0);
                            std::iter::repeat(v.next().unwrap()).take(count)
                        });

                        let temp_storage = RowArena::new();
                        let mut datums_local = datums_key_2.borrow();
                        datums_local.extend(key.to_datum_iter());
                        let key_len = datums_local.len();
                        for datum in func2
                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            )
                        {
                            datums_local.truncate(key_len);
                            datums_local.push(datum);
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        }
                    },
                )
                .as_collection(|_, v| v.clone())
        };

        // Merge with any errors already produced by the distinct step.
        if let Some(e) = err_output {
            err_output = Some(e.concat(errs));
        } else {
            err_output = Some(errs);
        }
    }
    (oks, err_output)
}
767
768 fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
769 &self,
770 input: VecCollection<'s, T, Row, Diff>,
771 name_tag: Option<&str>,
772 ) -> Arranged<'s, TraceAgent<Tr>>
773 where
774 Tr: for<'a> Trace<
775 Key<'a> = DatumSeq<'a>,
776 KeyContainer: BatchContainer<Owned = Row>,
777 Time = T,
778 Diff = Diff,
779 ValOwn: Data + MaybeValidatingRow<(), String>,
780 > + 'static,
781 Bu: Builder<
782 Time = T,
783 Input: Container
784 + ClearContainer
785 + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
786 Output = Tr::Batch,
787 >,
788 Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
789 {
790 let error_logger = self.error_logger();
791
792 let output_name = format!(
793 "ReduceInaccumulable Distinct{}",
794 name_tag.unwrap_or_default()
795 );
796
797 let input: KeyCollection<_, _, _> = input.into();
798 input
799 .mz_arrange::<
800 ColumnationChunker<_>,
801 RowBatcher<_, _>,
802 RowBuilder<_, _>,
803 RowSpine<_, _>,
804 >(
805 "Arranged ReduceInaccumulable Distinct [val: empty]",
806 )
807 .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
808 if let Some(err) = Tr::ValOwn::into_error() {
809 for (value, count) in source.iter() {
810 if count.is_positive() {
811 continue;
812 }
813
814 let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
815 error_logger.log(message, &format!("value={value:?}, count={count}"));
816 t.push((err(message.to_string()), Diff::ONE));
817 return;
818 }
819 }
820 t.push((Tr::ValOwn::ok(()), Diff::ONE))
821 })
822 }
823
/// Builds a hierarchical (bucketed) reduction for the functions in `aggr_funcs`. The operator
/// names refer to mins/maxes.
///
/// Inside a region named "ReduceHierarchical":
/// 1. The first `aggr_funcs.len()` datums of each value row are kept. The key is prefixed with
///    a hash of those values modulo the first bucket count (1 if `buckets` is empty).
/// 2. For each entry `b` of `buckets`, one stage (`build_bucketed_stage`) reduces per
///    `(hash, key)`. Every stage after the first re-buckets the hash modulo `b`.
/// 3. The hash prefix is stripped, the result is arranged by key, and `aggr_funcs` are
///    evaluated column-wise to produce one output row per key. `mfp_after` is applied to it.
///
/// Input validation happens in the first stage, or in the final reduction when `buckets` is
/// empty. `mfp_after` errors are reported when the MFP could error.
///
/// # Panics
///
/// Panics if no level produced a validation error stream (should not happen given the rules
/// above).
fn build_bucketed<'s>(
    &self,
    input: VecCollection<'s, T, (Row, Row), Diff>,
    BucketedPlan {
        aggr_funcs,
        buckets,
    }: BucketedPlan,
    key_arity: usize,
    mfp_after: Option<SafeMfpPlan>,
) -> (
    RowRowArrangement<'s, T>,
    VecCollection<'s, T, DataflowErrorSer, Diff>,
) {
    let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
    let outer_scope = input.scope();
    let arranged_output = outer_scope
        .clone()
        .region_named("ReduceHierarchical", |inner| {
            let input = input.enter(inner);

            // Modulus for the initial hash; 1 (a single bucket) if there are no stages.
            let first_mod = buckets.get(0).copied().unwrap_or(1);
            let aggregations = aggr_funcs.len();

            // Produce `(hash ++ key, values)`, where `values` holds the first `aggregations`
            // datums of the value row.
            let mut stage = input.map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut row_packer = row_builder.packer();
                row_packer.extend(row.iter().take(aggregations));
                let values = row_builder.clone();

                let hash = values.hashed() % first_mod;
                let hash_key =
                    row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                (hash_key, values)
            });

            for (index, b) in buckets.into_iter().enumerate() {
                // The first stage is already hashed modulo `first_mod`. Later stages reduce
                // the existing hash modulo their own bucket count `b`.
                let input = if index == 0 {
                    stage
                } else {
                    stage.map(move |(hash_key, values)| {
                        let mut hash_key_iter = hash_key.iter();
                        let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                        let hash_key = SharedRow::pack(
                            std::iter::once(Datum::from(hash))
                                .chain(hash_key_iter.take(key_arity)),
                        );
                        (hash_key, values)
                    })
                };

                // Only the first stage validates; the later stages read its output.
                let validating = err_output.is_none();

                let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                if let Some(errs) = errs {
                    err_output = Some(errs.leave_region(outer_scope));
                }
                stage = oks
            }

            // Drop the hash prefix, leaving `(key, values)`.
            let partial = stage.map(move |(hash_key, values)| {
                let mut hash_key_iter = hash_key.iter();
                let _hash = hash_key_iter.next();
                (SharedRow::pack(hash_key_iter.take(key_arity)), values)
            });

            let mut datums1 = DatumVec::new();
            let mut datums2 = DatumVec::new();
            let mfp_after1 = mfp_after.clone();
            let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
            let aggr_funcs2 = aggr_funcs.clone();

            let error_logger = self.error_logger();
            let arranged = partial
                .mz_arrange::<
                    ColumnationChunker<_>,
                    RowRowBatcher<_, _>,
                    RowRowBuilder<_, _>,
                    RowRowSpine<_, _>,
                >(
                    "Arrange ReduceMinsMaxes",
                );
            // Validate here only if no stage did (i.e. `buckets` was empty).
            let must_validate = err_output.is_none();
            if must_validate || mfp_after2.is_some() {
                let errs = arranged
                    .clone()
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        "ReduceMinsMaxes Error Check",
                        move |key, source, target| {
                            if must_validate {
                                for (val, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let val = val.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceMinsMaxes";
                                    error_logger
                                        .log(message, &format!("val={val:?}, count={count}"));
                                    target.push((
                                        EvalError::Internal(message.into()).into(),
                                        Diff::ONE,
                                    ));
                                    return;
                                }
                            }

                            // Recompute the aggregates only to surface `mfp_after` errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);

                            // One datum iterator per value row; each function consumes the
                            // next column across all rows. Multiplicities are ignored.
                            let mut source_iters = source
                                .iter()
                                .map(|(values, _cnt)| *values)
                                .collect::<Vec<_>>();
                            for func in aggr_funcs2.iter() {
                                let column_iter = (0..source_iters.len())
                                    .map(|i| source_iters[i].next().unwrap());
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }
                            if let Result::Err(e) =
                                mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
                    .leave_region(outer_scope);
                if let Some(e) = err_output.take() {
                    err_output = Some(e.concat(errs));
                } else {
                    err_output = Some(errs);
                }
            }
            arranged
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    "ReduceMinsMaxes",
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Evaluate each function column-wise over all value rows of this key.
                        let mut source_iters = source
                            .iter()
                            .map(|(values, _cnt)| *values)
                            .collect::<Vec<_>>();
                        for func in aggr_funcs.iter() {
                            let column_iter = (0..source_iters.len())
                                .map(|i| source_iters[i].next().unwrap());
                            datums_local.push(func.eval(column_iter, &temp_storage));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    },
                )
                .leave_region(outer_scope)
        });
    (
        arranged_output,
        err_output.expect("expected to validate in one level of the hierarchy"),
    )
}
1036
1037 fn build_bucketed_stage<'s>(
1044 &self,
1045 aggr_funcs: &Vec<AggregateFunc>,
1046 input: VecCollection<'s, T, (Row, Row), Diff>,
1047 validating: bool,
1048 ) -> (
1049 VecCollection<'s, T, (Row, Row), Diff>,
1050 Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
1051 ) {
1052 let (input, negated_output, errs) = if validating {
1053 let (input, reduced) = self
1054 .build_bucketed_negated_output::<
1055 RowValBuilder<_, _, _>,
1056 RowValSpine<Result<Row, Row>, _, _>,
1057 >(
1058 input.clone(),
1059 aggr_funcs.clone(),
1060 );
1061 let (oks, errs) = reduced
1062 .as_collection(|k, v| (k.to_row(), v.clone()))
1063 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1064 "Checked Invalid Accumulations",
1065 |(hash_key, result)| match result {
1066 Err(hash_key) => {
1067 let mut hash_key_iter = hash_key.iter();
1068 let _hash = hash_key_iter.next();
1069 let key = SharedRow::pack(hash_key_iter);
1070 let message = format!(
1071 "Invalid data in source, saw non-positive accumulation \
1072 for key {key:?} in hierarchical mins-maxes aggregate"
1073 );
1074 Err(EvalError::Internal(message.into()).into())
1075 }
1076 Ok(values) => Ok((hash_key, values)),
1077 },
1078 );
1079 (input, oks, Some(errs))
1080 } else {
1081 let (input, reduced) = self
1082 .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1083 input,
1084 aggr_funcs.clone(),
1085 );
1086 let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1089 (input, oks, None)
1090 };
1091
1092 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1093 let oks = negated_output.concat(input);
1094 (oks, errs)
1095 }
1096
1097 fn build_bucketed_negated_output<'s, Bu, Tr>(
1101 &self,
1102 input: VecCollection<'s, T, (Row, Row), Diff>,
1103 aggrs: Vec<AggregateFunc>,
1104 ) -> (
1105 Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1106 Arranged<'s, TraceAgent<Tr>>,
1107 )
1108 where
1109 Tr: for<'a> Trace<
1110 Key<'a> = DatumSeq<'a>,
1111 KeyContainer: BatchContainer<Owned = Row>,
1112 ValOwn: Data + MaybeValidatingRow<Row, Row>,
1113 Time = T,
1114 Diff = Diff,
1115 > + 'static,
1116 Bu: Builder<
1117 Time = T,
1118 Input: Container
1119 + ClearContainer
1120 + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1121 Output = Tr::Batch,
1122 >,
1123 Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1124 {
1125 let error_logger = self.error_logger();
1126 let arranged_input = input
1129 .mz_arrange::<
1130 ColumnationChunker<_>,
1131 RowRowBatcher<_, _>,
1132 RowRowBuilder<_, _>,
1133 RowRowSpine<_, _>,
1134 >(
1135 "Arranged MinsMaxesHierarchical input",
1136 );
1137
1138 let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1139 "Reduced Fallibly MinsMaxesHierarchical",
1140 move |key, source, target| {
1141 if let Some(err) = Tr::ValOwn::into_error() {
1142 for (value, count) in source.iter() {
1144 if count.is_positive() {
1145 continue;
1146 }
1147 error_logger.log(
1148 "Non-positive accumulation in MinsMaxesHierarchical",
1149 &format!("key={key:?}, value={value:?}, count={count}"),
1150 );
1151 target.push((
1154 err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1155 Diff::ONE,
1156 ));
1157 return;
1158 }
1159 }
1160
1161 let mut row_builder = SharedRow::get();
1162 let mut row_packer = row_builder.packer();
1163
1164 let mut source_iters = source
1165 .iter()
1166 .map(|(values, _cnt)| *values)
1167 .collect::<Vec<_>>();
1168 for func in aggrs.iter() {
1169 let column_iter =
1170 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1171 row_packer.push(func.eval(column_iter, &RowArena::new()));
1172 }
1173 target.reserve(source.len().saturating_add(1));
1179 target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1180 target.extend(source.iter().map(|(values, cnt)| {
1181 let mut cnt = *cnt;
1182 cnt.negate();
1183 (Tr::ValOwn::ok(values.to_row()), cnt)
1184 }));
1185 },
1186 );
1187 (arranged_input, reduced)
1188 }
1189
/// Builds a reduction for `aggr_funcs` over input that is expected to be monotonic
/// (insert-only).
///
/// The first `aggr_funcs.len()` datums of each value row are packed into single-datum rows.
/// The input is consolidated if `must_consolidate`, checked with `ensure_monotonic`, and each
/// datum is turned into a `ReductionMonoid` via `get_monoid`. The monoids are moved into the
/// update's difference with `explode_one`, and an arrangement whose difference type is
/// `Vec<ReductionMonoid>` accumulates them per key. The accumulated monoids are finalized into
/// one output row per key, and `mfp_after` is applied to it.
///
/// Returns the output arrangement and an error collection. The error collection holds
/// non-monotonic input reports, plus `mfp_after` evaluation errors when the MFP could error.
///
/// # Panics
///
/// Panics if some function in `aggr_funcs` has no monoid implementation (`get_monoid` returns
/// `None`), or if a row has fewer than `aggr_funcs.len()` values (`zip_eq`).
fn build_monotonic<'s>(
    &self,
    collection: VecCollection<'s, T, (Row, Row), Diff>,
    MonotonicPlan {
        aggr_funcs,
        must_consolidate,
    }: MonotonicPlan,
    mfp_after: Option<SafeMfpPlan>,
) -> (
    RowRowArrangement<'s, T>,
    VecCollection<'s, T, DataflowErrorSer, Diff>,
) {
    let aggregations = aggr_funcs.len();
    // Split the value row into one single-datum row per aggregation.
    let collection = collection
        .map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let mut values = Vec::with_capacity(aggregations);
            values.extend(
                row.iter()
                    .take(aggregations)
                    .map(|v| row_builder.pack_using(std::iter::once(v))),
            );

            (key, values)
        })
        .consolidate_named_if::<KeyBatcher<_, _, _>>(
            must_consolidate,
            "Consolidated ReduceMonotonic input",
        );

    // Updates that violate monotonicity are logged and diverted to `validation_errs`.
    let error_logger = self.error_logger();
    let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
        error_logger.log(
            "Non-monotonic input to ReduceMonotonic",
            &format!("data={data:?}, diff={diff}"),
        );
        let m = "tried to build a monotonic reduction on non-monotonic input".into();
        (EvalError::Internal(m).into(), Diff::ONE)
    });
    // Move the per-aggregation monoids into the difference position, leaving `key` as data.
    // NOTE(review): exact `explode_one` semantics are defined elsewhere — confirm.
    let partial = partial.explode_one(move |(key, values)| {
        let mut output = Vec::new();
        for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
            output.push(monoids::get_monoid(row, func).expect(
                "hierarchical aggregations are expected to have monoid implementations",
            ));
        }
        (key, output)
    });

    let mut datums1 = DatumVec::new();
    let mut datums2 = DatumVec::new();
    let mfp_after1 = mfp_after.clone();
    let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

    let partial: KeyCollection<_, _, _> = partial.into();
    let arranged = partial
        .mz_arrange::<
            ColumnationChunker<_>,
            RowBatcher<_, _>,
            RowBuilder<_, _>,
            RowSpine<_, Vec<ReductionMonoid>>,
        >(
            "ArrangeMonotonic [val: empty]",
        );
    let output = arranged
        .clone()
        .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
            move |key, input, output| {
                let temp_storage = RowArena::new();
                let datum_iter = key.to_datum_iter();
                let mut datums_local = datums1.borrow();
                datums_local.extend(datum_iter);
                let key_len = datums_local.len();
                // The arranged value is empty, so the accumulated monoids are read from the
                // first (presumably only) entry's difference.
                let accum = &input[0].1;
                for monoid in accum.iter() {
                    datums_local.extend(monoid.finalize().iter());
                }

                if let Some(row) =
                    evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                {
                    output.push((row, Diff::ONE));
                }
            }
        });

    // Evaluate the fused MFP a second time, only to surface its errors.
    if let Some(mfp) = mfp_after2 {
        let mfp_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "ReduceMonotonic Error Check",
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }
                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                    {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            )
            .as_collection(|_k, v| v.clone());
        (output, validation_errs.concat(mfp_errs))
    } else {
        (output, validation_errs)
    }
}
1313
    /// Renders a reduction in which every aggregate is accumulable.
    ///
    /// The per-key state lives in the diff field as `(Vec<Accum>, Diff)`. That is one
    /// [`Accum`] per entry of `full_aggrs`, plus the net number of contributing rows.
    ///
    /// - Aggregates in `simple_aggrs` are converted directly from every input row.
    /// - Each aggregate in `distinct_aggrs` is first deduplicated per `(key, value)` by an
    ///   arrange-and-reduce step, then converted.
    ///
    /// All contributions are concatenated, arranged by key, and finalized with
    /// [`finalize_accum`], followed by `mfp_after` if present.
    ///
    /// Returns the output arrangement and an error collection. The errors report net-zero row
    /// counts with non-zero accumulations, negative sums of unsigned types, and errors from
    /// evaluating `mfp_after`.
    fn build_accumulable<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let collection_scope = collection.scope();

        // Every aggregate must be exactly one of simple or distinct; anything else indicates a
        // malformed plan, which is reported but not fatal here.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // Identity accumulator for every aggregate, paired with a zero row count. Each input
        // contribution starts from this and fills in only the aggregates it affects.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            let collection = collection.clone();
            // Move each row's simple aggregates into the diff of a `(key, ())` record.
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    let mut row_iter = row.iter().enumerate();
                    // The datum iterator only moves forward, so this relies on the column
                    // indices in `simple_aggrs` being ascending.
                    for (datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        for (datum_index, aggr) in distinct_aggrs.into_iter() {
            // Deduplicate `(key, value)` pairs: arrange them, reduce each to a single unit
            // record, then split back into key and value before accumulating.
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .clone()
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<
                    ColumnationChunker<_>,
                    RowBatcher<_, _>,
                    RowBuilder<_, _>,
                    RowSpine<_, _>,
                >(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // Avoid a concatenation operator when there is only one input to aggregate.
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
        };

        // Separate datum buffers, MFP handles and aggregate lists for the output and
        // error-check closures, which each take ownership of their captures.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only evaluate the MFP in the error check when it can actually produce errors.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        let arranged = collection
            .mz_arrange::<
                ColumnationChunker<_>,
                RowBatcher<_, _>,
                RowBuilder<_, _>,
                RowSpine<_, (Vec<Accum>, Diff)>,
            >(
                "ArrangeAccumulable [val: empty]",
            );
        let arranged_output = arranged
            .clone()
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
                move |key, input, output| {
                    // The single input entry's diff holds the accumulators and the row count.
                    let (ref accums, total) = input[0].1;

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            });
        let arranged_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "AccumulableErrorCheck",
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // No net rows but a non-zero accumulation means the input was
                        // inconsistent (e.g. retractions that do not match insertions).
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Sums of unsigned types can only go negative on invalid input.
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
                                    &format!("aggr={aggr:?}, accum={accum:?}"),
                                );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    let err = EvalError::Internal(message.into());
                                    output.push((err.into(), Diff::ONE));
                                }
                            }
                            _ => (),
                        }
                    }

                    // Surface errors from `mfp_after`, if it can produce any.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1541}
1542
1543fn evaluate_mfp_after<'a, 'b>(
1547 mfp_after: &'a Option<SafeMfpPlan>,
1548 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1549 temp_storage: &'a RowArena,
1550 key_len: usize,
1551) -> Option<Row> {
1552 let mut row_builder = SharedRow::get();
1553 if let Some(mfp) = mfp_after {
1556 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1559 Some(row_builder.pack_using(iter.skip(key_len)))
1562 } else {
1563 None
1564 }
1565 } else {
1566 Some(row_builder.pack_using(&datums_local[key_len..]))
1567 }
1568}
1569
1570fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1571 match aggr_func {
1572 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1573 trues: Diff::ZERO,
1574 falses: Diff::ZERO,
1575 },
1576 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1577 accum: AccumCount::ZERO,
1578 pos_infs: Diff::ZERO,
1579 neg_infs: Diff::ZERO,
1580 nans: Diff::ZERO,
1581 non_nulls: Diff::ZERO,
1582 },
1583 AggregateFunc::SumNumeric => Accum::Numeric {
1584 accum: OrderedDecimal(NumericAgg::zero()),
1585 pos_infs: Diff::ZERO,
1586 neg_infs: Diff::ZERO,
1587 nans: Diff::ZERO,
1588 non_nulls: Diff::ZERO,
1589 },
1590 _ => Accum::SimpleNumber {
1591 accum: AccumCount::ZERO,
1592 non_nulls: Diff::ZERO,
1593 },
1594 }
1595}
1596
/// Number of fractional bits kept when float sums are mapped onto the `i128` fixed-point
/// domain.
const FLOAT_SCALE_EXP: u32 = 24;

/// The fixed-point scaling factor, `2^FLOAT_SCALE_EXP`.
// `as` is needed because `From` conversions cannot be used in a const initializer. The
// conversion is exact, since a power of two below 2^53 is representable in `f64`.
#[allow(clippy::as_conversions)]
const FLOAT_SCALE: f64 = 2_u64.pow(FLOAT_SCALE_EXP) as f64;
1605
1606fn float_to_fixed_point(n: f64) -> i128 {
1623 debug_assert!(n.is_finite());
1624
1625 let (mantissa, exponent, sign) = Float::integer_decode(n);
1629 let significand = u128::from(mantissa);
1630 let exp = i64::from(exponent) + i64::from(FLOAT_SCALE_EXP);
1631
1632 let magnitude: u128 = if exp >= 0 {
1633 match u32::try_from(exp) {
1636 Ok(shift) if shift < 128 => significand << shift,
1637 _ => 0,
1638 }
1639 } else {
1640 match u32::try_from(-exp) {
1643 Ok(shift) if shift < 128 => significand >> shift,
1644 _ => 0,
1645 }
1646 };
1647
1648 let magnitude = magnitude.cast_signed();
1651 if sign < 0 {
1652 magnitude.wrapping_neg()
1653 } else {
1654 magnitude
1655 }
1656}
1657
1658fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1659 match aggregate_func {
1660 AggregateFunc::Count => Accum::SimpleNumber {
1661 accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
1663 Diff::ZERO
1664 } else {
1665 Diff::ONE
1666 },
1667 },
1668 AggregateFunc::Any | AggregateFunc::All => match datum {
1669 Datum::True => Accum::Bool {
1670 trues: Diff::ONE,
1671 falses: Diff::ZERO,
1672 },
1673 Datum::Null => Accum::Bool {
1674 trues: Diff::ZERO,
1675 falses: Diff::ZERO,
1676 },
1677 Datum::False => Accum::Bool {
1678 trues: Diff::ZERO,
1679 falses: Diff::ONE,
1680 },
1681 x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1682 },
1683 AggregateFunc::Dummy => match datum {
1684 Datum::Dummy => Accum::SimpleNumber {
1685 accum: AccumCount::ZERO,
1686 non_nulls: Diff::ZERO,
1687 },
1688 x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1689 },
1690 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1691 let n = match datum {
1692 Datum::Float32(n) => f64::from(*n),
1693 Datum::Float64(n) => *n,
1694 Datum::Null => 0f64,
1695 x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1696 };
1697
1698 let nans = Diff::from(n.is_nan());
1699 let pos_infs = Diff::from(n == f64::INFINITY);
1700 let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1701 let non_nulls = Diff::from(datum != Datum::Null);
1702
1703 let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1706 AccumCount::ZERO
1707 } else {
1708 float_to_fixed_point(n).into()
1712 };
1713
1714 Accum::Float {
1715 accum,
1716 pos_infs,
1717 neg_infs,
1718 nans,
1719 non_nulls,
1720 }
1721 }
1722 AggregateFunc::SumNumeric => match datum {
1723 Datum::Numeric(n) => {
1724 let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1725 if n.0.is_negative() {
1726 (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1727 } else {
1728 (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1729 }
1730 } else if n.0.is_nan() {
1731 (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1732 } else {
1733 let mut cx_agg = numeric::cx_agg();
1736 (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1737 };
1738
1739 Accum::Numeric {
1740 accum: OrderedDecimal(accum),
1741 pos_infs,
1742 neg_infs,
1743 nans,
1744 non_nulls: Diff::ONE,
1745 }
1746 }
1747 Datum::Null => Accum::Numeric {
1748 accum: OrderedDecimal(NumericAgg::zero()),
1749 pos_infs: Diff::ZERO,
1750 neg_infs: Diff::ZERO,
1751 nans: Diff::ZERO,
1752 non_nulls: Diff::ZERO,
1753 },
1754 x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1755 },
1756 _ => {
1757 match datum {
1761 Datum::Int16(i) => Accum::SimpleNumber {
1762 accum: i.into(),
1763 non_nulls: Diff::ONE,
1764 },
1765 Datum::Int32(i) => Accum::SimpleNumber {
1766 accum: i.into(),
1767 non_nulls: Diff::ONE,
1768 },
1769 Datum::Int64(i) => Accum::SimpleNumber {
1770 accum: i.into(),
1771 non_nulls: Diff::ONE,
1772 },
1773 Datum::UInt16(u) => Accum::SimpleNumber {
1774 accum: u.into(),
1775 non_nulls: Diff::ONE,
1776 },
1777 Datum::UInt32(u) => Accum::SimpleNumber {
1778 accum: u.into(),
1779 non_nulls: Diff::ONE,
1780 },
1781 Datum::UInt64(u) => Accum::SimpleNumber {
1782 accum: u.into(),
1783 non_nulls: Diff::ONE,
1784 },
1785 Datum::MzTimestamp(t) => Accum::SimpleNumber {
1786 accum: u64::from(t).into(),
1787 non_nulls: Diff::ONE,
1788 },
1789 Datum::Null => Accum::SimpleNumber {
1790 accum: AccumCount::ZERO,
1791 non_nulls: Diff::ZERO,
1792 },
1793 x => panic!("Accumulating non-integer data: {x:?}"),
1794 }
1795 }
1796 }
1797}
1798
1799fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1800 if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1804 Datum::Null
1805 } else {
1806 match (&aggr_func, &accum) {
1807 (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1808 Datum::Int64(non_nulls.into_inner())
1809 }
1810 (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1811 if falses.is_positive() {
1813 Datum::False
1814 } else if *trues == total {
1815 Datum::True
1816 } else {
1817 Datum::Null
1818 }
1819 }
1820 (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1821 if trues.is_positive() {
1823 Datum::True
1824 } else if *falses == total {
1825 Datum::False
1826 } else {
1827 Datum::Null
1828 }
1829 }
1830 (AggregateFunc::Dummy, _) => Datum::Dummy,
1831 (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1833 | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
1834 #[allow(clippy::as_conversions)]
1839 Datum::Int64(accum.into_inner() as i64)
1840 }
1841 (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1842 (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1843 | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1844 if !accum.is_negative() {
1845 #[allow(clippy::as_conversions)]
1851 Datum::UInt64(accum.into_inner() as u64)
1852 } else {
1853 Datum::Null
1857 }
1858 }
1859 (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1860 if !accum.is_negative() {
1861 Datum::from(*accum)
1862 } else {
1863 Datum::Null
1867 }
1868 }
1869 (
1870 AggregateFunc::SumFloat32,
1871 Accum::Float {
1872 accum,
1873 pos_infs,
1874 neg_infs,
1875 nans,
1876 non_nulls: _,
1877 },
1878 ) => {
1879 if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1880 Datum::from(f32::NAN)
1883 } else if pos_infs.is_positive() {
1884 Datum::from(f32::INFINITY)
1885 } else if neg_infs.is_positive() {
1886 Datum::from(f32::NEG_INFINITY)
1887 } else {
1888 let sum = f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE;
1889 Datum::from(f32::cast_lossy(sum))
1890 }
1891 }
1892 (
1893 AggregateFunc::SumFloat64,
1894 Accum::Float {
1895 accum,
1896 pos_infs,
1897 neg_infs,
1898 nans,
1899 non_nulls: _,
1900 },
1901 ) => {
1902 if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1903 Datum::from(f64::NAN)
1906 } else if pos_infs.is_positive() {
1907 Datum::from(f64::INFINITY)
1908 } else if neg_infs.is_positive() {
1909 Datum::from(f64::NEG_INFINITY)
1910 } else {
1911 Datum::from(f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE)
1912 }
1913 }
1914 (
1915 AggregateFunc::SumNumeric,
1916 Accum::Numeric {
1917 accum,
1918 pos_infs,
1919 neg_infs,
1920 nans,
1921 non_nulls: _,
1922 },
1923 ) => {
1924 let mut cx_datum = numeric::cx_datum();
1925 let d = cx_datum.to_width(accum.0);
1926 let inf_d = d.is_infinite();
1932 let neg_d = d.is_negative();
1933 let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
1934 let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
1935 if nans.is_positive() || (pos_inf && neg_inf) {
1936 Datum::from(Numeric::nan())
1939 } else if pos_inf {
1940 Datum::from(Numeric::infinity())
1941 } else if neg_inf {
1942 let mut cx = numeric::cx_datum();
1943 let mut d = Numeric::infinity();
1944 cx.neg(&mut d);
1945 Datum::from(d)
1946 } else {
1947 Datum::from(d)
1948 }
1949 }
1950 _ => panic!(
1951 "Unexpected accumulation (aggr={:?}, accum={accum:?})",
1952 aggr_func
1953 ),
1954 }
1955 }
1956}
1957
/// Integer type used for the sums in [`Accum::SimpleNumber`] and the fixed-point sums in
/// [`Accum::Float`].
// NOTE(review): overflow handling follows `mz_ore::Overflowing`'s policy; confirm it matches
// expectations for very large sums.
type AccumCount = mz_ore::Overflowing<i128>;
1960
/// Accumulator state for accumulable aggregates, carried in the diff field of a reduction.
///
/// Variant and field order are significant: they determine the derived `Ord`, so they must
/// not be rearranged.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// State for `any` and `all`.
    Bool {
        /// Net number of `true` inputs.
        trues: Diff,
        /// Net number of `false` inputs.
        falses: Diff,
    },
    /// State for `count` and integer sums.
    SimpleNumber {
        /// Sum of the integer inputs (left at zero for `count`).
        accum: AccumCount,
        /// Net number of non-null inputs.
        non_nulls: Diff,
    },
    /// State for float sums.
    Float {
        /// Sum of the finite inputs, in fixed point with `FLOAT_SCALE_EXP` fractional bits.
        accum: AccumCount,
        /// Net number of `+inf` inputs.
        pos_infs: Diff,
        /// Net number of `-inf` inputs.
        neg_infs: Diff,
        /// Net number of NaN inputs.
        nans: Diff,
        /// Net number of non-null inputs.
        non_nulls: Diff,
    },
    /// State for numeric sums.
    Numeric {
        /// Sum of the finite inputs, at the aggregation context's precision.
        accum: OrderedDecimal<NumericAgg>,
        /// Net number of `+inf` inputs.
        pos_infs: Diff,
        /// Net number of `-inf` inputs.
        neg_infs: Diff,
        /// Net number of NaN inputs.
        nans: Diff,
        /// Net number of non-null inputs.
        non_nulls: Diff,
    },
}
2025
2026impl IsZero for Accum {
2027 fn is_zero(&self) -> bool {
2028 match self {
2029 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2030 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2031 Accum::Float {
2032 accum,
2033 pos_infs,
2034 neg_infs,
2035 nans,
2036 non_nulls,
2037 } => {
2038 accum.is_zero()
2039 && pos_infs.is_zero()
2040 && neg_infs.is_zero()
2041 && nans.is_zero()
2042 && non_nulls.is_zero()
2043 }
2044 Accum::Numeric {
2045 accum,
2046 pos_infs,
2047 neg_infs,
2048 nans,
2049 non_nulls,
2050 } => {
2051 accum.0.is_zero()
2052 && pos_infs.is_zero()
2053 && neg_infs.is_zero()
2054 && nans.is_zero()
2055 && non_nulls.is_zero()
2056 }
2057 }
2058 }
2059}
2060
2061impl Semigroup for Accum {
2062 fn plus_equals(&mut self, other: &Accum) {
2063 match (&mut *self, other) {
2064 (
2065 Accum::Bool { trues, falses },
2066 Accum::Bool {
2067 trues: other_trues,
2068 falses: other_falses,
2069 },
2070 ) => {
2071 *trues += other_trues;
2072 *falses += other_falses;
2073 }
2074 (
2075 Accum::SimpleNumber { accum, non_nulls },
2076 Accum::SimpleNumber {
2077 accum: other_accum,
2078 non_nulls: other_non_nulls,
2079 },
2080 ) => {
2081 *accum += other_accum;
2082 *non_nulls += other_non_nulls;
2083 }
2084 (
2085 Accum::Float {
2086 accum,
2087 pos_infs,
2088 neg_infs,
2089 nans,
2090 non_nulls,
2091 },
2092 Accum::Float {
2093 accum: other_accum,
2094 pos_infs: other_pos_infs,
2095 neg_infs: other_neg_infs,
2096 nans: other_nans,
2097 non_nulls: other_non_nulls,
2098 },
2099 ) => {
2100 *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
2101 warn!("Float accumulator overflow. Incorrect results possible");
2102 accum.wrapping_add(*other_accum)
2103 });
2104 *pos_infs += other_pos_infs;
2105 *neg_infs += other_neg_infs;
2106 *nans += other_nans;
2107 *non_nulls += other_non_nulls;
2108 }
2109 (
2110 Accum::Numeric {
2111 accum,
2112 pos_infs,
2113 neg_infs,
2114 nans,
2115 non_nulls,
2116 },
2117 Accum::Numeric {
2118 accum: other_accum,
2119 pos_infs: other_pos_infs,
2120 neg_infs: other_neg_infs,
2121 nans: other_nans,
2122 non_nulls: other_non_nulls,
2123 },
2124 ) => {
2125 let mut cx_agg = numeric::cx_agg();
2126 cx_agg.add(&mut accum.0, &other_accum.0);
2127 assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
2133 cx_agg.reduce(&mut accum.0);
2152 *pos_infs += other_pos_infs;
2153 *neg_infs += other_neg_infs;
2154 *nans += other_nans;
2155 *non_nulls += other_non_nulls;
2156 }
2157 (l, r) => unreachable!(
2158 "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
2159 ),
2160 }
2161 }
2162}
2163
2164impl Multiply<Diff> for Accum {
2165 type Output = Accum;
2166
2167 fn multiply(self, factor: &Diff) -> Accum {
2168 let factor = *factor;
2169 match self {
2170 Accum::Bool { trues, falses } => Accum::Bool {
2171 trues: trues * factor,
2172 falses: falses * factor,
2173 },
2174 Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2175 accum: accum * AccumCount::from(factor),
2176 non_nulls: non_nulls * factor,
2177 },
2178 Accum::Float {
2179 accum,
2180 pos_infs,
2181 neg_infs,
2182 nans,
2183 non_nulls,
2184 } => Accum::Float {
2185 accum: accum
2186 .checked_mul(AccumCount::from(factor))
2187 .unwrap_or_else(|| {
2188 warn!("Float accumulator overflow. Incorrect results possible");
2189 accum.wrapping_mul(AccumCount::from(factor))
2190 }),
2191 pos_infs: pos_infs * factor,
2192 neg_infs: neg_infs * factor,
2193 nans: nans * factor,
2194 non_nulls: non_nulls * factor,
2195 },
2196 Accum::Numeric {
2197 accum,
2198 pos_infs,
2199 neg_infs,
2200 nans,
2201 non_nulls,
2202 } => {
2203 let mut cx = numeric::cx_agg();
2204 let mut f = NumericAgg::from(factor.into_inner());
2205 cx.mul(&mut f, &accum.0);
2209 assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2215 Accum::Numeric {
2216 accum: OrderedDecimal(f),
2217 pos_infs: pos_infs * factor,
2218 neg_infs: neg_infs * factor,
2219 nans: nans * factor,
2220 non_nulls: non_nulls * factor,
2221 }
2222 }
2223 }
2224 }
2225}
2226
/// `Accum` is `Copy` and owns no heap data, so a plain copy region is enough to store it.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2230
mod monoids {
    // Monoids for hierarchical (min/max) reductions. Each value is a row whose first datum
    // is the current extremum, and values are combined in place via `plus_equals`.

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A running minimum or maximum, stored as a row whose first datum is the current value.
    ///
    /// When combined with `plus_equals`, a NULL never replaces a non-NULL value, for both
    /// `Min` and `Max`.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current extremum.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        /// Copies `source` into `self`, reusing `self`'s row allocation.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out (leaving an empty one behind) so its buffer can be reused.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Adopt the source's variant along with its value.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// Scaling a min/max by a positive multiplicity leaves it unchanged.
        ///
        /// # Panics
        ///
        /// Panics if `factor` is not positive.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        /// Keeps the smaller (`Min`) or larger (`Max`) of the two first datums, preferring
        /// any non-NULL value over NULL.
        ///
        /// Mismatched variants are reported with `soft_panic_or_log!` and leave `self`
        /// unchanged.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // NULL is handled explicitly so that it never replaces a value.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // NULL is handled explicitly so that it never replaces a value.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        /// A min/max value is never considered zero.
        // NOTE(review): presumably this keeps consolidation from discarding accumulated
        // values — confirm.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnar region for [`ReductionMonoid`]s. Both variants store their row in one
    /// shared row region.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                // SAFETY: the caller's obligations for `Region::copy` are forwarded unchanged
                // to the inner row region.
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                // SAFETY: as above.
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        /// Reserves space in the inner row region for the rows of `items`.
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the monoid for `func`. Returns `None` when `func` is not a min/max
    /// aggregate.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            // Listed exhaustively (no `_` arm) so that new aggregate functions must be
            // classified here.
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2485
2486mod window_agg_helpers {
2487 use crate::render::reduce::*;
2488
2489 pub enum OneByOneAggrImpls {
2494 Accumulable(AccumulableOneByOneAggr),
2495 Hierarchical(HierarchicalOneByOneAggr),
2496 Basic(mz_expr::NaiveOneByOneAggr),
2497 }
2498
2499 impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2500 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2501 match reduction_type(agg) {
2502 ReductionType::Basic => {
2503 OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2504 }
2505 ReductionType::Accumulable => {
2506 OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2507 }
2508 ReductionType::Hierarchical => {
2509 OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2510 }
2511 }
2512 }
2513
2514 fn give(&mut self, d: &Datum) {
2515 match self {
2516 OneByOneAggrImpls::Basic(i) => i.give(d),
2517 OneByOneAggrImpls::Accumulable(i) => i.give(d),
2518 OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2519 }
2520 }
2521
2522 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2523 match self {
2525 OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2526 OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2527 OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2528 }
2529 }
2530 }
2531
2532 pub struct AccumulableOneByOneAggr {
2533 aggr_func: AggregateFunc,
2534 accum: Accum,
2535 total: Diff,
2536 }
2537
2538 impl AccumulableOneByOneAggr {
2539 fn new(aggr_func: &AggregateFunc) -> Self {
2540 AccumulableOneByOneAggr {
2541 aggr_func: aggr_func.clone(),
2542 accum: accumulable_zero(aggr_func),
2543 total: Diff::ZERO,
2544 }
2545 }
2546
2547 fn give(&mut self, d: &Datum) {
2548 self.accum
2549 .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2550 self.total += Diff::ONE;
2551 }
2552
2553 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2554 temp_storage.make_datum(|packer| {
2555 packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2556 })
2557 }
2558 }
2559
2560 pub struct HierarchicalOneByOneAggr {
2561 aggr_func: AggregateFunc,
2562 monoid: ReductionMonoid,
2565 }
2566
2567 impl HierarchicalOneByOneAggr {
2568 fn new(aggr_func: &AggregateFunc) -> Self {
2569 let mut row_buf = Row::default();
2570 row_buf.packer().push(Datum::Null);
2571 HierarchicalOneByOneAggr {
2572 aggr_func: aggr_func.clone(),
2573 monoid: get_monoid(row_buf, aggr_func)
2574 .expect("aggr_func should be a hierarchical aggregation function"),
2575 }
2576 }
2577
2578 fn give(&mut self, d: &Datum) {
2579 let mut row_buf = Row::default();
2580 row_buf.packer().push(d);
2581 let m = get_monoid(row_buf, &self.aggr_func)
2582 .expect("aggr_func should be a hierarchical aggregation function");
2583 self.monoid.plus_equals(&m);
2584 }
2585
2586 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2587 temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2588 }
2589 }
2590}
2591
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference conversion: a plain `as` cast, which saturates at the `i128` bounds.
    #[allow(clippy::as_conversions)]
    fn saturating_convert(n: f64) -> i128 {
        (n * FLOAT_SCALE) as i128
    }

    #[mz_ore::test]
    fn float_to_fixed_point_matches_saturating_in_range() {
        // Magnitudes whose scaled value fits in `i128`, so both conversions must agree.
        // Each is checked with both signs.
        let magnitudes = [
            0.0, 1.0, 0.1, 0.5, 3.25, 123456.789, 1e10, 1e20, 5e30,
        ];
        for n in magnitudes.into_iter().flat_map(|m| [m, -m]) {
            assert_eq!(
                float_to_fixed_point(n),
                saturating_convert(n),
                "mismatch for n = {n}"
            );
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_truncates_toward_zero() {
        // 1.75 * 2^24 is exactly 29_360_128.
        let exact = 29_360_128;
        assert_eq!(float_to_fixed_point(1.75), exact);
        assert_eq!(float_to_fixed_point(-1.75), -exact);

        // A value with a fractional part after scaling truncates like `as`, symmetrically.
        let frac = 0.123_456_7_f64;
        let pos = float_to_fixed_point(frac);
        let neg = float_to_fixed_point(-frac);
        assert_eq!(pos, saturating_convert(frac));
        assert_eq!(neg, saturating_convert(-frac));
        assert_eq!(neg, -pos);
    }

    #[mz_ore::test]
    fn float_to_fixed_point_subnormals_round_to_zero() {
        for tiny in [0.0, -0.0, f64::MIN_POSITIVE / 2.0, 5e-324] {
            assert_eq!(float_to_fixed_point(tiny), 0);
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_cancels_large_finite_values() {
        // Beyond the `i128` range the conversion wraps, but `x` and `-x` must still cancel.
        for n in [1.1e31_f64, 1e32, 5e33, 1e284] {
            let sum = float_to_fixed_point(n).wrapping_add(float_to_fixed_point(-n));
            assert_eq!(sum, 0, "n = {n} did not cancel with -n");
        }
    }

    #[mz_ore::test]
    fn float_to_fixed_point_sum_via_accumulator() {
        let func = AggregateFunc::SumFloat64;
        let big = 1.1e31_f64;
        let mut acc = accumulable_zero(&func);
        for x in [big, -big] {
            acc.plus_equals(&datum_to_accumulator(&func, Datum::from(x)));
        }
        let datum = finalize_accum(&func, &acc, Diff::from(2_i64));
        assert_eq!(datum, Datum::from(0.0_f64));
    }
}