1use std::collections::BTreeMap;
15
16use columnation::{Columnation, CopyRegion};
17use dec::OrderedDecimal;
18use differential_dataflow::Diff as _;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
22use differential_dataflow::hashable::Hashable;
23use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
24use differential_dataflow::trace::implementations::BatchContainer;
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Data, VecCollection};
27use itertools::Itertools;
28use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
29use mz_compute_types::plan::ArrangementStrategy;
30use mz_compute_types::plan::reduce::{
31 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
32 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
33};
34use mz_expr::{
35 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
36};
37use mz_ore::cast::CastLossy;
38use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
39use mz_repr::fixed_length::ExtendDatums;
40use mz_repr::{Datum, DatumVec, Diff, Row, RowArena, SharedRow};
41use mz_timely_util::columnation::ColumnationChunker;
42use mz_timely_util::operator::CollectionExt;
43use num_traits::Float;
44use serde::{Deserialize, Serialize};
45use timely::Container;
46use timely::container::{CapacityContainerBuilder, PushInto};
47use tracing::warn;
48
49use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
50use crate::extensions::reduce::{ClearContainer, MzReduce};
51use crate::render::context::{CollectionBundle, Context};
52use crate::render::errors::DataflowErrorSer;
53use crate::render::errors::MaybeValidatingRow;
54use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
55use crate::render::{ArrangementFlavor, Pairer, RenderTimestamp};
56use crate::typedefs::{
57 ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
58 RowRowSpine, RowSpine, RowValSpine,
59};
60use mz_row_spine::{
61 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
62};
63
64impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders a `Reduce` over `input`.
    ///
    /// Each input row is split into a `(key, val)` pair by evaluating `key_val_plan`, reading
    /// only the input columns that the key and value expressions demand. Rows whose key or
    /// value evaluation fails are routed to an error collection instead. If the plan asks for
    /// temporal bucketing and the `ENABLE_COMPUTE_TEMPORAL_BUCKETING` dyncfg is on, the pairs
    /// are passed through `T::maybe_apply_temporal_bucketing`. The pairs are then handed to
    /// `render_reduce_plan` together with `reduce_plan` and the planned `mfp_after`.
    ///
    /// Everything is built inside a region named "Reduce", and the resulting bundle leaves that
    /// region before being returned.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// - `mfp_after` fails to plan.
    /// - `mfp_after` has temporal predicates.
    /// - The key or value plan yields no row (no predicate is expected in them).
    /// - The temporal bucketing summary does not fit into `mz_repr::Timestamp`.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<'scope, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
        temporal_bucketing_strategy: ArrangementStrategy,
    ) -> CollectionBundle<'scope, T>
    where
        T: crate::render::MaybeBucketByTime,
    {
        // Plan the post-reduction MFP once, up front. MFPs fused into a Reduce must not be temporal.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Collect the sorted, deduplicated set of input columns referenced by either plan.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Renumber the demanded columns densely (0, 1, 2, ...). Then rewrite both plans to
            // refer to the new positions, because only the demanded columns are unpacked below.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // Number of leading input columns that must be decoded to reach every demanded column.
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            let (key_val_input, err) = input
                .enter_region(inner)
                .flat_map::<_, ConsolidatingContainerBuilder<Vec<((Row, Row), T, Diff)>>, _>(
                    input_key.map(|k| (k, None)),
                    max_demand,
                    move |row_datums, time, diff, ok_session, err_session| {
                        let mut row_builder = SharedRow::get();
                        let temp_storage = RowArena::new();

                        let mut row_iter = row_datums.drain(..);
                        let mut datums_local = datums.borrow();
                        // Unpack only the demanded columns, in order, by skipping ahead.
                        for skip in skips.iter() {
                            datums_local.push(row_iter.nth(*skip).unwrap());
                        }

                        // Evaluate the key. An evaluation error goes to the error session.
                        let key = key_plan.evaluate_into(
                            &mut datums_local,
                            &temp_storage,
                            &mut row_builder,
                        );
                        let key = match key {
                            Err(e) => {
                                err_session.give((e.into(), time, diff));
                                // NOTE(review): the returned `1` is consumed by `flat_map`; its
                                // meaning is defined there, not in this file.
                                return 1;
                            }
                            Ok(Some(key)) => key.clone(),
                            Ok(None) => panic!("Row expected as no predicate was used"),
                        };

                        // Key evaluation may have appended datums. Drop them so that the value
                        // plan sees exactly the demanded input columns.
                        datums_local.truncate(skips.len());
                        let val = val_plan.evaluate_into(
                            &mut datums_local,
                            &temp_storage,
                            &mut row_builder,
                        );
                        let val = match val {
                            Err(e) => {
                                err_session.give((e.into(), time, diff));
                                return 1;
                            }
                            Ok(Some(val)) => val.clone(),
                            Ok(None) => panic!("Row expected as no predicate was used"),
                        };

                        ok_session.give(((key, val), time, diff));
                        1
                    },
                );

            let key_val_collection = key_val_input.as_collection();
            // Apply temporal bucketing only when the plan requests it AND the dyncfg enables it.
            let key_val_collection = if matches!(
                temporal_bucketing_strategy,
                ArrangementStrategy::TemporalBucketing
            ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
            {
                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                    .get(&self.config_set)
                    .try_into()
                    .expect("must fit");
                T::maybe_apply_temporal_bucketing(
                    key_val_collection.inner,
                    self.as_of_frontier.clone(),
                    summary,
                )
            } else {
                key_val_collection
            };

            self.render_reduce_plan(reduce_plan, key_val_collection, err, key_arity, mfp_after)
                .leave_region(self.scope)
        })
    }
