1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::IntoOwned;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
26use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
27use differential_dataflow::{Collection, Diff as _};
28use mz_compute_types::plan::reduce::{
29 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
30 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
31};
32use mz_expr::{
33 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
34};
35use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
36use mz_repr::fixed_length::ToDatumIter;
37use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
38use mz_storage_types::errors::DataflowError;
39use mz_timely_util::operator::CollectionExt;
40use serde::{Deserialize, Serialize};
41use timely::Container;
42use timely::container::{CapacityContainerBuilder, PushInto};
43use timely::dataflow::Scope;
44use timely::progress::Timestamp;
45use timely::progress::timestamp::Refines;
46use tracing::warn;
47
48use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
49use crate::extensions::reduce::{MzReduce, ReduceExt};
50use crate::render::context::{CollectionBundle, Context};
51use crate::render::errors::MaybeValidatingRow;
52use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
53use crate::render::{ArrangementFlavor, Pairer};
54use crate::row_spine::{
55 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
56};
57use crate::typedefs::{
58 ErrBatcher, ErrBuilder, KeyBatcher, RowErrBuilder, RowErrSpine, RowRowAgent, RowRowArrangement,
59 RowRowSpine, RowSpine, RowValSpine,
60};
61
62impl<G, T> Context<G, T>
63where
64 G: Scope,
65 G::Timestamp: Lattice + Refines<T> + Columnation,
66 T: Timestamp + Lattice + Columnation,
67{
68 pub fn render_reduce(
71 &self,
72 input: CollectionBundle<G, T>,
73 key_val_plan: KeyValPlan,
74 reduce_plan: ReducePlan,
75 input_key: Option<Vec<MirScalarExpr>>,
76 mfp_after: Option<MapFilterProject>,
77 ) -> CollectionBundle<G, T> {
78 let mfp_after = mfp_after.map(|m| {
80 m.into_plan()
81 .expect("MFP planning must succeed")
82 .into_nontemporal()
83 .expect("Fused Reduce MFPs do not have temporal predicates")
84 });
85
86 input.scope().region_named("Reduce", |inner| {
87 let KeyValPlan {
88 mut key_plan,
89 mut val_plan,
90 } = key_val_plan;
91 let key_arity = key_plan.projection.len();
92 let mut datums = DatumVec::new();
93
94 let mut demand = Vec::new();
96 demand.extend(key_plan.demand());
97 demand.extend(val_plan.demand());
98 demand.sort();
99 demand.dedup();
100
101 let mut demand_map = BTreeMap::new();
103 for column in demand.iter() {
104 demand_map.insert(*column, demand_map.len());
105 }
106 let demand_map_len = demand_map.len();
107 key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
108 val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
109 let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
110 let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
111
112 let (key_val_input, err_input) = input.enter_region(inner).flat_map(
113 input_key.map(|k| (k, None)),
114 max_demand,
115 move |row_datums, time, diff| {
116 let binding = SharedRow::get();
117 let mut row_builder = binding.borrow_mut();
118 let temp_storage = RowArena::new();
119
120 let mut row_iter = row_datums.drain(..);
121 let mut datums_local = datums.borrow();
122 for skip in skips.iter() {
124 datums_local.push(row_iter.nth(*skip).unwrap());
125 }
126
127 let key =
129 key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
130 let key = match key {
131 Err(e) => {
132 return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
133 }
134 Ok(Some(key)) => key.clone(),
135 Ok(None) => panic!("Row expected as no predicate was used"),
136 };
137
138 datums_local.truncate(skips.len());
141 let val =
142 val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
143 let val = match val {
144 Err(e) => {
145 return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
146 }
147 Ok(Some(val)) => val.clone(),
148 Ok(None) => panic!("Row expected as no predicate was used"),
149 };
150
151 Some((Ok((key, val)), time.clone(), diff.clone()))
152 },
153 );
154
155 type CB<T> = ConsolidatingContainerBuilder<T>;
157 let (ok, mut err) = key_val_input
158 .as_collection()
159 .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);
160
161 err = err.concat(&err_input);
162
163 self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
165 .leave_region()
166 })
167 }
168
169 fn render_reduce_plan<S>(
175 &self,
176 plan: ReducePlan,
177 collection: Collection<S, (Row, Row), Diff>,
178 err_input: Collection<S, DataflowError, Diff>,
179 key_arity: usize,
180 mfp_after: Option<SafeMfpPlan>,
181 ) -> CollectionBundle<S, T>
182 where
183 S: Scope<Timestamp = G::Timestamp>,
184 {
185 let mut errors = Default::default();
186 let arrangement =
187 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
188 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
189 CollectionBundle::from_columns(
190 0..key_arity,
191 ArrangementFlavor::Local(
192 arrangement,
193 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
194 ),
195 )
196 }
197
198 fn render_reduce_plan_inner<S>(
199 &self,
200 plan: ReducePlan,
201 collection: Collection<S, (Row, Row), Diff>,
202 errors: &mut Vec<Collection<S, DataflowError, Diff>>,
203 key_arity: usize,
204 mfp_after: Option<SafeMfpPlan>,
205 ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
206 where
207 S: Scope<Timestamp = G::Timestamp>,
208 {
209 let arrangement = match plan {
212 ReducePlan::Distinct => {
215 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
216 errors.push(errs);
217 arranged_output
218 }
219 ReducePlan::Accumulable(expr) => {
220 let (arranged_output, errs) =
221 self.build_accumulable(collection, expr, key_arity, mfp_after);
222 errors.push(errs);
223 arranged_output
224 }
225 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
226 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
227 errors.push(errs);
228 output
229 }
230 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
231 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
232 errors.push(errs);
233 output
234 }
235 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
236 index,
237 expr,
238 fused_unnest_list,
239 })) => {
240 let validating = !fused_unnest_list;
244 let (output, errs) = self.build_basic_aggregate(
245 collection,
246 index,
247 &expr,
248 validating,
249 key_arity,
250 mfp_after,
251 fused_unnest_list,
252 );
253 if validating {
254 errors.push(errs.expect("validation should have occurred as it was requested"));
255 }
256 output
257 }
258 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
259 let (output, errs) =
260 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
261 errors.push(errs);
262 output
263 }
264 ReducePlan::Collation(expr) => {
267 let mut to_collate = vec![];
269
270 for plan in [
271 expr.hierarchical.map(ReducePlan::Hierarchical),
272 expr.accumulable.map(ReducePlan::Accumulable),
273 expr.basic.map(ReducePlan::Basic),
274 ]
275 .into_iter()
276 .flat_map(std::convert::identity)
277 {
278 let r#type = ReductionType::try_from(&plan)
279 .expect("only representable reduction types were used above");
280
281 let arrangement = self.render_reduce_plan_inner(
282 plan,
283 collection.clone(),
284 errors,
285 key_arity,
286 None,
287 );
288 to_collate.push((r#type, arrangement));
289 }
290
291 let (oks, errs) = self.build_collation(
293 to_collate,
294 expr.aggregate_types,
295 &mut collection.scope(),
296 mfp_after,
297 );
298 errors.push(errs);
299 oks
300 }
301 };
302 arrangement
303 }
304
305 fn build_collation<S>(
313 &self,
314 arrangements: Vec<(ReductionType, RowRowArrangement<S>)>,
315 aggregate_types: Vec<ReductionType>,
316 scope: &mut S,
317 mfp_after: Option<SafeMfpPlan>,
318 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
319 where
320 S: Scope<Timestamp = G::Timestamp>,
321 {
322 let error_logger = self.error_logger();
323
324 if arrangements.len() <= 1 {
326 error_logger.soft_panic_or_log(
327 "Incorrect number of arrangements in reduce collation",
328 &format!("len={}", arrangements.len()),
329 );
330 }
331
332 let mut to_concat = vec![];
333
334 for (reduction_type, arrangement) in arrangements.into_iter() {
336 let collection = arrangement.as_collection(move |key, val| {
337 (key.into_owned(), (reduction_type, val.into_owned()))
338 });
339 to_concat.push(collection);
340 }
341
342 let mut distinct_aggregate_types = aggregate_types.clone();
346 distinct_aggregate_types.sort_unstable();
347 distinct_aggregate_types.dedup();
348 let n_distinct_aggregate_types = distinct_aggregate_types.len();
349
350 let mut datums1 = DatumVec::new();
352 let mut datums2 = DatumVec::new();
353 let mfp_after1 = mfp_after.clone();
354 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
355
356 let aggregate_types_err = aggregate_types.clone();
357 let (oks, errs) = differential_dataflow::collection::concatenate(scope, to_concat)
358 .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_,_,_>, RowValSpine<_, _, _>>("Arrange ReduceCollation")
359 .reduce_pair::<_, _, _, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
360 "ReduceCollation",
361 "ReduceCollation Errors",
362 {
363 move |key, input, output| {
364 let mut accumulable = DatumList::empty().iter();
373 let mut hierarchical = DatumList::empty().iter();
374 let mut basic = DatumList::empty().iter();
375
376 if input.len() != n_distinct_aggregate_types {
380 return;
381 }
382
383 for (item, _) in input.iter() {
384 let reduction_type = &item.0;
385 let row = &item.1;
386 match reduction_type {
387 ReductionType::Accumulable => accumulable = row.iter(),
388 ReductionType::Hierarchical => hierarchical = row.iter(),
389 ReductionType::Basic => basic = row.iter(),
390 }
391 }
392
393 let temp_storage = RowArena::new();
394 let datum_iter = key.to_datum_iter();
395 let mut datums_local = datums1.borrow();
396 datums_local.extend(datum_iter);
397 let key_len = datums_local.len();
398
399 for typ in aggregate_types.iter() {
401 let datum = match typ {
402 ReductionType::Accumulable => accumulable.next(),
403 ReductionType::Hierarchical => hierarchical.next(),
404 ReductionType::Basic => basic.next(),
405 };
406 let Some(datum) = datum else { return };
407 datums_local.push(datum);
408 }
409
410 if (accumulable.next(), hierarchical.next(), basic.next())
418 == (None, None, None)
419 {
420 if let Some(row) = evaluate_mfp_after(
421 &mfp_after1,
422 &mut datums_local,
423 &temp_storage,
424 key_len,
425 ) {
426 output.push((row, Diff::ONE));
427 }
428 }
429 }
430 },
431 move |key, input, output| {
432 if input.len() != n_distinct_aggregate_types {
433 let message = "Mismatched aggregates for key in ReduceCollation";
437 error_logger.log(
438 message,
439 &format!(
440 "key={key:?}, n_aggregates_requested={requested}, \
441 n_distinct_aggregate_types={n_distinct_aggregate_types}",
442 requested = input.len(),
443 ),
444 );
445 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
446 return;
447 }
448
449 let mut accumulable = DatumList::empty().iter();
450 let mut hierarchical = DatumList::empty().iter();
451 let mut basic = DatumList::empty().iter();
452 for (item, _) in input.iter() {
453 let reduction_type = &item.0;
454 let row = &item.1;
455 match reduction_type {
456 ReductionType::Accumulable => accumulable = row.iter(),
457 ReductionType::Hierarchical => hierarchical = row.iter(),
458 ReductionType::Basic => basic = row.iter(),
459 }
460 }
461
462 let temp_storage = RowArena::new();
463 let datum_iter = key.to_datum_iter();
464 let mut datums_local = datums2.borrow();
465 datums_local.extend(datum_iter);
466
467 for typ in aggregate_types_err.iter() {
468 let datum = match typ {
469 ReductionType::Accumulable => accumulable.next(),
470 ReductionType::Hierarchical => hierarchical.next(),
471 ReductionType::Basic => basic.next(),
472 };
473 if let Some(datum) = datum {
474 datums_local.push(datum);
475 } else {
476 let message = "Missing value for key in ReduceCollation";
479 error_logger.log(message, &format!("typ={typ:?}, key={key:?}"));
480 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
481 return;
482 }
483 }
484
485 if (accumulable.next(), hierarchical.next(), basic.next()) != (None, None, None)
488 {
489 let message = "Rows too large for key in ReduceCollation";
490 error_logger.log(message, &format!("key={key:?}"));
491 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
492 }
493
494 let Some(mfp) = &mfp_after2 else { return };
497 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
498 output.push((e.into(), Diff::ONE));
499 }
500 },
501 );
502 (oks, errs.as_collection(|_, v| v.clone()))
503 }
504
505 fn build_distinct<S>(
507 &self,
508 collection: Collection<S, (Row, Row), Diff>,
509 mfp_after: Option<SafeMfpPlan>,
510 ) -> (
511 Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
512 Collection<S, DataflowError, Diff>,
513 )
514 where
515 S: Scope<Timestamp = G::Timestamp>,
516 {
517 let error_logger = self.error_logger();
518
519 let mut datums1 = DatumVec::new();
521 let mut datums2 = DatumVec::new();
522 let mfp_after1 = mfp_after.clone();
523 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
524
525 let (output, errors) = collection
526 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _,>, RowRowSpine<_, _>>("Arranged DistinctBy")
527 .reduce_pair::<_, _, _, RowRowBuilder<_, _,>, RowRowSpine<_, _>, _, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
528 "DistinctBy",
529 "DistinctByErrorCheck",
530 move |key, _input, output| {
531 let temp_storage = RowArena::new();
532 let mut datums_local = datums1.borrow();
533 datums_local.extend(key.to_datum_iter());
534
535 if mfp_after1
539 .as_ref()
540 .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
541 .unwrap_or(Ok(true))
542 == Ok(true)
543 {
544 output.push((Row::default(), Diff::ONE));
548 }
549 },
550 move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
551 for (_, count) in input.iter() {
552 if count.is_positive() {
553 continue;
554 }
555 let message = "Non-positive multiplicity in DistinctBy";
556 error_logger.log(message, &format!("row={key:?}, count={count}"));
557 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
558 return;
559 }
560 let Some(mfp) = &mfp_after2 else { return };
562 let temp_storage = RowArena::new();
563 let datum_iter = key.to_datum_iter();
564 let mut datums_local = datums2.borrow();
565 datums_local.extend(datum_iter);
566
567 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
568 output.push((e.into(), Diff::ONE));
569 }
570 },
571 );
572 (output, errors.as_collection(|_k, v| v.clone()))
573 }
574
575 fn build_basic_aggregates<S>(
583 &self,
584 input: Collection<S, (Row, Row), Diff>,
585 aggrs: Vec<(usize, AggregateExpr)>,
586 key_arity: usize,
587 mfp_after: Option<SafeMfpPlan>,
588 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
589 where
590 S: Scope<Timestamp = G::Timestamp>,
591 {
592 if aggrs.len() <= 1 {
595 self.error_logger().soft_panic_or_log(
596 "Too few aggregations when building basic aggregates",
597 &format!("len={}", aggrs.len()),
598 )
599 }
600 let mut err_output = None;
601 let mut to_collect = Vec::new();
602 for (index, aggr) in aggrs {
603 let (result, errs) = self.build_basic_aggregate(
604 input.clone(),
605 index,
606 &aggr,
607 err_output.is_none(),
608 key_arity,
609 None,
610 false,
611 );
612 if errs.is_some() {
613 err_output = errs
614 }
615 to_collect.push(
616 result.as_collection(move |key, val| (key.into_owned(), (index, val.into_owned()))),
617 );
618 }
619
620 let mut datums1 = DatumVec::new();
622 let mut datums2 = DatumVec::new();
623 let mfp_after1 = mfp_after.clone();
624 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
625
626 let arranged =
627 differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
628 .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
629 "Arranged ReduceFuseBasic input",
630 );
631
632 let output = arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
633 "ReduceFuseBasic",
634 {
635 move |key, input, output| {
636 let temp_storage = RowArena::new();
637 let datum_iter = key.to_datum_iter();
638 let mut datums_local = datums1.borrow();
639 datums_local.extend(datum_iter);
640 let key_len = datums_local.len();
641
642 for ((_, row), _) in input.iter() {
643 datums_local.push(row.unpack_first());
644 }
645
646 if let Some(row) =
647 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
648 {
649 output.push((row, Diff::ONE));
650 }
651 }
652 },
653 );
654 let validation_errs = err_output.expect("expected to validate in at least one aggregate");
659 if let Some(mfp) = mfp_after2 {
660 let mfp_errs = arranged
661 .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
662 "ReduceFuseBasic Error Check",
663 move |key, input, output| {
664 let temp_storage = RowArena::new();
667 let datum_iter = key.to_datum_iter();
668 let mut datums_local = datums2.borrow();
669 datums_local.extend(datum_iter);
670
671 for ((_, row), _) in input.iter() {
672 datums_local.push(row.unpack_first());
673 }
674
675 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
676 output.push((e.into(), Diff::ONE));
677 }
678 },
679 )
680 .as_collection(|_, v| v.into_owned());
681 (output, validation_errs.concat(&mfp_errs))
682 } else {
683 (output, validation_errs)
684 }
685 }
686
687 fn build_basic_aggregate<S>(
691 &self,
692 input: Collection<S, (Row, Row), Diff>,
693 index: usize,
694 aggr: &AggregateExpr,
695 validating: bool,
696 key_arity: usize,
697 mfp_after: Option<SafeMfpPlan>,
698 fused_unnest_list: bool,
699 ) -> (
700 RowRowArrangement<S>,
701 Option<Collection<S, DataflowError, Diff>>,
702 )
703 where
704 S: Scope<Timestamp = G::Timestamp>,
705 {
706 let AggregateExpr {
707 func,
708 expr: _,
709 distinct,
710 } = aggr.clone();
711
712 let mut partial = input.map(move |(key, row)| {
714 let binding = SharedRow::get();
715 let mut row_builder = binding.borrow_mut();
716 let value = row.iter().nth(index).unwrap();
717 row_builder.packer().push(value);
718 (key, row_builder.clone())
719 });
720
721 let mut err_output = None;
722
723 if distinct {
725 let pairer = Pairer::new(key_arity);
727 let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
728 if validating {
729 let (oks, errs) = self
730 .build_reduce_inaccumulable_distinct::<_, _, RowValBuilder<Result<(), String>, _,_>, RowValSpine<Result<(), String>, _, _>>(keyed, None)
731 .as_collection(|k, v| (k.into_owned(), v.as_ref().map(|&()| ()).map_err(|m| m.as_str().into())))
732 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>("Demux Errors", move |(key_val, result)| match result {
733 Ok(()) => Ok(pairer.split(&key_val)),
734 Err(m) => Err(EvalError::Internal(m).into()),
735 });
736 err_output = Some(errs);
737 partial = oks;
738 } else {
739 partial = self
740 .build_reduce_inaccumulable_distinct::<_, _, RowBuilder<_, _>, RowSpine<_, _>>(
741 keyed,
742 Some(" [val: empty]"),
743 )
744 .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
745 }
746 }
747
748 let mut datums1 = DatumVec::new();
750 let mut datums2 = DatumVec::new();
751 let mut datums_key_1 = DatumVec::new();
752 let mut datums_key_2 = DatumVec::new();
753 let mfp_after1 = mfp_after.clone();
754 let func2 = func.clone();
755
756 let name = if !fused_unnest_list {
757 "ReduceInaccumulable"
758 } else {
759 "FusedReduceUnnestList"
760 };
761 let arranged = partial
762 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
763 "Arranged {name}"
764 ));
765 let oks = if !fused_unnest_list {
766 arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
767 move |key, source, target| {
768 let iter = source.iter().flat_map(|(v, w)| {
772 let count = usize::try_from(w.into_inner()).unwrap_or(0);
775 std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
776 });
777
778 let temp_storage = RowArena::new();
779 let datum_iter = key.to_datum_iter();
780 let mut datums_local = datums1.borrow();
781 datums_local.extend(datum_iter);
782 let key_len = datums_local.len();
783 datums_local.push(
784 func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
787 iter,
788 &temp_storage,
789 ),
790 );
791
792 if let Some(row) =
793 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
794 {
795 target.push((row, Diff::ONE));
796 }
797 }
798 })
799 } else {
800 arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
801 move |key, source, target| {
802 let iter = source.iter().flat_map(|(v, w)| {
804 let count = usize::try_from(w.into_inner()).unwrap_or(0);
805 std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
806 });
807
808 let temp_storage = RowArena::new();
810 let mut datums_local = datums_key_1.borrow();
811 datums_local.extend(key.to_datum_iter());
812 let key_len = datums_local.len();
813 for datum in func
814 .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
815 iter,
816 &temp_storage,
817 )
818 {
819 datums_local.truncate(key_len);
820 datums_local.push(datum);
821 if let Some(row) = evaluate_mfp_after(
822 &mfp_after1,
823 &mut datums_local,
824 &temp_storage,
825 key_len,
826 ) {
827 target.push((row, Diff::ONE));
828 }
829 }
830 }
831 })
832 };
833
834 let must_validate = validating && err_output.is_none();
838 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
839 if must_validate || mfp_after2.is_some() {
840 let error_logger = self.error_logger();
841
842 let errs = if !fused_unnest_list {
843 arranged
844 .mz_reduce_abelian::<_, _, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
845 &format!("{name} Error Check"),
846 move |key, source, target| {
847 if must_validate {
851 for (value, count) in source.iter() {
852 if count.is_positive() {
853 continue;
854 }
855 let value = value.into_owned();
856 let message = "Non-positive accumulation in ReduceInaccumulable";
857 error_logger
858 .log(message, &format!("value={value:?}, count={count}"));
859 target.push((EvalError::Internal(message.into()).into(), Diff::ONE));
860 return;
861 }
862 }
863
864 let Some(mfp) = &mfp_after2 else { return };
866 let iter = source.iter().flat_map(|&(mut v, ref w)| {
867 let count = usize::try_from(w.into_inner()).unwrap_or(0);
868 std::iter::repeat(v.next().unwrap()).take(count)
871 });
872
873 let temp_storage = RowArena::new();
874 let datum_iter = key.to_datum_iter();
875 let mut datums_local = datums2.borrow();
876 datums_local.extend(datum_iter);
877 datums_local.push(
878 func2.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
879 iter,
880 &temp_storage,
881 ),
882 );
883 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
884 {
885 target.push((e.into(), Diff::ONE));
886 }
887 },
888 )
889 .as_collection(|_, v| v.into_owned())
890 } else {
891 assert!(!must_validate);
893 let Some(mfp) = mfp_after2 else {
896 unreachable!()
897 };
898 arranged
899 .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
900 &format!("{name} Error Check"),
901 move |key, source, target| {
902 let iter = source.iter().flat_map(|&(mut v, ref w)| {
903 let count = usize::try_from(w.into_inner()).unwrap_or(0);
904 std::iter::repeat(v.next().unwrap()).take(count)
907 });
908
909 let temp_storage = RowArena::new();
910 let mut datums_local = datums_key_2.borrow();
911 datums_local.extend(key.to_datum_iter());
912 let key_len = datums_local.len();
913 for datum in func2
914 .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
915 iter,
916 &temp_storage,
917 )
918 {
919 datums_local.truncate(key_len);
920 datums_local.push(datum);
921 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
924 {
925 target.push((e.into(), Diff::ONE));
926 }
927 }
928 },
929 )
930 .as_collection(|_, v| v.into_owned())
931 };
932
933 if let Some(e) = err_output {
934 err_output = Some(e.concat(&errs));
935 } else {
936 err_output = Some(errs);
937 }
938 }
939 (oks, err_output)
940 }
941
942 fn build_reduce_inaccumulable_distinct<S, V, Bu, Tr>(
943 &self,
944 input: Collection<S, Row, Diff>,
945 name_tag: Option<&str>,
946 ) -> Arranged<S, TraceAgent<Tr>>
947 where
948 S: Scope<Timestamp = G::Timestamp>,
949 V: MaybeValidatingRow<(), String>,
950 Tr: Trace
951 + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff>
952 + 'static,
953 Tr::Batch: Batch,
954 Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
955 Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
956 for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = Row>,
957 for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
958 Arranged<S, TraceAgent<Tr>>: ArrangementSize,
959 {
960 let error_logger = self.error_logger();
961
962 let output_name = format!(
963 "ReduceInaccumulable Distinct{}",
964 name_tag.unwrap_or_default()
965 );
966
967 let input: KeyCollection<_, _, _> = input.into();
968 input
969 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
970 "Arranged ReduceInaccumulable Distinct [val: empty]",
971 )
972 .mz_reduce_abelian::<_, _, _, Bu, Tr>(&output_name, move |_, source, t| {
973 if let Some(err) = V::into_error() {
974 for (value, count) in source.iter() {
975 if count.is_positive() {
976 continue;
977 }
978
979 let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
980 error_logger.log(message, &format!("value={value:?}, count={count}"));
981 t.push((err(message.to_string()), Diff::ONE));
982 return;
983 }
984 }
985 t.push((V::ok(()), Diff::ONE))
986 })
987 }
988
989 fn build_bucketed<S>(
1007 &self,
1008 input: Collection<S, (Row, Row), Diff>,
1009 BucketedPlan {
1010 aggr_funcs,
1011 skips,
1012 buckets,
1013 }: BucketedPlan,
1014 key_arity: usize,
1015 mfp_after: Option<SafeMfpPlan>,
1016 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1017 where
1018 S: Scope<Timestamp = G::Timestamp>,
1019 {
1020 let mut err_output: Option<Collection<S, _, _>> = None;
1021 let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
1022 let input = input.enter(inner);
1023
1024 let first_mod = buckets.get(0).copied().unwrap_or(1);
1026
1027 let mut stage = input.map(move |(key, row)| {
1029 let binding = SharedRow::get();
1030 let mut row_builder = binding.borrow_mut();
1031 let mut row_packer = row_builder.packer();
1032 let mut row_iter = row.iter();
1033 for skip in skips.iter() {
1034 row_packer.push(row_iter.nth(*skip).unwrap());
1035 }
1036 let values = row_builder.clone();
1037
1038 let hash = values.hashed() % first_mod;
1040 let hash_key =
1041 row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
1042 (hash_key, values)
1043 });
1044
1045 for (index, b) in buckets.into_iter().enumerate() {
1047 let input = if index == 0 {
1049 stage
1050 } else {
1051 stage.map(move |(hash_key, values)| {
1052 let mut hash_key_iter = hash_key.iter();
1053 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
1054 let hash_key = SharedRow::pack(
1056 std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
1057 );
1058 (hash_key, values)
1059 })
1060 };
1061
1062 let validating = err_output.is_none();
1066
1067 let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
1068 if let Some(errs) = errs {
1069 err_output = Some(errs.leave_region());
1070 }
1071 stage = oks
1072 }
1073
1074 let partial = stage.map(move |(hash_key, values)| {
1076 let mut hash_key_iter = hash_key.iter();
1077 let _hash = hash_key_iter.next();
1078 (SharedRow::pack(hash_key_iter.take(key_arity)), values)
1079 });
1080
1081 let mut datums1 = DatumVec::new();
1083 let mut datums2 = DatumVec::new();
1084 let mfp_after1 = mfp_after.clone();
1085 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1086 let aggr_funcs2 = aggr_funcs.clone();
1087
1088 let error_logger = self.error_logger();
1091 let arranged = partial
1094 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1095 "Arrange ReduceMinsMaxes",
1096 );
1097 let must_validate = err_output.is_none();
1101 if must_validate || mfp_after2.is_some() {
1102 let errs = arranged
1103 .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1104 "ReduceMinsMaxes Error Check",
1105 move |key, source, target| {
1106 if must_validate {
1110 for (val, count) in source.iter() {
1111 if count.is_positive() {
1112 continue;
1113 }
1114 let val = val.into_owned();
1115 let message = "Non-positive accumulation in ReduceMinsMaxes";
1116 error_logger
1117 .log(message, &format!("val={val:?}, count={count}"));
1118 target.push((
1119 EvalError::Internal(message.into()).into(),
1120 Diff::ONE,
1121 ));
1122 return;
1123 }
1124 }
1125
1126 let Some(mfp) = &mfp_after2 else { return };
1128 let temp_storage = RowArena::new();
1129 let datum_iter = key.to_datum_iter();
1130 let mut datums_local = datums2.borrow();
1131 datums_local.extend(datum_iter);
1132
1133 let mut source_iters = source
1134 .iter()
1135 .map(|(values, _cnt)| *values)
1136 .collect::<Vec<_>>();
1137 for func in aggr_funcs2.iter() {
1138 let column_iter = (0..source_iters.len())
1139 .map(|i| source_iters[i].next().unwrap());
1140 datums_local.push(func.eval(column_iter, &temp_storage));
1141 }
1142 if let Result::Err(e) =
1143 mfp.evaluate_inner(&mut datums_local, &temp_storage)
1144 {
1145 target.push((e.into(), Diff::ONE));
1146 }
1147 },
1148 )
1149 .as_collection(|_, v| v.into_owned())
1150 .leave_region();
1151 if let Some(e) = &err_output {
1152 err_output = Some(e.concat(&errs));
1153 } else {
1154 err_output = Some(errs);
1155 }
1156 }
1157 arranged
1158 .mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1159 "ReduceMinsMaxes",
1160 move |key, source, target| {
1161 let temp_storage = RowArena::new();
1162 let datum_iter = key.to_datum_iter();
1163 let mut datums_local = datums1.borrow();
1164 datums_local.extend(datum_iter);
1165 let key_len = datums_local.len();
1166
1167 let mut source_iters = source
1168 .iter()
1169 .map(|(values, _cnt)| *values)
1170 .collect::<Vec<_>>();
1171 for func in aggr_funcs.iter() {
1172 let column_iter =
1173 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1174 datums_local.push(func.eval(column_iter, &temp_storage));
1175 }
1176
1177 if let Some(row) = evaluate_mfp_after(
1178 &mfp_after1,
1179 &mut datums_local,
1180 &temp_storage,
1181 key_len,
1182 ) {
1183 target.push((row, Diff::ONE));
1184 }
1185 },
1186 )
1187 .leave_region()
1188 });
1189 (
1190 arranged_output,
1191 err_output.expect("expected to validate in one level of the hierarchy"),
1192 )
1193 }
1194
    /// Renders a single stage of a bucketed (hierarchical) min/max reduction over `input`.
    ///
    /// The stage arranges and reduces `input` with `aggr_funcs` via
    /// `build_bucketed_negated_output`. It then returns that reduction's output concatenated
    /// with the arranged input. The reduction output is expected to cancel the input values and
    /// contribute the per-key aggregate row, so that the concatenation leaves the aggregate row.
    ///
    /// When `validating` is true, the reduction produces `Result<Row, Row>` values. `Err`
    /// values (keys with a non-positive input accumulation) are converted into
    /// `EvalError::Internal` errors and returned as `Some(errs)`. When `validating` is false,
    /// the second element of the result is `None`.
    fn build_bucketed_stage<S>(
        &self,
        aggr_funcs: &Vec<AggregateFunc>,
        input: &Collection<S, (Row, Row), Diff>,
        validating: bool,
    ) -> (
        Collection<S, (Row, Row), Diff>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let (input, negated_output, errs) = if validating {
            // Validating variant: values are `Result<Row, Row>` so that bad keys can be reported.
            let (input, reduced) = self
                .build_bucketed_negated_output::<_, _, RowValBuilder<_,_,_>, RowValSpine<Result<Row, Row>, _, _>>(
                    input,
                    aggr_funcs.clone(),
                );
            let (oks, errs) = reduced
                .as_collection(|k, v| (k.into_owned(), v.clone()))
                .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                    "Checked Invalid Accumulations",
                    |(hash_key, result)| match result {
                        Err(hash_key) => {
                            // The leading datum (`_hash`, presumably the bucketing hash) is
                            // skipped so the error reports only the remaining key datums.
                            let mut hash_key_iter = hash_key.iter();
                            let _hash = hash_key_iter.next();
                            let key = SharedRow::pack(hash_key_iter);
                            let message = format!(
                                "Invalid data in source, saw non-positive accumulation \
                                 for key {key:?} in hierarchical mins-maxes aggregate"
                            );
                            Err(EvalError::Internal(message.into()).into())
                        }
                        Ok(values) => Ok((hash_key, values)),
                    },
                );
            (input, oks, Some(errs))
        } else {
            // Non-validating variant: plain `Row` values, no error collection.
            let (input, reduced) = self
                .build_bucketed_negated_output::<_, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    input,
                    aggr_funcs.clone(),
                );
            let oks = reduced.as_collection(|k, v| (k.into_owned(), v.into_owned()));
            (input, oks, None)
        };

        // Adding the input back to the (already negated) reduction output yields the stage result.
        let input = input.as_collection(|k, v| (k.into_owned(), v.into_owned()));
        let oks = negated_output.concat(&input);
        (oks, errs)
    }