194
195 fn render_reduce_plan<'s>(
201 &self,
202 plan: ReducePlan,
203 collection: VecCollection<'s, T, (Row, Row), Diff>,
204 err_input: VecCollection<'s, T, DataflowErrorSer, Diff>,
205 key_arity: usize,
206 mfp_after: Option<SafeMfpPlan>,
207 ) -> CollectionBundle<'s, T> {
208 let mut errors = Default::default();
209 let arrangement =
210 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
211 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
212 CollectionBundle::from_columns(
213 0..key_arity,
214 ArrangementFlavor::Local(
215 arrangement,
216 errs.mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
217 "Arrange bundle err",
218 ),
219 ),
220 )
221 }
222
223 fn render_reduce_plan_inner<'s>(
224 &self,
225 plan: ReducePlan,
226 collection: VecCollection<'s, T, (Row, Row), Diff>,
227 errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
228 key_arity: usize,
229 mfp_after: Option<SafeMfpPlan>,
230 ) -> Arranged<'s, RowRowAgent<T, Diff>> {
231 let arrangement = match plan {
234 ReducePlan::Distinct => {
237 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
238 errors.push(errs);
239 arranged_output
240 }
241 ReducePlan::Accumulable(expr) => {
242 let (arranged_output, errs) =
243 self.build_accumulable(collection, expr, key_arity, mfp_after);
244 errors.push(errs);
245 arranged_output
246 }
247 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
248 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
249 errors.push(errs);
250 output
251 }
252 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
253 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
254 errors.push(errs);
255 output
256 }
257 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
258 expr,
259 fused_unnest_list,
260 })) => {
261 let validating = !fused_unnest_list;
265 let (output, errs) = self.build_basic_aggregate(
266 collection,
267 0,
268 &expr,
269 validating,
270 key_arity,
271 mfp_after,
272 fused_unnest_list,
273 );
274 if validating {
275 errors.push(errs.expect("validation should have occurred as it was requested"));
276 }
277 output
278 }
279 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
280 let (output, errs) =
281 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
282 errors.push(errs);
283 output
284 }
285 };
286 arrangement
287 }
288
289 fn build_distinct<'s>(
291 &self,
292 collection: VecCollection<'s, T, (Row, Row), Diff>,
293 mfp_after: Option<SafeMfpPlan>,
294 ) -> (
295 Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
296 VecCollection<'s, T, DataflowErrorSer, Diff>,
297 ) {
298 let error_logger = self.error_logger();
299
300 let mut datums1 = DatumVec::new();
302 let mut datums2 = DatumVec::new();
303 let mfp_after1 = mfp_after.clone();
304 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
305
306 let arranged = collection
307 .mz_arrange::<
308 ColumnationChunker<_>,
309 RowRowBatcher<_, _>,
310 RowRowBuilder<_, _>,
311 RowRowSpine<_, _>,
312 >(
313 "Arranged DistinctBy",
314 );
315 let output = arranged
316 .clone()
317 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
318 "DistinctBy",
319 move |key, _input, output| {
320 let temp_storage = RowArena::new();
321 let mut datums_local = datums1.borrow();
322 key.extend_datums(&temp_storage, &mut datums_local, None);
323
324 if mfp_after1
328 .as_ref()
329 .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
330 .unwrap_or(Ok(true))
331 == Ok(true)
332 {
333 output.push((Row::default(), Diff::ONE));
337 }
338 },
339 );
340 let errors = arranged.mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
341 "DistinctByErrorCheck",
342 move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowErrorSer, _)>| {
343 for (_, count) in input.iter() {
344 if count.is_positive() {
345 continue;
346 }
347 let message = "Non-positive multiplicity in DistinctBy";
348 error_logger.log(message, &format!("row={key:?}, count={count}"));
349 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
350 return;
351 }
352 let Some(mfp) = &mfp_after2 else { return };
354 let temp_storage = RowArena::new();
355 let mut datums_local = datums2.borrow();
356 key.extend_datums(&temp_storage, &mut datums_local, None);
357
358 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
359 output.push((e.into(), Diff::ONE));
360 }
361 },
362 );
363 (output, errors.as_collection(|_k, v| v.clone()))
364 }
365
366 fn build_basic_aggregates<'s>(
374 &self,
375 input: VecCollection<'s, T, (Row, Row), Diff>,
376 aggrs: Vec<AggregateExpr>,
377 key_arity: usize,
378 mfp_after: Option<SafeMfpPlan>,
379 ) -> (
380 RowRowArrangement<'s, T>,
381 VecCollection<'s, T, DataflowErrorSer, Diff>,
382 ) {
383 if aggrs.len() <= 1 {
386 self.error_logger().soft_panic_or_log(
387 "Too few aggregations when building basic aggregates",
388 &format!("len={}", aggrs.len()),
389 )
390 }
391 let mut err_output = None;
392 let mut to_collect = Vec::new();
393 for (index, aggr) in aggrs.into_iter().enumerate() {
394 let (result, errs) = self.build_basic_aggregate(
395 input.clone(),
396 index,
397 &aggr,
398 err_output.is_none(),
399 key_arity,
400 None,
401 false,
402 );
403 if errs.is_some() {
404 err_output = errs
405 }
406 to_collect
407 .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
408 }
409
410 let mut datums1 = DatumVec::new();
412 let mut datums2 = DatumVec::new();
413 let mfp_after1 = mfp_after.clone();
414 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
415
416 let arranged = differential_dataflow::collection::concatenate(input.scope(), to_collect)
417 .mz_arrange::<
418 ColumnationChunker<_>,
419 RowValBatcher<_, _, _>,
420 RowValBuilder<_, _, _>,
421 RowValSpine<_, _, _>,
422 >(
423 "Arranged ReduceFuseBasic input",
424 );
425
426 let output = arranged
427 .clone()
428 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceFuseBasic", {
429 move |key, input, output| {
430 let temp_storage = RowArena::new();
431 let mut datums_local = datums1.borrow();
432 key.extend_datums(&temp_storage, &mut datums_local, None);
433 let key_len = datums_local.len();
434
435 for ((_, row), _) in input.iter() {
436 datums_local.push(row.unpack_first());
437 }
438
439 if let Some(row) =
440 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
441 {
442 output.push((row, Diff::ONE));
443 }
444 }
445 });
446 let validation_errs = err_output.expect("expected to validate in at least one aggregate");
451 if let Some(mfp) = mfp_after2 {
452 let mfp_errs = arranged
453 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
454 "ReduceFuseBasic Error Check",
455 move |key, input, output| {
456 let temp_storage = RowArena::new();
459 let mut datums_local = datums2.borrow();
460 key.extend_datums(&temp_storage, &mut datums_local, None);
461
462 for ((_, row), _) in input.iter() {
463 datums_local.push(row.unpack_first());
464 }
465
466 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
467 output.push((e.into(), Diff::ONE));
468 }
469 },
470 )
471 .as_collection(|_, v| v.clone());
472 (output, validation_errs.concat(mfp_errs))
473 } else {
474 (output, validation_errs)
475 }
476 }
477
    /// Builds a single basic (non-accumulable, non-hierarchical) aggregate.
    ///
    /// Processing steps:
    /// 1. Extract the datum at position `index` of each input value row.
    /// 2. If the aggregate is `DISTINCT`, deduplicate (key, value) pairs first.
    /// 3. Arrange the values by key.
    /// 4. Per key, evaluate `func` over the values, passing each value with its accumulated
    ///    count.
    ///
    /// With `fused_unnest_list`, `func` is evaluated through `eval_with_unnest_list` and every
    /// produced datum becomes its own output row. `mfp_after` is applied to each output row.
    ///
    /// Returns the output arrangement and an optional error collection. The collection is
    /// `Some` when `validating` is true or when `mfp_after` can error, and `None` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range for an input value row.
    fn build_basic_aggregate<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<'s, T>,
        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
    ) {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Reduce each value row to the single datum this aggregate consumes.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        if distinct {
            // Merge key and value into one row so that distinctness is computed over the pair.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // The distinct reduction yields `Ok(())` or an error message per pair. Demux
                // those into the ok stream (split back into key/value) and the error stream.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<
                        RowValBuilder<Result<(), String>, _, _>,
                        RowValSpine<Result<(), String>, _, _>,
                    >(keyed, None)
                    .as_collection(|k, v| {
                        (
                            k.to_row(),
                            v.as_ref()
                                .map(|&()| ())
                                .map_err(|m| m.as_str().into()),
                        )
                    })
                    .map_fallible::<
                        CapacityContainerBuilder<_>,
                        CapacityContainerBuilder<_>,
                        _,
                        _,
                        _,
                    >(
                        "Demux Errors",
                        move |(key_val, result)| match result {
                            Ok(()) => Ok(pairer.split(&key_val)),
                            Err(m) => {
                                Err(EvalError::Internal(m).into())
                            }
                        },
                    );
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Scratch buffers: the `1`/`2` suffixes belong to the output and error-check closures,
        // and the `_key_` variants to the fused `unnest_list` closures. Only one pair is used.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mut vals1 = DatumVec::new();
        let mut vals2 = DatumVec::new();
        let mut vals_key_1 = DatumVec::new();
        let mut vals_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<
                ColumnationChunker<_>,
                RowRowBatcher<_, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let mut val_scratch = vals1.borrow();
                        // Decode the single datum of each value, paired with its count.
                        let iter = source.iter().map(|(v, w)| {
                            val_scratch.clear();
                            v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
                            (val_scratch[0], *w)
                        });

                        let mut datums_local = datums1.borrow();
                        key.extend_datums(&temp_storage, &mut datums_local, None);
                        let key_len = datums_local.len();
                        datums_local.push(
                            func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            ),
                        );

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                })
        } else {
            arranged
                .clone()
                .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                    move |key, source, target| {
                        let temp_storage = RowArena::new();
                        let mut val_scratch = vals_key_1.borrow();
                        let iter = source.iter().map(|(v, w)| {
                            val_scratch.clear();
                            v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
                            (val_scratch[0], *w)
                        });

                        let mut datums_local = datums_key_1.borrow();
                        key.extend_datums(&temp_storage, &mut datums_local, None);
                        let key_len = datums_local.len();
                        // Each unnested datum becomes its own output row, sharing the key prefix.
                        for datum in func
                            .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                iter,
                                &temp_storage,
                            )
                        {
                            datums_local.truncate(key_len);
                            datums_local.push(datum);
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        }
                    }
                })
        };

        // Validate multiplicities here only if the DISTINCT path has not already validated.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            if must_validate {
                                // Report the first non-positive accumulation and stop.
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message =
                                        "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    let err = EvalError::Internal(message.into());
                                    target.push((err.into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Re-evaluate the aggregate and the fallible MFP to surface its errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let temp_storage = RowArena::new();
                            let mut val_scratch = vals2.borrow();
                            let iter = source.iter().map(|(v, w)| {
                                val_scratch.clear();
                                v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
                                (val_scratch[0], *w)
                            });

                            let mut datums_local = datums2.borrow();
                            key.extend_datums(&temp_storage, &mut datums_local, None);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<
                                    _,
                                    window_agg_helpers::OneByOneAggrImpls,
                                >(
                                    iter, &temp_storage
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // The fused path is never asked to validate, so only the MFP can error here.
                assert!(!must_validate);
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let temp_storage = RowArena::new();
                            let mut val_scratch = vals_key_2.borrow();
                            let iter = source.iter().map(|(v, w)| {
                                val_scratch.clear();
                                v.extend_datums(&temp_storage, &mut val_scratch, Some(1));
                                (val_scratch[0], *w)
                            });

                            let mut datums_local = datums_key_2.borrow();
                            key.extend_datums(&temp_storage, &mut datums_local, None);
                            let key_len = datums_local.len();
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            // Combine with any errors already produced by the DISTINCT demux.
            if let Some(e) = err_output {
                err_output = Some(e.concat(errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
774
775 fn build_reduce_inaccumulable_distinct<'s, Bu, Tr>(
776 &self,
777 input: VecCollection<'s, T, Row, Diff>,
778 name_tag: Option<&str>,
779 ) -> Arranged<'s, TraceAgent<Tr>>
780 where
781 Tr: for<'a> Trace<
782 Key<'a> = DatumSeq<'a>,
783 KeyContainer: BatchContainer<Owned = Row>,
784 Time = T,
785 Diff = Diff,
786 ValOwn: Data + MaybeValidatingRow<(), String>,
787 > + 'static,
788 Bu: Builder<
789 Time = T,
790 Input: Container
791 + ClearContainer
792 + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
793 Output = Tr::Batch,
794 >,
795 Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
796 {
797 let error_logger = self.error_logger();
798
799 let output_name = format!(
800 "ReduceInaccumulable Distinct{}",
801 name_tag.unwrap_or_default()
802 );
803
804 let input: KeyCollection<_, _, _> = input.into();
805 input
806 .mz_arrange::<
807 ColumnationChunker<_>,
808 RowBatcher<_, _>,
809 RowBuilder<_, _>,
810 RowSpine<_, _>,
811 >(
812 "Arranged ReduceInaccumulable Distinct [val: empty]",
813 )
814 .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
815 if let Some(err) = Tr::ValOwn::into_error() {
816 for (value, count) in source.iter() {
817 if count.is_positive() {
818 continue;
819 }
820
821 let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
822 error_logger.log(message, &format!("value={value:?}, count={count}"));
823 t.push((err(message.to_string()), Diff::ONE));
824 return;
825 }
826 }
827 t.push((Tr::ValOwn::ok(()), Diff::ONE))
828 })
829 }
830
    /// Builds a hierarchical (bucketed) reduction for min/max-style aggregates.
    ///
    /// Processing steps:
    /// 1. Key each input value row (truncated to `aggr_funcs.len()` datums) by
    ///    `(hash % buckets[0], key...)`. If `buckets` is empty, the modulus is 1.
    /// 2. Run one `build_bucketed_stage` per entry in `buckets`. Each stage after the first
    ///    takes the previous hash modulo that stage's bucket.
    /// 3. Drop the hash. A final reduction per key evaluates every aggregate in `aggr_funcs`
    ///    and applies `mfp_after`.
    ///
    /// Only the first stage validates the input; if there are no stages, the final reduction
    /// validates instead. The final reduction also reports errors from `mfp_after` when it can
    /// error. Everything is rendered in a region named "ReduceHierarchical".
    ///
    /// # Panics
    ///
    /// Panics in either of these cases:
    /// - No level of the hierarchy produced a validation error collection.
    /// - A value row does not decode to exactly `aggr_funcs.len()` datums.
    fn build_bucketed<'s>(
        &self,
        input: VecCollection<'s, T, (Row, Row), Diff>,
        BucketedPlan {
            aggr_funcs,
            buckets,
        }: BucketedPlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (
        RowRowArrangement<'s, T>,
        VecCollection<'s, T, DataflowErrorSer, Diff>,
    ) {
        let mut err_output: Option<VecCollection<'s, T, _, _>> = None;
        let outer_scope = input.scope();
        let arranged_output = outer_scope
            .clone()
            .region_named("ReduceHierarchical", |inner| {
                let input = input.enter(inner);

                let first_mod = buckets.get(0).copied().unwrap_or(1);
                let aggregations = aggr_funcs.len();

                // Pack the aggregate inputs into `values` and prefix the key with their hash,
                // reduced by the first bucket modulus.
                let mut stage = input.map(move |(key, row)| {
                    let mut row_builder = SharedRow::get();
                    let mut row_packer = row_builder.packer();
                    row_packer.extend(row.iter().take(aggregations));
                    let values = row_builder.clone();

                    let hash = values.hashed() % first_mod;
                    let hash_key =
                        row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
                    (hash_key, values)
                });

                // One reduction stage per bucket, each with a coarser hash prefix.
                for (index, b) in buckets.into_iter().enumerate() {
                    // The first stage already carries the `buckets[0]` modulus from above.
                    let input = if index == 0 {
                        stage
                    } else {
                        stage.map(move |(hash_key, values)| {
                            let mut hash_key_iter = hash_key.iter();
                            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
                            let hash_key = SharedRow::pack(
                                std::iter::once(Datum::from(hash))
                                    .chain(hash_key_iter.take(key_arity)),
                            );
                            (hash_key, values)
                        })
                    };

                    // Only the first stage validates; later stages see its (validated) output.
                    let validating = err_output.is_none();

                    let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, input, validating);
                    if let Some(errs) = errs {
                        err_output = Some(errs.leave_region(outer_scope));
                    }
                    stage = oks
                }

                // Strip the hash prefix to return to `(key, values)`.
                let partial = stage.map(move |(hash_key, values)| {
                    let mut hash_key_iter = hash_key.iter();
                    let _hash = hash_key_iter.next();
                    (SharedRow::pack(hash_key_iter.take(key_arity)), values)
                });

                // Scratch buffers and MFP copies: the `1` suffix is for the output reduction and
                // the `2` suffix for the error check.
                let mut datums1 = DatumVec::new();
                let mut datums2 = DatumVec::new();
                let mut vals1 = DatumVec::new();
                let mut vals2 = DatumVec::new();
                let mfp_after1 = mfp_after.clone();
                let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
                let aggr_funcs2 = aggr_funcs.clone();

                let error_logger = self.error_logger();
                let arranged = partial
                    .mz_arrange::<
                        ColumnationChunker<_>,
                        RowRowBatcher<_, _>,
                        RowRowBuilder<_, _>,
                        RowRowSpine<_, _>,
                    >(
                        "Arrange ReduceMinsMaxes",
                    );
                // With no bucket stages, nothing has validated the input yet; do it here.
                let must_validate = err_output.is_none();
                if must_validate || mfp_after2.is_some() {
                    let errs = arranged
                        .clone()
                        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                            "ReduceMinsMaxes Error Check",
                            move |key, source, target| {
                                if must_validate {
                                    for (val, count) in source.iter() {
                                        if count.is_positive() {
                                            continue;
                                        }
                                        let val = val.to_row();
                                        let message =
                                            "Non-positive accumulation in ReduceMinsMaxes";
                                        error_logger
                                            .log(message, &format!("val={val:?}, count={count}"));
                                        target.push((
                                            EvalError::Internal(message.into()).into(),
                                            Diff::ONE,
                                        ));
                                        return;
                                    }
                                }

                                // Recompute the aggregates so the fallible MFP can be evaluated.
                                let Some(mfp) = &mfp_after2 else { return };
                                let temp_storage = RowArena::new();
                                let mut datums_local = datums2.borrow();
                                key.extend_datums(&temp_storage, &mut datums_local, None);

                                // Values are decoded into a flat, row-major buffer of
                                // `source.len() * arity` datums.
                                let arity = aggr_funcs2.len();
                                let mut decoded = vals2.borrow();
                                for (values, _cnt) in source.iter() {
                                    values.extend_datums(&temp_storage, &mut decoded, None);
                                }
                                assert_eq!(decoded.len(), source.len() * arity);
                                for (col, func) in aggr_funcs2.iter().enumerate() {
                                    let column_iter = (0..source.len())
                                        .map(|r| (decoded[r * arity + col], Diff::ONE));
                                    datums_local.push(func.eval(column_iter, &temp_storage));
                                }
                                if let Result::Err(e) =
                                    mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            },
                        )
                        .as_collection(|_, v| v.clone())
                        .leave_region(outer_scope);
                    if let Some(e) = err_output.take() {
                        err_output = Some(e.concat(errs));
                    } else {
                        err_output = Some(errs);
                    }
                }
                arranged
                    .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        "ReduceMinsMaxes",
                        move |key, source, target| {
                            let temp_storage = RowArena::new();
                            let mut datums_local = datums1.borrow();
                            key.extend_datums(&temp_storage, &mut datums_local, None);
                            let key_len = datums_local.len();

                            // Decode all values row-major, then evaluate each aggregate over its
                            // column. Multiplicities are ignored (each value counts once).
                            let arity = aggr_funcs.len();
                            let mut decoded = vals1.borrow();
                            for (values, _cnt) in source.iter() {
                                values.extend_datums(&temp_storage, &mut decoded, None);
                            }
                            assert_eq!(decoded.len(), source.len() * arity);
                            for (col, func) in aggr_funcs.iter().enumerate() {
                                let column_iter = (0..source.len())
                                    .map(|r| (decoded[r * arity + col], Diff::ONE));
                                datums_local.push(func.eval(column_iter, &temp_storage));
                            }

                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                target.push((row, Diff::ONE));
                            }
                        },
                    )
                    .leave_region(outer_scope)
            });
        (
            arranged_output,
            err_output.expect("expected to validate in one level of the hierarchy"),
        )
    }