1254
1255 fn build_bucketed_negated_output<S, V, Bu, Tr>(
1259 &self,
1260 input: &Collection<S, (Row, Row), Diff>,
1261 aggrs: Vec<AggregateFunc>,
1262 ) -> (
1263 Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1264 Arranged<S, TraceAgent<Tr>>,
1265 )
1266 where
1267 S: Scope<Timestamp = G::Timestamp>,
1268 V: MaybeValidatingRow<Row, Row>,
1269 Tr: Trace
1270 + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff>
1271 + 'static,
1272 Tr::Batch: Batch,
1273 Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
1274 Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
1275 for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
1276 Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1277 {
1278 let error_logger = self.error_logger();
1279 let arranged_input = input
1282 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1283 "Arranged MinsMaxesHierarchical input",
1284 );
1285
1286 let reduced = arranged_input.mz_reduce_abelian::<_, _, _, Bu, Tr>(
1287 "Reduced Fallibly MinsMaxesHierarchical",
1288 move |key, source, target| {
1289 if let Some(err) = V::into_error() {
1290 for (value, count) in source.iter() {
1292 if count.is_positive() {
1293 continue;
1294 }
1295 error_logger.log(
1296 "Non-positive accumulation in MinsMaxesHierarchical",
1297 &format!("key={key:?}, value={value:?}, count={count}"),
1298 );
1299 target.push((err(key.into_owned()), Diff::MINUS_ONE));
1302 return;
1303 }
1304 }
1305
1306 let binding = SharedRow::get();
1307 let mut row_builder = binding.borrow_mut();
1308 let mut row_packer = row_builder.packer();
1309
1310 let mut source_iters = source
1311 .iter()
1312 .map(|(values, _cnt)| *values)
1313 .collect::<Vec<_>>();
1314 for func in aggrs.iter() {
1315 let column_iter =
1316 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1317 row_packer.push(func.eval(column_iter, &RowArena::new()));
1318 }
1319 target.reserve(source.len().saturating_add(1));
1325 target.push((V::ok(row_builder.clone()), Diff::MINUS_ONE));
1326 target.extend(source.iter().map(|(values, cnt)| {
1327 let mut cnt = *cnt;
1328 cnt.negate();
1329 (V::ok((*values).into_owned()), cnt)
1330 }));
1331 },
1332 );
1333 (arranged_input, reduced)
1334 }
1335
    /// Renders a min/max reduction over input that is expected to be monotonic (insert-only).
    ///
    /// For each input row, the datums selected by `skips` are packed into single-datum rows.
    /// Each `skips` entry is the number of datums to skip before taking the next one, because it
    /// is fed to `Iterator::nth`. Each packed row is then wrapped in a [`ReductionMonoid`] for
    /// its aggregate. The monoids become the difference of an arrangement keyed by the group
    /// key, so consolidation maintains the running min/max. A reduction then reads the
    /// accumulated monoids and emits one aggregate row per key, passed through `mfp_after` when
    /// present.
    ///
    /// Returns the output arrangement and an error collection. The error collection holds:
    /// * records rejected by `ensure_monotonic`, and
    /// * when `mfp_after` can error, the errors it produces.
    fn build_monotonic<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            skips,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Extract one single-datum row per aggregate input column.
        let collection = collection
            .map(move |(key, row)| {
                let binding = SharedRow::get();
                let mut row_builder = binding.borrow_mut();
                let mut values = Vec::with_capacity(skips.len());
                let mut row_iter = row.iter();
                for skip in skips.iter() {
                    values.push(
                        row_builder.pack_using(std::iter::once(row_iter.nth(*skip).unwrap())),
                    );
                }

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        let error_logger = self.error_logger();
        // Split off records that violate monotonicity into an error stream (and log them).
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the per-aggregate values into the difference as min/max monoids.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only build the error-checking reduction when the MFP can actually error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged.mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceMonotonic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // The value is empty, so the single entry's difference holds the
                    // accumulated monoids, one per aggregate.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );

        if let Some(mfp) = mfp_after2 {
            // Re-evaluate the MFP over the same datums, this time keeping only its errors.
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, _, _, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1457
    /// Renders a reduction whose aggregates can all be computed by accumulation (sums, counts,
    /// `any`/`all`).
    ///
    /// Each input row is exploded so that its contribution is carried in the difference as a
    /// `(Vec<Accum>, Diff)` pair. The vector has one accumulator per entry of `full_aggrs`, and
    /// the `Diff` counts the contributing rows.
    /// * Non-distinct aggregates (`simple_aggrs`) are handled in one pass over the input.
    /// * Each distinct aggregate (`distinct_aggrs`) first deduplicates `(key, value)` pairs with
    ///   its own arrangement and reduction.
    ///
    /// All contributions are then arranged by key and finalized with `finalize_accum`, passing
    /// through `mfp_after` when present.
    ///
    /// The returned error collection reports:
    /// * keys whose records net to zero while an accumulator is non-zero,
    /// * negative sums of unsigned types, and
    /// * `mfp_after` evaluation errors, when the MFP can error.
    fn build_accumulable<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Sanity check: there must be something to reduce, and every aggregate must be either
        // simple or distinct.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // Identity contribution: one zero accumulator per aggregate, zero rows.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    // Rows are not randomly accessible, so walk forward to each needed datum.
                    // This relies on `simple_aggrs` being ordered by increasing `datum_index`.
                    let mut row_iter = row.iter().enumerate();
                    for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*accumulable_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // Each distinct aggregate deduplicates its (key, value) pairs before accumulating.
        for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                // Emit each present (key, value) pair exactly once.
                .mz_reduce_abelian::<_, _, _, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[accumulable_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        // Avoid a concatenation operator when there is only one input.
        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
        };

        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only evaluate the MFP for errors when it can actually error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        let (arranged_output, arranged_errs) = collection
            .mz_arrange::<RowBatcher<_,_>, RowBuilder<_,_>, RowSpine<_, (Vec<Accum>, Diff)>>("ArrangeAccumulable [val: empty]")
            .reduce_pair::<_, _, _, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
                "ReduceAccumulable",
                "AccumulableErrorCheck",
                {
                    // Output side: finalize each accumulator and pack the aggregate row.
                    move |key, input, output| {
                        // The value is empty, so the single entry's difference holds the
                        // accumulators and the row count.
                        let (ref accums, total) = input[0].1;

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        for (aggr, accum) in full_aggrs.iter().zip(accums) {
                            datums_local.push(finalize_accum(&aggr.func, accum, total));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            output.push((row, Diff::ONE));
                        }
                    }
                },
                // Error side: detect invalid accumulations and MFP errors.
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip(accums) {
                        // Records that net to zero cannot leave a non-zero accumulation behind.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.into_owned();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        match (&aggr.func, &accum) {
                            // Sums of unsigned values can only go negative on invalid input.
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
                                    &format!("aggr={aggr:?}, accum={accum:?}"),
                                );
                                    let key = key.into_owned();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                                }
                            }
                            // No other accumulation-level errors to check for.
                            _ => (),
                        }
                    }

                    // If `mfp_after` can error, evaluate it here to surface its errors.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.into_owned()),
        )
    }