1058
1059 fn build_bucketed_stage<'s>(
1066 &self,
1067 aggr_funcs: &Vec<AggregateFunc>,
1068 input: VecCollection<'s, T, (Row, Row), Diff>,
1069 validating: bool,
1070 ) -> (
1071 VecCollection<'s, T, (Row, Row), Diff>,
1072 Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
1073 ) {
1074 let (input, negated_output, errs) = if validating {
1075 let (input, reduced) = self
1076 .build_bucketed_negated_output::<
1077 RowValBuilder<_, _, _>,
1078 RowValSpine<Result<Row, Row>, _, _>,
1079 >(
1080 input.clone(),
1081 aggr_funcs.clone(),
1082 );
1083 let (oks, errs) = reduced
1084 .as_collection(|k, v| (k.to_row(), v.clone()))
1085 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1086 "Checked Invalid Accumulations",
1087 |(hash_key, result)| match result {
1088 Err(hash_key) => {
1089 let mut hash_key_iter = hash_key.iter();
1090 let _hash = hash_key_iter.next();
1091 let key = SharedRow::pack(hash_key_iter);
1092 let message = format!(
1093 "Invalid data in source, saw non-positive accumulation \
1094 for key {key:?} in hierarchical mins-maxes aggregate"
1095 );
1096 Err(EvalError::Internal(message.into()).into())
1097 }
1098 Ok(values) => Ok((hash_key, values)),
1099 },
1100 );
1101 (input, oks, Some(errs))
1102 } else {
1103 let (input, reduced) = self
1104 .build_bucketed_negated_output::<RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1105 input,
1106 aggr_funcs.clone(),
1107 );
1108 let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1111 (input, oks, None)
1112 };
1113
1114 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1115 let oks = negated_output.concat(input);
1116 (oks, errs)
1117 }
1118
1119 fn build_bucketed_negated_output<'s, Bu, Tr>(
1123 &self,
1124 input: VecCollection<'s, T, (Row, Row), Diff>,
1125 aggrs: Vec<AggregateFunc>,
1126 ) -> (
1127 Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
1128 Arranged<'s, TraceAgent<Tr>>,
1129 )
1130 where
1131 Tr: for<'a> Trace<
1132 Key<'a> = DatumSeq<'a>,
1133 KeyContainer: BatchContainer<Owned = Row>,
1134 ValOwn: Data + MaybeValidatingRow<Row, Row>,
1135 Time = T,
1136 Diff = Diff,
1137 > + 'static,
1138 Bu: Builder<
1139 Time = T,
1140 Input: Container
1141 + ClearContainer
1142 + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1143 Output = Tr::Batch,
1144 >,
1145 Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
1146 {
1147 let error_logger = self.error_logger();
1148 let arranged_input = input
1151 .mz_arrange::<
1152 ColumnationChunker<_>,
1153 RowRowBatcher<_, _>,
1154 RowRowBuilder<_, _>,
1155 RowRowSpine<_, _>,
1156 >(
1157 "Arranged MinsMaxesHierarchical input",
1158 );
1159
1160 let mut value_datums = DatumVec::new();
1163 let reduced = arranged_input.clone().mz_reduce_abelian::<_, Bu, Tr>(
1164 "Reduced Fallibly MinsMaxesHierarchical",
1165 move |key, source, target| {
1166 if let Some(err) = Tr::ValOwn::into_error() {
1167 for (value, count) in source.iter() {
1169 if count.is_positive() {
1170 continue;
1171 }
1172 error_logger.log(
1173 "Non-positive accumulation in MinsMaxesHierarchical",
1174 &format!("key={key:?}, value={value:?}, count={count}"),
1175 );
1176 target.push((
1179 err(<Tr::KeyContainer as BatchContainer>::into_owned(key)),
1180 Diff::ONE,
1181 ));
1182 return;
1183 }
1184 }
1185
1186 let temp_storage = RowArena::new();
1189 let arity = aggrs.len();
1190 let mut decoded = value_datums.borrow();
1191 for (values, _cnt) in source.iter() {
1192 values.extend_datums(&temp_storage, &mut decoded, None);
1193 }
1194 assert_eq!(decoded.len(), source.len() * arity);
1195
1196 let mut row_builder = SharedRow::get();
1197 let mut row_packer = row_builder.packer();
1198 for (col, func) in aggrs.iter().enumerate() {
1199 let column_iter =
1202 (0..source.len()).map(|r| (decoded[r * arity + col], Diff::ONE));
1203 row_packer.push(func.eval(column_iter, &temp_storage));
1204 }
1205 target.reserve(source.len().saturating_add(1));
1211 target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1212 target.extend(source.iter().map(|(values, cnt)| {
1213 let mut cnt = *cnt;
1214 cnt.negate();
1215 (Tr::ValOwn::ok(values.to_row()), cnt)
1216 }));
1217 },
1218 );
1219 (arranged_input, reduced)
1220 }
1221
/// Renders a reduction whose aggregates are all min/max monoids over monotonic input.
///
/// Each input value row is truncated to one datum per aggregate, and each datum is wrapped in
/// a `ReductionMonoid` via `monoids::get_monoid`. The resulting vector is arranged with
/// `Vec<ReductionMonoid>` as its difference type, so accumulating differences (through the
/// monoids' `Semigroup` impl) is what computes the per-key min/max.
///
/// Returns the arranged output (key columns followed by the finalized aggregates, after the
/// optional `mfp_after`) and an error collection containing non-monotonic input records plus,
/// when `mfp_after` can error, the errors it raises.
fn build_monotonic<'s>(
    &self,
    collection: VecCollection<'s, T, (Row, Row), Diff>,
    MonotonicPlan {
        aggr_funcs,
        must_consolidate,
    }: MonotonicPlan,
    mfp_after: Option<SafeMfpPlan>,
) -> (
    RowRowArrangement<'s, T>,
    VecCollection<'s, T, DataflowErrorSer, Diff>,
) {
    let aggregations = aggr_funcs.len();
    // Split each value row into one single-datum `Row` per aggregate; columns beyond the
    // number of aggregates are dropped by `take`.
    let collection = collection
        .map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let mut values = Vec::with_capacity(aggregations);
            values.extend(
                row.iter()
                    .take(aggregations)
                    .map(|v| row_builder.pack_using(std::iter::once(v))),
            );

            (key, values)
        })
        .consolidate_named_if::<KeyBatcher<_, _, _>>(
            must_consolidate,
            "Consolidated ReduceMonotonic input",
        );

    let error_logger = self.error_logger();
    // Records flagged by `ensure_monotonic` are logged and turned into internal evaluation
    // errors instead of being accumulated.
    let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
        error_logger.log(
            "Non-monotonic input to ReduceMonotonic",
            &format!("data={data:?}, diff={diff}"),
        );
        let m = "tried to build a monotonic reduction on non-monotonic input".into();
        (EvalError::Internal(m).into(), Diff::ONE)
    });
    // Replace each single-datum row with its min/max monoid. NOTE(review): `explode_one`
    // presumably moves this vector into the difference position, matching the
    // `RowSpine<_, Vec<ReductionMonoid>>` arrangement below — confirm.
    let partial = partial.explode_one(move |(key, values)| {
        let mut output = Vec::new();
        for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
            output.push(monoids::get_monoid(row, func).expect(
                "hierarchical aggregations are expected to have monoid implementations",
            ));
        }
        (key, output)
    });

    // Separate scratch buffers and plans for the output and error-check reductions; the
    // error check is only rendered when `mfp_after` can actually error.
    let mut datums1 = DatumVec::new();
    let mut datums2 = DatumVec::new();
    let mfp_after1 = mfp_after.clone();
    let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

    let partial: KeyCollection<_, _, _> = partial.into();
    let arranged = partial
        .mz_arrange::<
            ColumnationChunker<_>,
            RowBatcher<_, _>,
            RowBuilder<_, _>,
            RowSpine<_, Vec<ReductionMonoid>>,
        >(
            "ArrangeMonotonic [val: empty]",
        );
    let output = arranged
        .clone()
        .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceMonotonic", {
            move |key, input, output| {
                let temp_storage = RowArena::new();
                let mut datums_local = datums1.borrow();
                key.extend_datums(&temp_storage, &mut datums_local, None);
                let key_len = datums_local.len();
                // The arranged value is empty, so the accumulated monoids are presumably the
                // difference of the single input entry.
                let accum = &input[0].1;
                for monoid in accum.iter() {
                    datums_local.extend(monoid.finalize().iter());
                }

                if let Some(row) =
                    evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                {
                    output.push((row, Diff::ONE));
                }
            }
        });

    if let Some(mfp) = mfp_after2 {
        // Re-evaluate `mfp_after` over the same datums purely to surface its errors.
        let mfp_errs = arranged
            .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                "ReduceMonotonic Error Check",
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let mut datums_local = datums2.borrow();
                    key.extend_datums(&temp_storage, &mut datums_local, None);
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }
                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                    {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            )
            .as_collection(|_k, v| v.clone());
        (output, validation_errs.concat(mfp_errs))
    } else {
        (output, validation_errs)
    }
}
1343
/// Renders a reduction whose aggregates are all accumulable (sums, counts, `Any`/`All`, ...).
///
/// Every aggregate in `full_aggrs` contributes one `Accum` to a difference of type
/// `(Vec<Accum>, Diff)`, whose second component counts contributing records:
/// * `simple_aggrs` are converted directly from each input row;
/// * each entry of `distinct_aggrs` first deduplicates `(key, value)` pairs with an
///   arrange + reduce, then converts the distinct values.
///
/// The contributions are concatenated and arranged. One reduction finalizes the accumulators
/// into output rows (applying `mfp_after`). A second reduction reports invalid states:
/// net-zero record counts with non-zero accumulations, negative unsigned sums, and
/// `mfp_after` errors when it can error.
fn build_accumulable<'s>(
    &self,
    collection: VecCollection<'s, T, (Row, Row), Diff>,
    AccumulablePlan {
        full_aggrs,
        simple_aggrs,
        distinct_aggrs,
    }: AccumulablePlan,
    key_arity: usize,
    mfp_after: Option<SafeMfpPlan>,
) -> (
    RowRowArrangement<'s, T>,
    VecCollection<'s, T, DataflowErrorSer, Diff>,
) {
    let collection_scope = collection.scope();

    // Every full aggregate must be either simple or distinct, and there must be at least one.
    if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
        self.error_logger().soft_panic_or_log(
            "Incorrect numbers of aggregates in accummulable reduction rendering",
            &format!(
                "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                full_aggrs.len(),
                simple_aggrs.len(),
                distinct_aggrs.len(),
            ),
        );
    }

    // Identity accumulator for every aggregate plus a zero record count; each input row
    // starts from this and fills in only the aggregates it contributes to.
    let zero_diffs: (Vec<_>, Diff) = (
        full_aggrs
            .iter()
            .map(|f| accumulable_zero(&f.func))
            .collect(),
        Diff::ZERO,
    );

    let mut to_aggregate = Vec::new();
    if simple_aggrs.len() > 0 {
        let collection = collection.clone();
        let easy_cases = collection.explode_one({
            let zero_diffs = zero_diffs.clone();
            move |(key, row)| {
                let mut diffs = zero_diffs.clone();
                // Walk the row once. `datum_index` is both the row column and the position
                // in `full_aggrs`. NOTE(review): this requires `simple_aggrs` to be sorted
                // by strictly ascending `datum_index`; otherwise the `unwrap`s panic.
                let mut row_iter = row.iter().enumerate();
                for (datum_index, aggr) in simple_aggrs.iter() {
                    let mut datum = row_iter.next().unwrap();
                    while datum_index != &datum.0 {
                        datum = row_iter.next().unwrap();
                    }
                    let datum = datum.1;
                    diffs.0[*datum_index] = datum_to_accumulator(&aggr.func, datum);
                    diffs.1 = Diff::ONE;
                }
                ((key, ()), diffs)
            }
        });
        to_aggregate.push(easy_cases);
    }

    for (datum_index, aggr) in distinct_aggrs.into_iter() {
        let pairer = Pairer::new(key_arity);
        // Deduplicate (key, value) pairs: the reduction emits `()` with multiplicity one for
        // each distinct pair, which is then split back into key and value.
        let collection = collection
            .clone()
            .map(move |(key, row)| {
                let value = row.iter().nth(datum_index).unwrap();
                (pairer.merge(&key, std::iter::once(value)), ())
            })
            .mz_arrange::<
                ColumnationChunker<_>,
                RowBatcher<_, _>,
                RowBuilder<_, _>,
                RowSpine<_, _>,
            >(
                "Arranged Accumulable Distinct [val: empty]",
            )
            .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                "Reduced Accumulable Distinct [val: empty]",
                move |_k, _s, t| t.push(((), Diff::ONE)),
            )
            .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
            .explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let datum = row.iter().next().unwrap();
                    let mut diffs = zero_diffs.clone();
                    diffs.0[datum_index] = datum_to_accumulator(&aggr.func, datum);
                    diffs.1 = Diff::ONE;
                    ((key, ()), diffs)
                }
            });
        to_aggregate.push(collection);
    }

    // Avoid a concatenation operator when there is only one source of contributions.
    let collection = if to_aggregate.len() == 1 {
        to_aggregate.remove(0)
    } else {
        differential_dataflow::collection::concatenate(collection_scope, to_aggregate)
    };

    // Separate scratch buffers and aggregate lists for the two reductions below; the
    // `mfp_after` error check only runs when the plan can actually error.
    let mut datums1 = DatumVec::new();
    let mut datums2 = DatumVec::new();
    let mfp_after1 = mfp_after.clone();
    let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
    let full_aggrs2 = full_aggrs.clone();

    let error_logger = self.error_logger();
    let err_full_aggrs = full_aggrs.clone();
    let arranged = collection
        .mz_arrange::<
            ColumnationChunker<_>,
            RowBatcher<_, _>,
            RowBuilder<_, _>,
            RowSpine<_, (Vec<Accum>, Diff)>,
        >(
            "ArrangeAccumulable [val: empty]",
        );
    let arranged_output = arranged
        .clone()
        .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>("ReduceAccumulable", {
            move |key, input, output| {
                // The arranged value is empty; its accumulated difference holds one `Accum`
                // per aggregate plus the net record count.
                let (ref accums, total) = input[0].1;

                let temp_storage = RowArena::new();
                let mut datums_local = datums1.borrow();
                key.extend_datums(&temp_storage, &mut datums_local, None);
                let key_len = datums_local.len();
                // `zip_eq` panics if the accumulator count differs from `full_aggrs`.
                for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                    datums_local.push(finalize_accum(&aggr.func, accum, total));
                }

                if let Some(row) =
                    evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                {
                    output.push((row, Diff::ONE));
                }
            }
        });
    let arranged_errs = arranged
        .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
            "AccumulableErrorCheck",
            move |key, input, output| {
                let (ref accums, total) = input[0].1;
                for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                    // If the records cancelled out, so should every accumulation; anything
                    // else indicates inconsistent input (e.g. retracting rows never inserted).
                    if total == Diff::ZERO && !accum.is_zero() {
                        error_logger.log(
                            "Net-zero records with non-zero accumulation in ReduceAccumulable",
                            &format!("aggr={aggr:?}, accum={accum:?}"),
                        );
                        let key = key.to_row();
                        let message = format!(
                            "Invalid data in source, saw net-zero records for key {key} \
                             with non-zero accumulation in accumulable aggregate"
                        );
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    }
                    // Unsigned sums can only go negative through invalid retractions.
                    match (&aggr.func, &accum) {
                        (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                        | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                        | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                            if accum.is_negative() {
                                error_logger.log(
                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
                                    &format!("aggr={aggr:?}, accum={accum:?}"),
                                );
                                let key = key.to_row();
                                let message = format!(
                                    "Invalid data in source, saw negative accumulation with \
                                     unsigned type for key {key}"
                                );
                                let err = EvalError::Internal(message.into());
                                output.push((err.into(), Diff::ONE));
                            }
                        }
                        _ => (), // No additional validation for other aggregates.
                    }
                }

                // Re-evaluate `mfp_after` over the finalized datums purely to surface errors.
                let Some(mfp) = &mfp_after2 else { return };
                let temp_storage = RowArena::new();
                let mut datums_local = datums2.borrow();
                key.extend_datums(&temp_storage, &mut datums_local, None);
                for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                    datums_local.push(finalize_accum(&aggr.func, accum, total));
                }

                if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                    output.push((e.into(), Diff::ONE));
                }
            },
        );
    (
        arranged_output,
        arranged_errs.as_collection(|_key, error| error.clone()),
    )
}
1569}
1570
1571fn evaluate_mfp_after<'a, 'b>(
1575 mfp_after: &'a Option<SafeMfpPlan>,
1576 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1577 temp_storage: &'a RowArena,
1578 key_len: usize,
1579) -> Option<Row> {
1580 let mut row_builder = SharedRow::get();
1581 if let Some(mfp) = mfp_after {
1584 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1587 Some(row_builder.pack_using(iter.skip(key_len)))
1590 } else {
1591 None
1592 }
1593 } else {
1594 Some(row_builder.pack_using(&datums_local[key_len..]))
1595 }
1596}
1597
1598fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1599 match aggr_func {
1600 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1601 trues: Diff::ZERO,
1602 falses: Diff::ZERO,
1603 },
1604 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1605 accum: AccumCount::ZERO,
1606 pos_infs: Diff::ZERO,
1607 neg_infs: Diff::ZERO,
1608 nans: Diff::ZERO,
1609 non_nulls: Diff::ZERO,
1610 },
1611 AggregateFunc::SumNumeric => Accum::Numeric {
1612 accum: OrderedDecimal(NumericAgg::zero()),
1613 pos_infs: Diff::ZERO,
1614 neg_infs: Diff::ZERO,
1615 nans: Diff::ZERO,
1616 non_nulls: Diff::ZERO,
1617 },
1618 _ => Accum::SimpleNumber {
1619 accum: AccumCount::ZERO,
1620 non_nulls: Diff::ZERO,
1621 },
1622 }
1623}
1624
/// Number of fractional bits in the fixed-point representation that float sums are
/// accumulated in (see `float_to_fixed_point`).
const FLOAT_SCALE_EXP: u32 = 24;