1669}
1670
1671fn evaluate_mfp_after<'a, 'b>(
1675 mfp_after: &'a Option<SafeMfpPlan>,
1676 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1677 temp_storage: &'a RowArena,
1678 key_len: usize,
1679) -> Option<Row> {
1680 let binding = SharedRow::get();
1681 let mut row_builder = binding.borrow_mut();
1682 if let Some(mfp) = mfp_after {
1685 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1688 Some(row_builder.pack_using(iter.skip(key_len)))
1691 } else {
1692 None
1693 }
1694 } else {
1695 Some(row_builder.pack_using(&datums_local[key_len..]))
1696 }
1697}
1698
1699fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1700 match aggr_func {
1701 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1702 trues: Diff::ZERO,
1703 falses: Diff::ZERO,
1704 },
1705 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1706 accum: AccumCount::ZERO,
1707 pos_infs: Diff::ZERO,
1708 neg_infs: Diff::ZERO,
1709 nans: Diff::ZERO,
1710 non_nulls: Diff::ZERO,
1711 },
1712 AggregateFunc::SumNumeric => Accum::Numeric {
1713 accum: OrderedDecimal(NumericAgg::zero()),
1714 pos_infs: Diff::ZERO,
1715 neg_infs: Diff::ZERO,
1716 nans: Diff::ZERO,
1717 non_nulls: Diff::ZERO,
1718 },
1719 _ => Accum::SimpleNumber {
1720 accum: AccumCount::ZERO,
1721 non_nulls: Diff::ZERO,
1722 },
1723 }
1724}
1725
/// Fixed-point scale (2^24) used to map floating-point summands onto integers for accumulation.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| {
    const SHIFT: u32 = 24;
    f64::from(1_u32 << SHIFT)
});
1727
1728fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1729 match aggregate_func {
1730 AggregateFunc::Count => Accum::SimpleNumber {
1731 accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
1733 Diff::ZERO
1734 } else {
1735 Diff::ONE
1736 },
1737 },
1738 AggregateFunc::Any | AggregateFunc::All => match datum {
1739 Datum::True => Accum::Bool {
1740 trues: Diff::ONE,
1741 falses: Diff::ZERO,
1742 },
1743 Datum::Null => Accum::Bool {
1744 trues: Diff::ZERO,
1745 falses: Diff::ZERO,
1746 },
1747 Datum::False => Accum::Bool {
1748 trues: Diff::ZERO,
1749 falses: Diff::ONE,
1750 },
1751 x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1752 },
1753 AggregateFunc::Dummy => match datum {
1754 Datum::Dummy => Accum::SimpleNumber {
1755 accum: AccumCount::ZERO,
1756 non_nulls: Diff::ZERO,
1757 },
1758 x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1759 },
1760 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1761 let n = match datum {
1762 Datum::Float32(n) => f64::from(*n),
1763 Datum::Float64(n) => *n,
1764 Datum::Null => 0f64,
1765 x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1766 };
1767
1768 let nans = Diff::from(n.is_nan());
1769 let pos_infs = Diff::from(n == f64::INFINITY);
1770 let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1771 let non_nulls = Diff::from(datum != Datum::Null);
1772
1773 let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1776 AccumCount::ZERO
1777 } else {
1778 #[allow(clippy::as_conversions)]
1781 { (n * *FLOAT_SCALE) as i128 }.into()
1782 };
1783
1784 Accum::Float {
1785 accum,
1786 pos_infs,
1787 neg_infs,
1788 nans,
1789 non_nulls,
1790 }
1791 }
1792 AggregateFunc::SumNumeric => match datum {
1793 Datum::Numeric(n) => {
1794 let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1795 if n.0.is_negative() {
1796 (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1797 } else {
1798 (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1799 }
1800 } else if n.0.is_nan() {
1801 (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1802 } else {
1803 let mut cx_agg = numeric::cx_agg();
1806 (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1807 };
1808
1809 Accum::Numeric {
1810 accum: OrderedDecimal(accum),
1811 pos_infs,
1812 neg_infs,
1813 nans,
1814 non_nulls: Diff::ONE,
1815 }
1816 }
1817 Datum::Null => Accum::Numeric {
1818 accum: OrderedDecimal(NumericAgg::zero()),
1819 pos_infs: Diff::ZERO,
1820 neg_infs: Diff::ZERO,
1821 nans: Diff::ZERO,
1822 non_nulls: Diff::ZERO,
1823 },
1824 x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1825 },
1826 _ => {
1827 match datum {
1831 Datum::Int16(i) => Accum::SimpleNumber {
1832 accum: i.into(),
1833 non_nulls: Diff::ONE,
1834 },
1835 Datum::Int32(i) => Accum::SimpleNumber {
1836 accum: i.into(),
1837 non_nulls: Diff::ONE,
1838 },
1839 Datum::Int64(i) => Accum::SimpleNumber {
1840 accum: i.into(),
1841 non_nulls: Diff::ONE,
1842 },
1843 Datum::UInt16(u) => Accum::SimpleNumber {
1844 accum: u.into(),
1845 non_nulls: Diff::ONE,
1846 },
1847 Datum::UInt32(u) => Accum::SimpleNumber {
1848 accum: u.into(),
1849 non_nulls: Diff::ONE,
1850 },
1851 Datum::UInt64(u) => Accum::SimpleNumber {
1852 accum: u.into(),
1853 non_nulls: Diff::ONE,
1854 },
1855 Datum::MzTimestamp(t) => Accum::SimpleNumber {
1856 accum: u64::from(t).into(),
1857 non_nulls: Diff::ONE,
1858 },
1859 Datum::Null => Accum::SimpleNumber {
1860 accum: AccumCount::ZERO,
1861 non_nulls: Diff::ZERO,
1862 },
1863 x => panic!("Accumulating non-integer data: {x:?}"),
1864 }
1865 }
1866 }
1867}
1868
/// Turns a fully accumulated `accum` for `aggr_func` into the aggregate's output datum.
/// `total` is the number of rows that contributed to the group.
///
/// If there is at least one row but the accumulator is entirely zero (no non-null inputs
/// contributed), the result is `NULL` for every function except `Count`.
///
/// Otherwise:
/// * `All`: `false` if any input was false, `true` if all rows were true, else `NULL`.
/// * `Any`: `true` if any input was true, `false` if all rows were false, else `NULL`.
/// * Float and numeric sums return NaN when a NaN was seen or both infinities were seen, and
///   the corresponding infinity when only one kind was seen.
/// * Negative unsigned sums produce `NULL` here; the reduction's error side reports them as
///   errors.
///
/// # Panics
///
/// Panics on an `aggr_func`/`accum` combination that is not handled.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // Any false, else all true, else no false but some nulls.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // Any true, else all false, else no true but some nulls.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // Narrowing `as` cast: values outside the i64 range wrap.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // Narrowing `as` cast: values outside the u64 range wrap.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // Invalid input; the error side of the reduction reports this case, so this
                    // value is not expected to be exposed.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // See the `SumUInt16`/`SumUInt32` arm above.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaN inputs, or a mixture of positive and negative infinities.
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied in `datum_to_accumulator`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    // NaN inputs, or a mixture of positive and negative infinities.
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied in `datum_to_accumulator`.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow the aggregate back to datum width. The narrowed value may itself be
                // infinite, which counts like an infinite input.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    // NaN inputs, or a mixture of positive and negative infinities.
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
2034
/// Integer accumulator used for sums: a 128-bit integer wrapped in `mz_ore::Overflowing`
/// (see that type for its overflow behavior).
type AccumCount = mz_ore::Overflowing<i128>;
2037
/// Accumulator state for accumulable aggregates (sums, counts, `any`/`all`).
///
/// Values of this type are carried in the difference of a collection, so they are combined with
/// `Semigroup::plus_equals` and scaled with `Multiply::multiply`.
///
/// The derived `Ord` depends on variant and field order, so keep both stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Accum {
    /// Accumulates `Any` and `All`.
    Bool {
        /// Number of `true` inputs.
        trues: Diff,
        /// Number of `false` inputs.
        falses: Diff,
    },
    /// Accumulates integer sums, `Count`, and `Dummy`.
    SimpleNumber {
        /// Sum of the inputs (always zero for `Count`).
        accum: AccumCount,
        /// Number of non-null inputs.
        non_nulls: Diff,
    },
    /// Accumulates `SumFloat32` and `SumFloat64`.
    Float {
        /// Sum of the finite inputs, each scaled by `FLOAT_SCALE` and cast to an integer.
        accum: AccumCount,
        /// Number of positive-infinity inputs.
        pos_infs: Diff,
        /// Number of negative-infinity inputs.
        neg_infs: Diff,
        /// Number of NaN inputs.
        nans: Diff,
        /// Number of non-null inputs.
        non_nulls: Diff,
    },
    /// Accumulates `SumNumeric`.
    Numeric {
        /// Sum of the finite inputs, in the aggregation context's width.
        accum: OrderedDecimal<NumericAgg>,
        /// Number of positive-infinity inputs.
        pos_infs: Diff,
        /// Number of negative-infinity inputs.
        neg_infs: Diff,
        /// Number of NaN inputs.
        nans: Diff,
        /// Number of non-null inputs.
        non_nulls: Diff,
    },
}
2092
2093impl IsZero for Accum {
2094 fn is_zero(&self) -> bool {
2095 match self {
2096 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2097 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2098 Accum::Float {
2099 accum,
2100 pos_infs,
2101 neg_infs,
2102 nans,
2103 non_nulls,
2104 } => {
2105 accum.is_zero()
2106 && pos_infs.is_zero()
2107 && neg_infs.is_zero()
2108 && nans.is_zero()
2109 && non_nulls.is_zero()
2110 }
2111 Accum::Numeric {
2112 accum,
2113 pos_infs,
2114 neg_infs,
2115 nans,
2116 non_nulls,
2117 } => {
2118 accum.0.is_zero()
2119 && pos_infs.is_zero()
2120 && neg_infs.is_zero()
2121 && nans.is_zero()
2122 && non_nulls.is_zero()
2123 }
2124 }
2125 }
2126}
2127
/// Adds two accumulators of the same variant, field by field.
///
/// # Panics
///
/// * If the two accumulators are different variants (`unreachable!`).
/// * If adding two `Numeric` accumulators rounds, i.e. the sum exceeds the aggregation
///   context's precision.
///
/// `Float` overflow does not panic: it logs a warning and wraps.
impl Semigroup for Accum {
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // On overflow, warn and fall back to wrapping addition.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // `rounded` means the sum exceeded the aggregation context's precision. The
                // result would no longer be exact (and addition no longer associative), so panic.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Normalize the representation; presumably this reclaims precision consumed by
                // trailing zeros so that later retractions stay exact.
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2230
/// Scales every counter of an accumulator by `factor`.
///
/// `Float` overflow logs a warning and wraps. `Numeric` panics if the product rounds.
impl Multiply<Diff> for Accum {
    type Output = Accum;