/// `2^FLOAT_SCALE_EXP` as an `f64`; dividing a fixed-point sum by it recovers the float value.
// `as` is used because trait-based conversions cannot be called in a `const` initializer;
// the value is a small power of two, so the conversion is exact.
#[allow(clippy::as_conversions)] const FLOAT_SCALE: f64 = (1_u64 << FLOAT_SCALE_EXP) as f64;
1633
1634fn float_to_fixed_point(n: f64) -> i128 {
1651 debug_assert!(n.is_finite());
1652
1653 let (mantissa, exponent, sign) = Float::integer_decode(n);
1657 let significand = u128::from(mantissa);
1658 let exp = i64::from(exponent) + i64::from(FLOAT_SCALE_EXP);
1659
1660 let magnitude: u128 = if exp >= 0 {
1661 match u32::try_from(exp) {
1664 Ok(shift) if shift < 128 => significand << shift,
1665 _ => 0,
1666 }
1667 } else {
1668 match u32::try_from(-exp) {
1671 Ok(shift) if shift < 128 => significand >> shift,
1672 _ => 0,
1673 }
1674 };
1675
1676 let magnitude = magnitude.cast_signed();
1679 if sign < 0 {
1680 magnitude.wrapping_neg()
1681 } else {
1682 magnitude
1683 }
1684}
1685
1686fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1687 match aggregate_func {
1688 AggregateFunc::Count => Accum::SimpleNumber {
1689 accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
1691 Diff::ZERO
1692 } else {
1693 Diff::ONE
1694 },
1695 },
1696 AggregateFunc::Any | AggregateFunc::All => match datum {
1697 Datum::True => Accum::Bool {
1698 trues: Diff::ONE,
1699 falses: Diff::ZERO,
1700 },
1701 Datum::Null => Accum::Bool {
1702 trues: Diff::ZERO,
1703 falses: Diff::ZERO,
1704 },
1705 Datum::False => Accum::Bool {
1706 trues: Diff::ZERO,
1707 falses: Diff::ONE,
1708 },
1709 x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1710 },
1711 AggregateFunc::Dummy => match datum {
1712 Datum::Dummy => Accum::SimpleNumber {
1713 accum: AccumCount::ZERO,
1714 non_nulls: Diff::ZERO,
1715 },
1716 x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1717 },
1718 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1719 let n = match datum {
1720 Datum::Float32(n) => f64::from(*n),
1721 Datum::Float64(n) => *n,
1722 Datum::Null => 0f64,
1723 x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1724 };
1725
1726 let nans = Diff::from(n.is_nan());
1727 let pos_infs = Diff::from(n == f64::INFINITY);
1728 let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1729 let non_nulls = Diff::from(datum != Datum::Null);
1730
1731 let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1734 AccumCount::ZERO
1735 } else {
1736 float_to_fixed_point(n).into()
1740 };
1741
1742 Accum::Float {
1743 accum,
1744 pos_infs,
1745 neg_infs,
1746 nans,
1747 non_nulls,
1748 }
1749 }
1750 AggregateFunc::SumNumeric => match datum {
1751 Datum::Numeric(n) => {
1752 let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1753 if n.0.is_negative() {
1754 (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1755 } else {
1756 (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1757 }
1758 } else if n.0.is_nan() {
1759 (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1760 } else {
1761 let mut cx_agg = numeric::cx_agg();
1764 (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1765 };
1766
1767 Accum::Numeric {
1768 accum: OrderedDecimal(accum),
1769 pos_infs,
1770 neg_infs,
1771 nans,
1772 non_nulls: Diff::ONE,
1773 }
1774 }
1775 Datum::Null => Accum::Numeric {
1776 accum: OrderedDecimal(NumericAgg::zero()),
1777 pos_infs: Diff::ZERO,
1778 neg_infs: Diff::ZERO,
1779 nans: Diff::ZERO,
1780 non_nulls: Diff::ZERO,
1781 },
1782 x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1783 },
1784 _ => {
1785 match datum {
1789 Datum::Int16(i) => Accum::SimpleNumber {
1790 accum: i.into(),
1791 non_nulls: Diff::ONE,
1792 },
1793 Datum::Int32(i) => Accum::SimpleNumber {
1794 accum: i.into(),
1795 non_nulls: Diff::ONE,
1796 },
1797 Datum::Int64(i) => Accum::SimpleNumber {
1798 accum: i.into(),
1799 non_nulls: Diff::ONE,
1800 },
1801 Datum::UInt16(u) => Accum::SimpleNumber {
1802 accum: u.into(),
1803 non_nulls: Diff::ONE,
1804 },
1805 Datum::UInt32(u) => Accum::SimpleNumber {
1806 accum: u.into(),
1807 non_nulls: Diff::ONE,
1808 },
1809 Datum::UInt64(u) => Accum::SimpleNumber {
1810 accum: u.into(),
1811 non_nulls: Diff::ONE,
1812 },
1813 Datum::MzTimestamp(t) => Accum::SimpleNumber {
1814 accum: u64::from(t).into(),
1815 non_nulls: Diff::ONE,
1816 },
1817 Datum::Null => Accum::SimpleNumber {
1818 accum: AccumCount::ZERO,
1819 non_nulls: Diff::ZERO,
1820 },
1821 x => panic!("Accumulating non-integer data: {x:?}"),
1822 }
1823 }
1824 }
1825}
1826
1827fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
1828 if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
1832 Datum::Null
1833 } else {
1834 match (&aggr_func, &accum) {
1835 (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
1836 Datum::Int64(non_nulls.into_inner())
1837 }
1838 (AggregateFunc::All, Accum::Bool { falses, trues }) => {
1839 if falses.is_positive() {
1841 Datum::False
1842 } else if *trues == total {
1843 Datum::True
1844 } else {
1845 Datum::Null
1846 }
1847 }
1848 (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
1849 if trues.is_positive() {
1851 Datum::True
1852 } else if *falses == total {
1853 Datum::False
1854 } else {
1855 Datum::Null
1856 }
1857 }
1858 (AggregateFunc::Dummy, _) => Datum::Dummy,
1859 (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
1861 | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
1862 #[allow(clippy::as_conversions)]
1867 Datum::Int64(accum.into_inner() as i64)
1868 }
1869 (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
1870 (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
1871 | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
1872 if !accum.is_negative() {
1873 #[allow(clippy::as_conversions)]
1879 Datum::UInt64(accum.into_inner() as u64)
1880 } else {
1881 Datum::Null
1885 }
1886 }
1887 (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
1888 if !accum.is_negative() {
1889 Datum::from(*accum)
1890 } else {
1891 Datum::Null
1895 }
1896 }
1897 (
1898 AggregateFunc::SumFloat32,
1899 Accum::Float {
1900 accum,
1901 pos_infs,
1902 neg_infs,
1903 nans,
1904 non_nulls: _,
1905 },
1906 ) => {
1907 if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1908 Datum::from(f32::NAN)
1911 } else if pos_infs.is_positive() {
1912 Datum::from(f32::INFINITY)
1913 } else if neg_infs.is_positive() {
1914 Datum::from(f32::NEG_INFINITY)
1915 } else {
1916 let sum = f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE;
1917 Datum::from(f32::cast_lossy(sum))
1918 }
1919 }
1920 (
1921 AggregateFunc::SumFloat64,
1922 Accum::Float {
1923 accum,
1924 pos_infs,
1925 neg_infs,
1926 nans,
1927 non_nulls: _,
1928 },
1929 ) => {
1930 if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
1931 Datum::from(f64::NAN)
1934 } else if pos_infs.is_positive() {
1935 Datum::from(f64::INFINITY)
1936 } else if neg_infs.is_positive() {
1937 Datum::from(f64::NEG_INFINITY)
1938 } else {
1939 Datum::from(f64::cast_lossy(accum.into_inner()) / FLOAT_SCALE)
1940 }
1941 }
1942 (
1943 AggregateFunc::SumNumeric,
1944 Accum::Numeric {
1945 accum,
1946 pos_infs,
1947 neg_infs,
1948 nans,
1949 non_nulls: _,
1950 },
1951 ) => {
1952 let mut cx_datum = numeric::cx_datum();
1953 let d = cx_datum.to_width(accum.0);
1954 let inf_d = d.is_infinite();
1960 let neg_d = d.is_negative();
1961 let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
1962 let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
1963 if nans.is_positive() || (pos_inf && neg_inf) {
1964 Datum::from(Numeric::nan())
1967 } else if pos_inf {
1968 Datum::from(Numeric::infinity())
1969 } else if neg_inf {
1970 let mut cx = numeric::cx_datum();
1971 let mut d = Numeric::infinity();
1972 cx.neg(&mut d);
1973 Datum::from(d)
1974 } else {
1975 Datum::from(d)
1976 }
1977 }
1978 _ => panic!(
1979 "Unexpected accumulation (aggr={:?}, accum={accum:?})",
1980 aggr_func
1981 ),
1982 }
1983 }
1984}
1985
/// Integer type for exact integer sums and for the fixed-point representation of float sums.
/// NOTE(review): `mz_ore::Overflowing` presumably detects/reports arithmetic overflow
/// according to its configuration — confirm.
type AccumCount = mz_ore::Overflowing<i128>;