    fn multiply(self, factor: &Diff) -> Accum {
        let factor = *factor;
        match self {
            Accum::Bool { trues, falses } => Accum::Bool {
                trues: trues * factor,
                falses: falses * factor,
            },
            Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
                accum: accum * AccumCount::from(factor),
                non_nulls: non_nulls * factor,
            },
            Accum::Float {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => Accum::Float {
                // On overflow, warn and fall back to wrapping multiplication.
                accum: accum
                    .checked_mul(AccumCount::from(factor))
                    .unwrap_or_else(|| {
                        warn!("Float accumulator overflow. Incorrect results possible");
                        accum.wrapping_mul(AccumCount::from(factor))
                    }),
                pos_infs: pos_infs * factor,
                neg_infs: neg_infs * factor,
                nans: nans * factor,
                non_nulls: non_nulls * factor,
            },
            Accum::Numeric {
                accum,
                pos_infs,
                neg_infs,
                nans,
                non_nulls,
            } => {
                // Compute `factor * accum` in the aggregation context.
                let mut cx = numeric::cx_agg();
                let mut f = NumericAgg::from(factor.into_inner());
                cx.mul(&mut f, &accum.0);
                // `rounded` means the product exceeded the context's precision; results would
                // no longer be exact, so panic.
                assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
                Accum::Numeric {
                    accum: OrderedDecimal(f),
                    pos_infs: pos_infs * factor,
                    neg_infs: neg_infs * factor,
                    nans: nans * factor,
                    non_nulls: non_nulls * factor,
                }
            }
        }
    }
}
2293
2294impl Columnation for Accum {
2295 type InnerRegion = CopyRegion<Self>;
2296}
2297
/// Min/max monoids used by monotonic reductions.
///
/// A `ReductionMonoid` wraps a `Row` holding the current min or max value (a single datum, as
/// built by the monotonic reduction). It is used as the difference type of a collection, so
/// differential's consolidation maintains the running min or max.
mod monoids {

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// The running minimum or maximum of a group, stored as a row.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        Min(Row),
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current min/max value.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// Min/max is idempotent, so scaling by a positive count leaves the value unchanged.
        /// There is no zero element to return for non-positive factors, so those panic. Callers
        /// must ensure monotonic input beforehand.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        /// Keeps the smaller (`Min`) or larger (`Max`) of the two first datums.
        ///
        /// `Datum::Null` never replaces a value, and a null left-hand side is replaced by the
        /// right-hand side. Mismatched variants are reported via `soft_panic_or_log!` and leave
        /// `self` unchanged.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Null only wins when the other side is Null as well.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        // Null only wins when the other side is Null as well.
                        match (lhs_val, rhs_val) {
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        /// A min/max has no identity element, so it is never zero.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnar region for `ReductionMonoid`, storing the wrapped rows in a `Row` region.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            // SAFETY: delegates to the inner `Row` region. The caller's obligations for
            // `Region::copy` are forwarded to it unchanged.
            match item {
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Only the wrapped rows occupy region storage.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the monoid for `func`.
    ///
    /// Returns `Min`/`Max` for min/max aggregates, and `None` for every aggregate that has no
    /// monoid implementation.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2523
2524mod window_agg_helpers {
2525 use crate::render::reduce::*;
2526
2527 pub enum OneByOneAggrImpls {
2532 Accumulable(AccumulableOneByOneAggr),
2533 Hierarchical(HierarchicalOneByOneAggr),
2534 Basic(mz_expr::NaiveOneByOneAggr),
2535 }
2536
2537 impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2538 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2539 match reduction_type(agg) {
2540 ReductionType::Basic => {
2541 OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2542 }
2543 ReductionType::Accumulable => {
2544 OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2545 }
2546 ReductionType::Hierarchical => {
2547 OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2548 }
2549 }
2550 }
2551
2552 fn give(&mut self, d: &Datum) {
2553 match self {
2554 OneByOneAggrImpls::Basic(i) => i.give(d),
2555 OneByOneAggrImpls::Accumulable(i) => i.give(d),
2556 OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2557 }
2558 }
2559
2560 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2561 match self {
2563 OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2564 OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2565 OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2566 }
2567 }
2568 }
2569
2570 pub struct AccumulableOneByOneAggr {
2571 aggr_func: AggregateFunc,
2572 accum: Accum,
2573 total: Diff,
2574 }
2575
2576 impl AccumulableOneByOneAggr {
2577 fn new(aggr_func: &AggregateFunc) -> Self {
2578 AccumulableOneByOneAggr {
2579 aggr_func: aggr_func.clone(),
2580 accum: accumulable_zero(aggr_func),
2581 total: Diff::ZERO,
2582 }
2583 }
2584
2585 fn give(&mut self, d: &Datum) {
2586 self.accum
2587 .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2588 self.total += Diff::ONE;
2589 }
2590
2591 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2592 temp_storage.make_datum(|packer| {
2593 packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2594 })
2595 }
2596 }
2597
2598 pub struct HierarchicalOneByOneAggr {
2599 aggr_func: AggregateFunc,
2600 monoid: ReductionMonoid,
2603 }
2604
2605 impl HierarchicalOneByOneAggr {
2606 fn new(aggr_func: &AggregateFunc) -> Self {
2607 let mut row_buf = Row::default();
2608 row_buf.packer().push(Datum::Null);
2609 HierarchicalOneByOneAggr {
2610 aggr_func: aggr_func.clone(),
2611 monoid: get_monoid(row_buf, aggr_func)
2612 .expect("aggr_func should be a hierarchical aggregation function"),
2613 }
2614 }
2615
2616 fn give(&mut self, d: &Datum) {
2617 let mut row_buf = Row::default();
2618 row_buf.packer().push(d);
2619 let m = get_monoid(row_buf, &self.aggr_func)
2620 .expect("aggr_func should be a hierarchical aggregation function");
2621 self.monoid.plus_equals(&m);
2622 }
2623
2624 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2625 temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2626 }
2627 }
2628}