/// Per-aggregate accumulation state for accumulable reductions.
///
/// Every field is a (net) count or a sum. States combine field by field (`Semigroup`) and
/// scale by a multiplicity (`Multiply`). The variant must agree with the aggregate function;
/// see `accumulable_zero` and `datum_to_accumulator`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
enum Accum {
    /// State for `Any` and `All`.
    Bool {
        /// Net number of `true` inputs.
        trues: Diff,
        /// Net number of `false` inputs.
        falses: Diff,
    },
    /// State for `Count`, `Dummy`, and integer / `mz_timestamp` sums.
    SimpleNumber {
        /// Sum of the non-null inputs (always zero for `Count` and `Dummy`).
        accum: AccumCount,
        /// Net number of non-null inputs.
        non_nulls: Diff,
    },
    /// State for `SumFloat32` and `SumFloat64`.
    Float {
        /// Sum of the finite inputs, in fixed point with `FLOAT_SCALE_EXP` fractional bits.
        accum: AccumCount,
        /// Net number of `+inf` inputs.
        pos_infs: Diff,
        /// Net number of `-inf` inputs.
        neg_infs: Diff,
        /// Net number of NaN inputs.
        nans: Diff,
        /// Net number of non-null inputs.
        non_nulls: Diff,
    },
    /// State for `SumNumeric`.
    Numeric {
        /// Sum of the finite inputs at aggregation precision.
        accum: OrderedDecimal<NumericAgg>,
        /// Net number of `+inf` inputs.
        pos_infs: Diff,
        /// Net number of `-inf` inputs.
        neg_infs: Diff,
        /// Net number of NaN inputs.
        nans: Diff,
        /// Net number of non-null inputs.
        non_nulls: Diff,
    },
}
2053
2054impl IsZero for Accum {
2055 fn is_zero(&self) -> bool {
2056 match self {
2057 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2058 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2059 Accum::Float {
2060 accum,
2061 pos_infs,
2062 neg_infs,
2063 nans,
2064 non_nulls,
2065 } => {
2066 accum.is_zero()
2067 && pos_infs.is_zero()
2068 && neg_infs.is_zero()
2069 && nans.is_zero()
2070 && non_nulls.is_zero()
2071 }
2072 Accum::Numeric {
2073 accum,
2074 pos_infs,
2075 neg_infs,
2076 nans,
2077 non_nulls,
2078 } => {
2079 accum.0.is_zero()
2080 && pos_infs.is_zero()
2081 && neg_infs.is_zero()
2082 && nans.is_zero()
2083 && non_nulls.is_zero()
2084 }
2085 }
2086 }
2087}
2088
impl Semigroup for Accum {
    /// Adds `other` into `self`, field by field.
    ///
    /// A fixed-point overflow in a `Float` sum logs a warning and wraps rather than panicking.
    ///
    /// # Panics
    ///
    /// Panics if the accumulators are different variants, or if adding `Numeric` sums has to
    /// round (reported as an overflow).
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Wrapping (rather than panicking) keeps the dataflow running; results may be
                // wrong after an overflow, hence the warning.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // The status must be checked right after `add`, before any other operation
                // on this context.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // NOTE(review): presumably normalizes the decimal (e.g. strips trailing zeros)
                // so equal sums have equal representations — confirm against `dec` docs.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2191
2192impl Multiply<Diff> for Accum {
2193 type Output = Accum;
2194
2195 fn multiply(self, factor: &Diff) -> Accum {
2196 let factor = *factor;
2197 match self {
2198 Accum::Bool { trues, falses } => Accum::Bool {
2199 trues: trues * factor,
2200 falses: falses * factor,
2201 },
2202 Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2203 accum: accum * AccumCount::from(factor),
2204 non_nulls: non_nulls * factor,
2205 },
2206 Accum::Float {
2207 accum,
2208 pos_infs,
2209 neg_infs,
2210 nans,
2211 non_nulls,
2212 } => Accum::Float {
2213 accum: accum
2214 .checked_mul(AccumCount::from(factor))
2215 .unwrap_or_else(|| {
2216 warn!("Float accumulator overflow. Incorrect results possible");
2217 accum.wrapping_mul(AccumCount::from(factor))
2218 }),
2219 pos_infs: pos_infs * factor,
2220 neg_infs: neg_infs * factor,
2221 nans: nans * factor,
2222 non_nulls: non_nulls * factor,
2223 },
2224 Accum::Numeric {
2225 accum,
2226 pos_infs,
2227 neg_infs,
2228 nans,
2229 non_nulls,
2230 } => {
2231 let mut cx = numeric::cx_agg();
2232 let mut f = NumericAgg::from(factor.into_inner());
2233 cx.mul(&mut f, &accum.0);
2237 assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2243 Accum::Numeric {
2244 accum: OrderedDecimal(f),
2245 pos_infs: pos_infs * factor,
2246 neg_infs: neg_infs * factor,
2247 nans: nans * factor,
2248 non_nulls: non_nulls * factor,
2249 }
2250 }
2251 }
2252 }
2253}
2254
// `Accum` derives `Copy`, so a plain copying region suffices for columnar storage.
impl Columnation for Accum {
    type InnerRegion = CopyRegion<Self>;
}
2258
mod monoids {
    //! Min/max monoids that let hierarchical aggregations be computed by accumulating
    //! differential dataflow difference values.

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A running minimum or maximum, held as a `Row` whose first datum is the current value.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current min/max value.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        /// Overwrites `self` with `source`, reusing `self`'s existing `Row` allocation.
        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Move the row out of `self` (leaving a default row behind) so its buffer can be
            // reused by `Row::clone_from` below.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Adopt the source's variant, which may differ from `self`'s.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// A min/max is unchanged by a positive multiplicity, so `self` is returned as-is.
        ///
        /// # Panics
        ///
        /// Panics if `factor` is not positive.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        /// Folds `rhs` into `self`, keeping the smaller (`Min`) or larger (`Max`) first datum.
        ///
        /// NULLs are ignored: a NULL `rhs` never replaces `self`, and a non-NULL `rhs` always
        /// replaces a NULL `self`. Mismatched variants are reported through
        /// `soft_panic_or_log!`, and `self` is not modified in that case.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        // A min/max is never the zero element. NOTE(review): presumably this ensures that
        // consolidation never discards an accumulated value — confirm.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnar storage region for `ReductionMonoid`, delegating to the region for `Row`.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            // SAFETY: forwards to the inner `Row` region under the same `Region::copy`
            // contract our caller upholds; the enum tag itself needs no region storage.
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Only the contained rows occupy region storage.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the min/max monoid for `func`.
    ///
    /// Returns `None` for every aggregate function that is not a min/max.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            // Listed exhaustively (no `_`) so new aggregate functions must be classified here.
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2513
2514mod window_agg_helpers {
2515 use crate::render::reduce::*;
2516
2517 pub enum OneByOneAggrImpls {
2522 Accumulable(AccumulableOneByOneAggr),
2523 Hierarchical(HierarchicalOneByOneAggr),
2524 Basic(mz_expr::NaiveOneByOneAggr),
2525 }
2526
2527 impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2528 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2529 match reduction_type(agg) {
2530 ReductionType::Basic => {
2531 OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2532 }
2533 ReductionType::Accumulable => {
2534 OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2535 }
2536 ReductionType::Hierarchical => {
2537 OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2538 }
2539 }
2540 }
2541
2542 fn give(&mut self, d: &Datum) {
2543 match self {
2544 OneByOneAggrImpls::Basic(i) => i.give(d),
2545 OneByOneAggrImpls::Accumulable(i) => i.give(d),
2546 OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2547 }
2548 }
2549
2550 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2551 match self {
2553 OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2554 OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2555 OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2556 }
2557 }
2558 }
2559
2560 pub struct AccumulableOneByOneAggr {
2561 aggr_func: AggregateFunc,
2562 accum: Accum,
2563 total: Diff,
2564 }
2565
2566 impl AccumulableOneByOneAggr {
2567 fn new(aggr_func: &AggregateFunc) -> Self {
2568 AccumulableOneByOneAggr {
2569 aggr_func: aggr_func.clone(),
2570 accum: accumulable_zero(aggr_func),
2571 total: Diff::ZERO,
2572 }
2573 }
2574
2575 fn give(&mut self, d: &Datum) {
2576 self.accum
2577 .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2578 self.total += Diff::ONE;
2579 }
2580
2581 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2582 temp_storage.make_datum(|packer| {
2583 packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2584 })
2585 }
2586 }
2587
2588 pub struct HierarchicalOneByOneAggr {
2589 aggr_func: AggregateFunc,
2590 monoid: ReductionMonoid,
2593 }
2594
2595 impl HierarchicalOneByOneAggr {
2596 fn new(aggr_func: &AggregateFunc) -> Self {
2597 let mut row_buf = Row::default();
2598 row_buf.packer().push(Datum::Null);
2599 HierarchicalOneByOneAggr {
2600 aggr_func: aggr_func.clone(),
2601 monoid: get_monoid(row_buf, aggr_func)
2602 .expect("aggr_func should be a hierarchical aggregation function"),
2603 }
2604 }
2605
2606 fn give(&mut self, d: &Datum) {
2607 let mut row_buf = Row::default();
2608 row_buf.packer().push(d);
2609 let m = get_monoid(row_buf, &self.aggr_func)
2610 .expect("aggr_func should be a hierarchical aggregation function");
2611 self.monoid.plus_equals(&m);
2612 }
2613
2614 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2615 temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2616 }
2617 }
2618}
2619
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference conversion: scale by `FLOAT_SCALE` and cast with `as`.
    ///
    /// A float-to-int `as` cast truncates toward zero and saturates at the target bounds.
    // The lossy `as` cast is deliberate: it is the reference behavior being compared against.
    #[allow(clippy::as_conversions)]
    fn saturating_convert(n: f64) -> i128 {
        (n * FLOAT_SCALE) as i128
    }

    /// For these inputs `float_to_fixed_point` must agree exactly with the plain
    /// saturating cast.
    #[mz_ore::test]
    fn float_to_fixed_point_matches_saturating_in_range() {
        let cases = [
            0.0,
            -0.0,
            1.0,
            -1.0,
            0.1,
            -0.1,
            0.5,
            -0.5,
            3.25,
            -3.25,
            123456.789,
            -123456.789,
            1e10,
            -1e10,
            1e20,
            -1e20,
            5e30,
            -5e30,
        ];
        for n in cases {
            assert_eq!(
                float_to_fixed_point(n),
                saturating_convert(n),
                "mismatch for n = {n}"
            );
        }
    }

    /// Conversion truncates toward zero and is symmetric in the sign of the input.
    #[mz_ore::test]
    fn float_to_fixed_point_truncates_toward_zero() {
        // 29_360_128 == 1.75 * 2^24.
        assert_eq!(float_to_fixed_point(1.75), 29_360_128);
        assert_eq!(float_to_fixed_point(-1.75), -29_360_128);

        // An arbitrary fraction: must match the reference cast, and negating the input
        // must only negate the result.
        let frac = 0.123_456_7_f64;
        assert_eq!(float_to_fixed_point(frac), saturating_convert(frac));
        assert_eq!(float_to_fixed_point(-frac), saturating_convert(-frac));
        assert_eq!(float_to_fixed_point(-frac), -float_to_fixed_point(frac));
    }

    /// Zero of either sign, and values far below the fixed-point resolution (including
    /// subnormals), convert to zero.
    #[mz_ore::test]
    fn float_to_fixed_point_subnormals_round_to_zero() {
        assert_eq!(float_to_fixed_point(0.0), 0);
        assert_eq!(float_to_fixed_point(-0.0), 0);
        // Half of the smallest positive normal f64 is subnormal.
        assert_eq!(float_to_fixed_point(f64::MIN_POSITIVE / 2.0), 0);
        // 5e-324 parses to the smallest positive subnormal f64.
        assert_eq!(float_to_fixed_point(5e-324), 0);
    }

    /// Large finite values must cancel exactly against their negations under wrapping
    /// addition.
    ///
    /// NOTE(review): these inputs are presumably beyond the range exercised by the
    /// in-range test above, i.e. their scaled value does not fit in an `i128` — confirm
    /// against `FLOAT_SCALE`.
    #[mz_ore::test]
    fn float_to_fixed_point_cancels_large_finite_values() {
        for n in [1.1e31_f64, 1e32, 5e33, 1e284] {
            assert_eq!(
                float_to_fixed_point(n).wrapping_add(float_to_fixed_point(-n)),
                0,
                "n = {n} did not cancel with -n"
            );
        }
    }

    /// Accumulating a large value and its negation with `SumFloat64` finalizes to
    /// exactly `0.0`.
    #[mz_ore::test]
    fn float_to_fixed_point_sum_via_accumulator() {
        let func = AggregateFunc::SumFloat64;
        let mut acc = accumulable_zero(&func);
        acc.plus_equals(&datum_to_accumulator(&func, Datum::from(1.1e31_f64)));
        acc.plus_equals(&datum_to_accumulator(&func, Datum::from(-1.1e31_f64)));
        // Two datums were accumulated.
        let datum = finalize_accum(&func, &acc, Diff::from(2_i64));
        assert_eq!(datum, Datum::from(0.0_f64));
    }
}