1use std::collections::BTreeMap;
15use std::sync::LazyLock;
16
17use dec::OrderedDecimal;
18use differential_dataflow::Data;
19use differential_dataflow::collection::AsCollection;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::containers::{Columnation, CopyRegion};
22use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
23use differential_dataflow::hashable::Hashable;
24use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Collection, Diff as _};
27use itertools::Itertools;
28use mz_compute_types::plan::reduce::{
29 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan, MonotonicPlan,
30 ReducePlan, ReductionType, SingleBasicPlan, reduction_type,
31};
32use mz_expr::{
33 AggregateExpr, AggregateFunc, EvalError, MapFilterProject, MirScalarExpr, SafeMfpPlan,
34};
35use mz_repr::adt::numeric::{self, Numeric, NumericAgg};
36use mz_repr::fixed_length::ToDatumIter;
37use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
38use mz_storage_types::errors::DataflowError;
39use mz_timely_util::operator::CollectionExt;
40use serde::{Deserialize, Serialize};
41use timely::Container;
42use timely::container::{CapacityContainerBuilder, PushInto};
43use timely::dataflow::Scope;
44use timely::progress::timestamp::Refines;
45use tracing::warn;
46
47use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
48use crate::extensions::reduce::{MzReduce, ReduceExt};
49use crate::render::context::{CollectionBundle, Context};
50use crate::render::errors::MaybeValidatingRow;
51use crate::render::reduce::monoids::{ReductionMonoid, get_monoid};
52use crate::render::{ArrangementFlavor, Pairer};
53use crate::row_spine::{
54 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBatcher, RowValBuilder,
55};
56use crate::typedefs::{
57 ErrBatcher, ErrBuilder, KeyBatcher, MzTimestamp, RowErrBuilder, RowErrSpine, RowRowAgent,
58 RowRowArrangement, RowRowSpine, RowSpine, RowValSpine,
59};
60
61impl<G, T> Context<G, T>
62where
63 G: Scope,
64 G::Timestamp: MzTimestamp + Refines<T>,
65 T: MzTimestamp,
66{
    /// Renders a reduction over `input`.
    ///
    /// Each input row is turned into a `(key, val)` pair by evaluating the key and value plans
    /// of `key_val_plan`; the reduction described by `reduce_plan` is then rendered over those
    /// pairs inside a "Reduce" region. Errors raised while evaluating the key or value
    /// expressions are routed into the error half of the returned bundle. The optional
    /// `mfp_after` is planned here and handed to the reduction to be fused into its output.
    ///
    /// # Panics
    ///
    /// Panics if `mfp_after` fails to plan or contains temporal predicates, and (per row) if
    /// a key or value plan yields no row.
    pub fn render_reduce(
        &self,
        input_key: Option<Vec<MirScalarExpr>>,
        input: CollectionBundle<G, T>,
        key_val_plan: KeyValPlan,
        reduce_plan: ReducePlan,
        mfp_after: Option<MapFilterProject>,
    ) -> CollectionBundle<G, T> {
        // Plan the fused MFP once, up front; it must be free of temporal predicates.
        let mfp_after = mfp_after.map(|m| {
            m.into_plan()
                .expect("MFP planning must succeed")
                .into_nontemporal()
                .expect("Fused Reduce MFPs do not have temporal predicates")
        });

        input.scope().region_named("Reduce", |inner| {
            let KeyValPlan {
                mut key_plan,
                mut val_plan,
            } = key_val_plan;
            let key_arity = key_plan.projection.len();
            let mut datums = DatumVec::new();

            // Collect the sorted, deduplicated set of input columns the key and value plans read.
            let mut demand = Vec::new();
            demand.extend(key_plan.demand());
            demand.extend(val_plan.demand());
            demand.sort();
            demand.dedup();

            // Map each demanded column to its dense position within the demanded subset, and
            // rewrite both plans to reference those dense positions.
            let mut demand_map = BTreeMap::new();
            for column in demand.iter() {
                demand_map.insert(*column, demand_map.len());
            }
            let demand_map_len = demand_map.len();
            key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
            // One past the largest demanded column index (0 when nothing is demanded).
            let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
            // Gaps between consecutive demanded columns, consumed below via `nth(skip)`.
            let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);

            // NOTE(review): `input_key` presumably selects an existing arrangement of `input`
            // to read from, and `max_demand` bounds how many columns get decoded — confirm
            // against `CollectionBundle::flat_map`.
            let (key_val_input, err_input) = input.enter_region(inner).flat_map(
                input_key.map(|k| (k, None)),
                max_demand,
                move |row_datums, time, diff| {
                    let mut row_builder = SharedRow::get();
                    let temp_storage = RowArena::new();

                    let mut row_iter = row_datums.drain(..);
                    let mut datums_local = datums.borrow();
                    // Keep only the demanded columns, in order.
                    for skip in skips.iter() {
                        datums_local.push(row_iter.nth(*skip).unwrap());
                    }

                    // Evaluate the key expressions; evaluation errors become error records.
                    let key =
                        key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let key = match key {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(key)) => key.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    // Drop any columns appended while evaluating the key so the value plan sees
                    // exactly the demanded input columns.
                    datums_local.truncate(skips.len());
                    let val =
                        val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder);
                    let val = match val {
                        Err(e) => {
                            return Some((Err(DataflowError::from(e)), time.clone(), diff.clone()));
                        }
                        Ok(Some(val)) => val.clone(),
                        Ok(None) => panic!("Row expected as no predicate was used"),
                    };

                    Some((Ok((key, val)), time.clone(), diff.clone()))
                },
            );

            // Split the `Result`s produced above into an ok collection and an error collection,
            // using consolidating container builders for both outputs.
            type CB<T> = ConsolidatingContainerBuilder<T>;
            let (ok, mut err) = key_val_input
                .as_collection()
                .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

            // Also carry along the error collection returned by `flat_map` on the input bundle.
            err = err.concat(&err_input);

            // Render the reduction itself and leave the "Reduce" region.
            self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after)
                .leave_region()
        })
    }
166
167 fn render_reduce_plan<S>(
173 &self,
174 plan: ReducePlan,
175 collection: Collection<S, (Row, Row), Diff>,
176 err_input: Collection<S, DataflowError, Diff>,
177 key_arity: usize,
178 mfp_after: Option<SafeMfpPlan>,
179 ) -> CollectionBundle<S, T>
180 where
181 S: Scope<Timestamp = G::Timestamp>,
182 {
183 let mut errors = Default::default();
184 let arrangement =
185 self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity, mfp_after);
186 let errs: KeyCollection<_, _, _> = err_input.concatenate(errors).into();
187 CollectionBundle::from_columns(
188 0..key_arity,
189 ArrangementFlavor::Local(
190 arrangement,
191 errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange bundle err"),
192 ),
193 )
194 }
195
196 fn render_reduce_plan_inner<S>(
197 &self,
198 plan: ReducePlan,
199 collection: Collection<S, (Row, Row), Diff>,
200 errors: &mut Vec<Collection<S, DataflowError, Diff>>,
201 key_arity: usize,
202 mfp_after: Option<SafeMfpPlan>,
203 ) -> Arranged<S, RowRowAgent<S::Timestamp, Diff>>
204 where
205 S: Scope<Timestamp = G::Timestamp>,
206 {
207 let arrangement = match plan {
210 ReducePlan::Distinct => {
213 let (arranged_output, errs) = self.build_distinct(collection, mfp_after);
214 errors.push(errs);
215 arranged_output
216 }
217 ReducePlan::Accumulable(expr) => {
218 let (arranged_output, errs) =
219 self.build_accumulable(collection, expr, key_arity, mfp_after);
220 errors.push(errs);
221 arranged_output
222 }
223 ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(expr)) => {
224 let (output, errs) = self.build_monotonic(collection, expr, mfp_after);
225 errors.push(errs);
226 output
227 }
228 ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(expr)) => {
229 let (output, errs) = self.build_bucketed(collection, expr, key_arity, mfp_after);
230 errors.push(errs);
231 output
232 }
233 ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
234 index,
235 expr,
236 fused_unnest_list,
237 })) => {
238 let validating = !fused_unnest_list;
242 let (output, errs) = self.build_basic_aggregate(
243 collection,
244 index,
245 &expr,
246 validating,
247 key_arity,
248 mfp_after,
249 fused_unnest_list,
250 );
251 if validating {
252 errors.push(errs.expect("validation should have occurred as it was requested"));
253 }
254 output
255 }
256 ReducePlan::Basic(BasicPlan::Multiple(aggrs)) => {
257 let (output, errs) =
258 self.build_basic_aggregates(collection, aggrs, key_arity, mfp_after);
259 errors.push(errs);
260 output
261 }
262 ReducePlan::Collation(expr) => {
265 let mut to_collate = vec![];
267
268 for plan in [
269 expr.hierarchical.map(ReducePlan::Hierarchical),
270 expr.accumulable.map(ReducePlan::Accumulable),
271 expr.basic.map(ReducePlan::Basic),
272 ]
273 .into_iter()
274 .flat_map(std::convert::identity)
275 {
276 let r#type = ReductionType::try_from(&plan)
277 .expect("only representable reduction types were used above");
278
279 let arrangement = self.render_reduce_plan_inner(
280 plan,
281 collection.clone(),
282 errors,
283 key_arity,
284 None,
285 );
286 to_collate.push((r#type, arrangement));
287 }
288
289 let (oks, errs) = self.build_collation(
291 to_collate,
292 expr.aggregate_types,
293 &mut collection.scope(),
294 mfp_after,
295 );
296 errors.push(errs);
297 oks
298 }
299 };
300 arrangement
301 }
302
    /// Collates the outputs of several differently-typed reductions into one row per key.
    ///
    /// For each key, each entry of `arrangements` provides a row holding the values of the
    /// aggregates of that `ReductionType`, in the order they appear in `aggregate_types`. The
    /// output row for a key is the key followed by one value per entry of `aggregate_types`,
    /// each drawn from the row of the matching reduction type. `mfp_after` is then applied.
    ///
    /// Keys whose inputs are inconsistent (wrong number of reduction types, too few or too
    /// many values) produce no output row. The error reduction reports them as internal
    /// errors, together with any `mfp_after` evaluation errors.
    ///
    /// Fewer than two `arrangements` is unexpected and is reported through
    /// `soft_panic_or_log`.
    fn build_collation<S>(
        &self,
        arrangements: Vec<(ReductionType, RowRowArrangement<S>)>,
        aggregate_types: Vec<ReductionType>,
        scope: &mut S,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let error_logger = self.error_logger();

        // A collation of fewer than two arrangements indicates a planning problem.
        if arrangements.len() <= 1 {
            error_logger.soft_panic_or_log(
                "Incorrect number of arrangements in reduce collation",
                &format!("len={}", arrangements.len()),
            );
        }

        let mut to_concat = vec![];

        // Tag each arrangement's value rows with their reduction type so they can be told
        // apart once all arrangements are concatenated.
        for (reduction_type, arrangement) in arrangements.into_iter() {
            let collection = arrangement
                .as_collection(move |key, val| (key.to_row(), (reduction_type, val.to_row())));
            to_concat.push(collection);
        }

        // Every key is expected to see exactly one row per distinct reduction type.
        let mut distinct_aggregate_types = aggregate_types.clone();
        distinct_aggregate_types.sort_unstable();
        distinct_aggregate_types.dedup();
        let n_distinct_aggregate_types = distinct_aggregate_types.len();

        // Separate datum buffers and MFP copies for the ok and error closures; the error
        // closure keeps the MFP only when evaluating it can fail.
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let aggregate_types_err = aggregate_types.clone();
        let (oks, errs) = differential_dataflow::collection::concatenate(scope, to_concat)
            .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_,_,_>, RowValSpine<_, _, _>>("Arrange ReduceCollation")
            .reduce_pair::<_, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
                "ReduceCollation",
                "ReduceCollation Errors",
                {
                    // Ok path: stitch the per-type value rows into one output row.
                    move |key, input, output| {
                        // One iterator per reduction type over that type's packed values;
                        // empty unless a row of that type is present.
                        let mut accumulable = DatumList::empty().iter();
                        let mut hierarchical = DatumList::empty().iter();
                        let mut basic = DatumList::empty().iter();

                        // Inconsistent inputs yield no output; the error path reports them.
                        if input.len() != n_distinct_aggregate_types {
                            return;
                        }

                        for (item, _) in input.iter() {
                            let reduction_type = &item.0;
                            let row = &item.1;
                            match reduction_type {
                                ReductionType::Accumulable => accumulable = row.iter(),
                                ReductionType::Hierarchical => hierarchical = row.iter(),
                                ReductionType::Basic => basic = row.iter(),
                            }
                        }

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();

                        // Take the next value of the matching type for each aggregate, in order.
                        for typ in aggregate_types.iter() {
                            let datum = match typ {
                                ReductionType::Accumulable => accumulable.next(),
                                ReductionType::Hierarchical => hierarchical.next(),
                                ReductionType::Basic => basic.next(),
                            };
                            let Some(datum) = datum else { return };
                            datums_local.push(datum);
                        }

                        // Emit only if every per-type row was fully consumed.
                        if (accumulable.next(), hierarchical.next(), basic.next())
                            == (None, None, None)
                        {
                            if let Some(row) = evaluate_mfp_after(
                                &mfp_after1,
                                &mut datums_local,
                                &temp_storage,
                                key_len,
                            ) {
                                output.push((row, Diff::ONE));
                            }
                        }
                    }
                },
                // Error path: report inconsistent inputs and fallible MFP evaluation.
                move |key, input, output| {
                    if input.len() != n_distinct_aggregate_types {
                        let message = "Mismatched aggregates for key in ReduceCollation";
                        error_logger.log(
                            message,
                            &format!(
                                "key={key:?}, n_aggregates_requested={requested}, \
                                 n_distinct_aggregate_types={n_distinct_aggregate_types}",
                                requested = input.len(),
                            ),
                        );
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        return;
                    }

                    let mut accumulable = DatumList::empty().iter();
                    let mut hierarchical = DatumList::empty().iter();
                    let mut basic = DatumList::empty().iter();
                    for (item, _) in input.iter() {
                        let reduction_type = &item.0;
                        let row = &item.1;
                        match reduction_type {
                            ReductionType::Accumulable => accumulable = row.iter(),
                            ReductionType::Hierarchical => hierarchical = row.iter(),
                            ReductionType::Basic => basic = row.iter(),
                        }
                    }

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);

                    // Too few values of some type for the requested aggregates.
                    for typ in aggregate_types_err.iter() {
                        let datum = match typ {
                            ReductionType::Accumulable => accumulable.next(),
                            ReductionType::Hierarchical => hierarchical.next(),
                            ReductionType::Basic => basic.next(),
                        };
                        if let Some(datum) = datum {
                            datums_local.push(datum);
                        } else {
                            let message = "Missing value for key in ReduceCollation";
                            error_logger.log(message, &format!("typ={typ:?}, key={key:?}"));
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                            return;
                        }
                    }

                    // Leftover values mean some per-type row had more values than requested.
                    if (accumulable.next(), hierarchical.next(), basic.next()) != (None, None, None)
                    {
                        let message = "Rows too large for key in ReduceCollation";
                        error_logger.log(message, &format!("key={key:?}"));
                        output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                    }

                    // Finally, surface errors from evaluating the fused MFP, if it can fail.
                    let Some(mfp) = &mfp_after2 else { return };
                    if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (oks, errs.as_collection(|_, v| v.clone()))
    }
501
502 fn build_distinct<S>(
504 &self,
505 collection: Collection<S, (Row, Row), Diff>,
506 mfp_after: Option<SafeMfpPlan>,
507 ) -> (
508 Arranged<S, TraceAgent<RowRowSpine<S::Timestamp, Diff>>>,
509 Collection<S, DataflowError, Diff>,
510 )
511 where
512 S: Scope<Timestamp = G::Timestamp>,
513 {
514 let error_logger = self.error_logger();
515
516 let mut datums1 = DatumVec::new();
518 let mut datums2 = DatumVec::new();
519 let mfp_after1 = mfp_after.clone();
520 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
521
522 let (output, errors) = collection
523 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _,>, RowRowSpine<_, _>>("Arranged DistinctBy")
524 .reduce_pair::<_, RowRowBuilder<_, _,>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
525 "DistinctBy",
526 "DistinctByErrorCheck",
527 move |key, _input, output| {
528 let temp_storage = RowArena::new();
529 let mut datums_local = datums1.borrow();
530 datums_local.extend(key.to_datum_iter());
531
532 if mfp_after1
536 .as_ref()
537 .map(|mfp| mfp.evaluate_inner(&mut datums_local, &temp_storage))
538 .unwrap_or(Ok(true))
539 == Ok(true)
540 {
541 output.push((Row::default(), Diff::ONE));
545 }
546 },
547 move |key, input: &[(_, Diff)], output: &mut Vec<(DataflowError, _)>| {
548 for (_, count) in input.iter() {
549 if count.is_positive() {
550 continue;
551 }
552 let message = "Non-positive multiplicity in DistinctBy";
553 error_logger.log(message, &format!("row={key:?}, count={count}"));
554 output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
555 return;
556 }
557 let Some(mfp) = &mfp_after2 else { return };
559 let temp_storage = RowArena::new();
560 let datum_iter = key.to_datum_iter();
561 let mut datums_local = datums2.borrow();
562 datums_local.extend(datum_iter);
563
564 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
565 output.push((e.into(), Diff::ONE));
566 }
567 },
568 );
569 (output, errors.as_collection(|_k, v| v.clone()))
570 }
571
572 fn build_basic_aggregates<S>(
580 &self,
581 input: Collection<S, (Row, Row), Diff>,
582 aggrs: Vec<(usize, AggregateExpr)>,
583 key_arity: usize,
584 mfp_after: Option<SafeMfpPlan>,
585 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
586 where
587 S: Scope<Timestamp = G::Timestamp>,
588 {
589 if aggrs.len() <= 1 {
592 self.error_logger().soft_panic_or_log(
593 "Too few aggregations when building basic aggregates",
594 &format!("len={}", aggrs.len()),
595 )
596 }
597 let mut err_output = None;
598 let mut to_collect = Vec::new();
599 for (index, aggr) in aggrs {
600 let (result, errs) = self.build_basic_aggregate(
601 input.clone(),
602 index,
603 &aggr,
604 err_output.is_none(),
605 key_arity,
606 None,
607 false,
608 );
609 if errs.is_some() {
610 err_output = errs
611 }
612 to_collect
613 .push(result.as_collection(move |key, val| (key.to_row(), (index, val.to_row()))));
614 }
615
616 let mut datums1 = DatumVec::new();
618 let mut datums2 = DatumVec::new();
619 let mfp_after1 = mfp_after.clone();
620 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
621
622 let arranged =
623 differential_dataflow::collection::concatenate(&mut input.scope(), to_collect)
624 .mz_arrange::<RowValBatcher<_, _, _>, RowValBuilder<_, _, _>, RowValSpine<_, _, _>>(
625 "Arranged ReduceFuseBasic input",
626 );
627
628 let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
629 "ReduceFuseBasic",
630 {
631 move |key, input, output| {
632 let temp_storage = RowArena::new();
633 let datum_iter = key.to_datum_iter();
634 let mut datums_local = datums1.borrow();
635 datums_local.extend(datum_iter);
636 let key_len = datums_local.len();
637
638 for ((_, row), _) in input.iter() {
639 datums_local.push(row.unpack_first());
640 }
641
642 if let Some(row) =
643 evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
644 {
645 output.push((row, Diff::ONE));
646 }
647 }
648 },
649 );
650 let validation_errs = err_output.expect("expected to validate in at least one aggregate");
655 if let Some(mfp) = mfp_after2 {
656 let mfp_errs = arranged
657 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
658 "ReduceFuseBasic Error Check",
659 move |key, input, output| {
660 let temp_storage = RowArena::new();
663 let datum_iter = key.to_datum_iter();
664 let mut datums_local = datums2.borrow();
665 datums_local.extend(datum_iter);
666
667 for ((_, row), _) in input.iter() {
668 datums_local.push(row.unpack_first());
669 }
670
671 if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
672 output.push((e.into(), Diff::ONE));
673 }
674 },
675 )
676 .as_collection(|_, v| v.clone());
677 (output, validation_errs.concat(&mfp_errs))
678 } else {
679 (output, validation_errs)
680 }
681 }
682
    /// Renders one inaccumulable ("basic") aggregate over column `index` of each value row.
    ///
    /// If `aggr.distinct` is set, the `(key, value)` pairs are deduplicated first. Without
    /// `fused_unnest_list`, the aggregate is evaluated once per key into a single output row.
    /// With it, the aggregate is evaluated via `eval_with_unnest_list`, and one output row is
    /// produced per resulting datum. `mfp_after` is applied to every output row.
    ///
    /// Returns the output arrangement and an optional error collection. The error
    /// collection is present when `validating` was requested, or when `mfp_after` can fail.
    /// Validation reports non-positive multiplicities.
    fn build_basic_aggregate<S>(
        &self,
        input: Collection<S, (Row, Row), Diff>,
        index: usize,
        aggr: &AggregateExpr,
        validating: bool,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
        fused_unnest_list: bool,
    ) -> (
        RowRowArrangement<S>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let AggregateExpr {
            func,
            expr: _,
            distinct,
        } = aggr.clone();

        // Reduce each value row to the single column this aggregate reads.
        let mut partial = input.map(move |(key, row)| {
            let mut row_builder = SharedRow::get();
            let value = row.iter().nth(index).unwrap();
            row_builder.packer().push(value);
            (key, row_builder.clone())
        });

        let mut err_output = None;

        if distinct {
            // Merge `(key, val)` into a single row so distinctness is over the pair, then split
            // it back apart after deduplication.
            let pairer = Pairer::new(key_arity);
            let keyed = partial.map(move |(key, val)| pairer.merge(&key, &val));
            if validating {
                // Validating variant: the distinct step yields `Result<(), String>` values,
                // which are demuxed into oks and internal errors.
                let (oks, errs) = self
                    .build_reduce_inaccumulable_distinct::<_,RowValBuilder<Result<(), String>, _,_>, RowValSpine<Result<(), String>, _, _>>(keyed, None)
                    .as_collection(|k, v| (k.to_row(), v.as_ref().map(|&()| ()).map_err(|m| m.as_str().into())))
                    .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>("Demux Errors", move |(key_val, result)| match result {
                        Ok(()) => Ok(pairer.split(&key_val)),
                        Err(m) => Err(EvalError::Internal(m).into()),
                    });
                err_output = Some(errs);
                partial = oks;
            } else {
                partial = self
                    .build_reduce_inaccumulable_distinct::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                        keyed,
                        Some(" [val: empty]"),
                    )
                    .as_collection(move |key_val_iter, _| pairer.split(key_val_iter));
            }
        }

        // Separate buffers for each closure below (plain / fused-unnest, ok / error).
        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mut datums_key_1 = DatumVec::new();
        let mut datums_key_2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        let func2 = func.clone();

        let name = if !fused_unnest_list {
            "ReduceInaccumulable"
        } else {
            "FusedReduceUnnestList"
        };
        let arranged = partial
            .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(&format!(
                "Arranged {name}"
            ));
        let oks = if !fused_unnest_list {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // Expand each value by its multiplicity. Non-positive counts are clamped
                    // to zero here; when validating, the error check below reports them.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    datums_local.push(
                        func.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        ),
                    );

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        target.push((row, Diff::ONE));
                    }
                }
            })
        } else {
            arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name, {
                move |key, source, target| {
                    // Expand each value by its (clamped) multiplicity.
                    let iter = source.iter().flat_map(|(v, w)| {
                        let count = usize::try_from(w.into_inner()).unwrap_or(0);
                        std::iter::repeat(v.to_datum_iter().next().unwrap()).take(count)
                    });

                    let temp_storage = RowArena::new();
                    let mut datums_local = datums_key_1.borrow();
                    datums_local.extend(key.to_datum_iter());
                    let key_len = datums_local.len();
                    // One output row per unnested datum: reset to the key columns each time.
                    for datum in func
                        .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                            iter,
                            &temp_storage,
                        )
                    {
                        datums_local.truncate(key_len);
                        datums_local.push(datum);
                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            target.push((row, Diff::ONE));
                        }
                    }
                }
            })
        };

        // If the distinct step above already validated, don't validate again here.
        let must_validate = validating && err_output.is_none();
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        if must_validate || mfp_after2.is_some() {
            let error_logger = self.error_logger();

            let errs = if !fused_unnest_list {
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            if must_validate {
                                // Report the first non-positive multiplicity, then stop.
                                for (value, count) in source.iter() {
                                    if count.is_positive() {
                                        continue;
                                    }
                                    let value = value.to_row();
                                    let message = "Non-positive accumulation in ReduceInaccumulable";
                                    error_logger
                                        .log(message, &format!("value={value:?}, count={count}"));
                                    target.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                                    return;
                                }
                            }

                            // Recompute the aggregate to surface `mfp_after` evaluation errors.
                            let Some(mfp) = &mfp_after2 else { return };
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let datum_iter = key.to_datum_iter();
                            let mut datums_local = datums2.borrow();
                            datums_local.extend(datum_iter);
                            datums_local.push(
                                func2.eval_with_fast_window_agg::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                ),
                            );
                            if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                            {
                                target.push((e.into(), Diff::ONE));
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            } else {
                // The fused-unnest path is never asked to validate, so we only get here
                // because `mfp_after2` is present.
                assert!(!must_validate);
                let Some(mfp) = mfp_after2 else {
                    unreachable!()
                };
                arranged
                    .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                        &format!("{name} Error Check"),
                        move |key, source, target| {
                            let iter = source.iter().flat_map(|&(mut v, ref w)| {
                                let count = usize::try_from(w.into_inner()).unwrap_or(0);
                                std::iter::repeat(v.next().unwrap()).take(count)
                            });

                            let temp_storage = RowArena::new();
                            let mut datums_local = datums_key_2.borrow();
                            datums_local.extend(key.to_datum_iter());
                            let key_len = datums_local.len();
                            // Evaluate the MFP against every unnested output datum.
                            for datum in func2
                                .eval_with_unnest_list::<_, window_agg_helpers::OneByOneAggrImpls>(
                                    iter,
                                    &temp_storage,
                                )
                            {
                                datums_local.truncate(key_len);
                                datums_local.push(datum);
                                if let Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                                {
                                    target.push((e.into(), Diff::ONE));
                                }
                            }
                        },
                    )
                    .as_collection(|_, v| v.clone())
            };

            // Combine with any errors from the distinct step.
            if let Some(e) = err_output {
                err_output = Some(e.concat(&errs));
            } else {
                err_output = Some(errs);
            }
        }
        (oks, err_output)
    }
936
937 fn build_reduce_inaccumulable_distinct<S, Bu, Tr>(
938 &self,
939 input: Collection<S, Row, Diff>,
940 name_tag: Option<&str>,
941 ) -> Arranged<S, TraceAgent<Tr>>
942 where
943 S: Scope<Timestamp = G::Timestamp>,
944 Tr: for<'a> Trace<
945 Key<'a> = DatumSeq<'a>,
946 KeyOwn = Row,
947 Time = G::Timestamp,
948 Diff = Diff,
949 ValOwn: Data + MaybeValidatingRow<(), String>,
950 > + 'static,
951 Bu: Builder<
952 Time = G::Timestamp,
953 Input: Container + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
954 Output = Tr::Batch,
955 >,
956 Arranged<S, TraceAgent<Tr>>: ArrangementSize,
957 {
958 let error_logger = self.error_logger();
959
960 let output_name = format!(
961 "ReduceInaccumulable Distinct{}",
962 name_tag.unwrap_or_default()
963 );
964
965 let input: KeyCollection<_, _, _> = input.into();
966 input
967 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
968 "Arranged ReduceInaccumulable Distinct [val: empty]",
969 )
970 .mz_reduce_abelian::<_, Bu, Tr>(&output_name, move |_, source, t| {
971 if let Some(err) = Tr::ValOwn::into_error() {
972 for (value, count) in source.iter() {
973 if count.is_positive() {
974 continue;
975 }
976
977 let message = "Non-positive accumulation in ReduceInaccumulable DISTINCT";
978 error_logger.log(message, &format!("value={value:?}, count={count}"));
979 t.push((err(message.to_string()), Diff::ONE));
980 return;
981 }
982 }
983 t.push((Tr::ValOwn::ok(()), Diff::ONE))
984 })
985 }
986
987 fn build_bucketed<S>(
1005 &self,
1006 input: Collection<S, (Row, Row), Diff>,
1007 BucketedPlan {
1008 aggr_funcs,
1009 skips,
1010 buckets,
1011 }: BucketedPlan,
1012 key_arity: usize,
1013 mfp_after: Option<SafeMfpPlan>,
1014 ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
1015 where
1016 S: Scope<Timestamp = G::Timestamp>,
1017 {
1018 let mut err_output: Option<Collection<S, _, _>> = None;
1019 let arranged_output = input.scope().region_named("ReduceHierarchical", |inner| {
1020 let input = input.enter(inner);
1021
1022 let first_mod = buckets.get(0).copied().unwrap_or(1);
1024
1025 let mut stage = input.map(move |(key, row)| {
1027 let mut row_builder = SharedRow::get();
1028 let mut row_packer = row_builder.packer();
1029 let mut row_iter = row.iter();
1030 for skip in skips.iter() {
1031 row_packer.push(row_iter.nth(*skip).unwrap());
1032 }
1033 let values = row_builder.clone();
1034
1035 let hash = values.hashed() % first_mod;
1037 let hash_key =
1038 row_builder.pack_using(std::iter::once(Datum::from(hash)).chain(&key));
1039 (hash_key, values)
1040 });
1041
1042 for (index, b) in buckets.into_iter().enumerate() {
1044 let input = if index == 0 {
1046 stage
1047 } else {
1048 stage.map(move |(hash_key, values)| {
1049 let mut hash_key_iter = hash_key.iter();
1050 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % b;
1051 let hash_key = SharedRow::pack(
1053 std::iter::once(Datum::from(hash)).chain(hash_key_iter.take(key_arity)),
1054 );
1055 (hash_key, values)
1056 })
1057 };
1058
1059 let validating = err_output.is_none();
1063
1064 let (oks, errs) = self.build_bucketed_stage(&aggr_funcs, &input, validating);
1065 if let Some(errs) = errs {
1066 err_output = Some(errs.leave_region());
1067 }
1068 stage = oks
1069 }
1070
1071 let partial = stage.map(move |(hash_key, values)| {
1073 let mut hash_key_iter = hash_key.iter();
1074 let _hash = hash_key_iter.next();
1075 (SharedRow::pack(hash_key_iter.take(key_arity)), values)
1076 });
1077
1078 let mut datums1 = DatumVec::new();
1080 let mut datums2 = DatumVec::new();
1081 let mfp_after1 = mfp_after.clone();
1082 let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
1083 let aggr_funcs2 = aggr_funcs.clone();
1084
1085 let error_logger = self.error_logger();
1088 let arranged = partial
1091 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1092 "Arrange ReduceMinsMaxes",
1093 );
1094 let must_validate = err_output.is_none();
1098 if must_validate || mfp_after2.is_some() {
1099 let errs = arranged
1100 .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
1101 "ReduceMinsMaxes Error Check",
1102 move |key, source, target| {
1103 if must_validate {
1107 for (val, count) in source.iter() {
1108 if count.is_positive() {
1109 continue;
1110 }
1111 let val = val.to_row();
1112 let message = "Non-positive accumulation in ReduceMinsMaxes";
1113 error_logger
1114 .log(message, &format!("val={val:?}, count={count}"));
1115 target.push((
1116 EvalError::Internal(message.into()).into(),
1117 Diff::ONE,
1118 ));
1119 return;
1120 }
1121 }
1122
1123 let Some(mfp) = &mfp_after2 else { return };
1125 let temp_storage = RowArena::new();
1126 let datum_iter = key.to_datum_iter();
1127 let mut datums_local = datums2.borrow();
1128 datums_local.extend(datum_iter);
1129
1130 let mut source_iters = source
1131 .iter()
1132 .map(|(values, _cnt)| *values)
1133 .collect::<Vec<_>>();
1134 for func in aggr_funcs2.iter() {
1135 let column_iter = (0..source_iters.len())
1136 .map(|i| source_iters[i].next().unwrap());
1137 datums_local.push(func.eval(column_iter, &temp_storage));
1138 }
1139 if let Result::Err(e) =
1140 mfp.evaluate_inner(&mut datums_local, &temp_storage)
1141 {
1142 target.push((e.into(), Diff::ONE));
1143 }
1144 },
1145 )
1146 .as_collection(|_, v| v.clone())
1147 .leave_region();
1148 if let Some(e) = &err_output {
1149 err_output = Some(e.concat(&errs));
1150 } else {
1151 err_output = Some(errs);
1152 }
1153 }
1154 arranged
1155 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1156 "ReduceMinsMaxes",
1157 move |key, source, target| {
1158 let temp_storage = RowArena::new();
1159 let datum_iter = key.to_datum_iter();
1160 let mut datums_local = datums1.borrow();
1161 datums_local.extend(datum_iter);
1162 let key_len = datums_local.len();
1163
1164 let mut source_iters = source
1165 .iter()
1166 .map(|(values, _cnt)| *values)
1167 .collect::<Vec<_>>();
1168 for func in aggr_funcs.iter() {
1169 let column_iter =
1170 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1171 datums_local.push(func.eval(column_iter, &temp_storage));
1172 }
1173
1174 if let Some(row) = evaluate_mfp_after(
1175 &mfp_after1,
1176 &mut datums_local,
1177 &temp_storage,
1178 key_len,
1179 ) {
1180 target.push((row, Diff::ONE));
1181 }
1182 },
1183 )
1184 .leave_region()
1185 });
1186 (
1187 arranged_output,
1188 err_output.expect("expected to validate in one level of the hierarchy"),
1189 )
1190 }
1191
1192 fn build_bucketed_stage<S>(
1199 &self,
1200 aggr_funcs: &Vec<AggregateFunc>,
1201 input: &Collection<S, (Row, Row), Diff>,
1202 validating: bool,
1203 ) -> (
1204 Collection<S, (Row, Row), Diff>,
1205 Option<Collection<S, DataflowError, Diff>>,
1206 )
1207 where
1208 S: Scope<Timestamp = G::Timestamp>,
1209 {
1210 let (input, negated_output, errs) = if validating {
1211 let (input, reduced) = self
1212 .build_bucketed_negated_output::<_, RowValBuilder<_,_,_>, RowValSpine<Result<Row, Row>, _, _>>(
1213 input,
1214 aggr_funcs.clone(),
1215 );
1216 let (oks, errs) = reduced
1217 .as_collection(|k, v| (k.to_row(), v.clone()))
1218 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
1219 "Checked Invalid Accumulations",
1220 |(hash_key, result)| match result {
1221 Err(hash_key) => {
1222 let mut hash_key_iter = hash_key.iter();
1223 let _hash = hash_key_iter.next();
1224 let key = SharedRow::pack(hash_key_iter);
1225 let message = format!(
1226 "Invalid data in source, saw non-positive accumulation \
1227 for key {key:?} in hierarchical mins-maxes aggregate"
1228 );
1229 Err(EvalError::Internal(message.into()).into())
1230 }
1231 Ok(values) => Ok((hash_key, values)),
1232 },
1233 );
1234 (input, oks, Some(errs))
1235 } else {
1236 let (input, reduced) = self
1237 .build_bucketed_negated_output::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1238 input,
1239 aggr_funcs.clone(),
1240 );
1241 let oks = reduced.as_collection(|k, v| (k.to_row(), v.to_row()));
1244 (input, oks, None)
1245 };
1246
1247 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
1248 let oks = negated_output.concat(&input);
1249 (oks, errs)
1250 }
1251
1252 fn build_bucketed_negated_output<S, Bu, Tr>(
1256 &self,
1257 input: &Collection<S, (Row, Row), Diff>,
1258 aggrs: Vec<AggregateFunc>,
1259 ) -> (
1260 Arranged<S, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
1261 Arranged<S, TraceAgent<Tr>>,
1262 )
1263 where
1264 S: Scope<Timestamp = G::Timestamp>,
1265 Tr: for<'a> Trace<
1266 Key<'a> = DatumSeq<'a>,
1267 KeyOwn = Row,
1268 ValOwn: Data + MaybeValidatingRow<Row, Row>,
1269 Time = G::Timestamp,
1270 Diff = Diff,
1271 > + 'static,
1272 Bu: Builder<
1273 Time = G::Timestamp,
1274 Input: Container + PushInto<((Row, Tr::ValOwn), Tr::Time, Tr::Diff)>,
1275 Output = Tr::Batch,
1276 >,
1277 Arranged<S, TraceAgent<Tr>>: ArrangementSize,
1278 {
1279 let error_logger = self.error_logger();
1280 let arranged_input = input
1283 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
1284 "Arranged MinsMaxesHierarchical input",
1285 );
1286
1287 let reduced = arranged_input.mz_reduce_abelian::<_, Bu, Tr>(
1288 "Reduced Fallibly MinsMaxesHierarchical",
1289 move |key, source, target| {
1290 if let Some(err) = Tr::ValOwn::into_error() {
1291 for (value, count) in source.iter() {
1293 if count.is_positive() {
1294 continue;
1295 }
1296 error_logger.log(
1297 "Non-positive accumulation in MinsMaxesHierarchical",
1298 &format!("key={key:?}, value={value:?}, count={count}"),
1299 );
1300 target.push((err(Tr::owned_key(key)), Diff::ONE));
1303 return;
1304 }
1305 }
1306
1307 let mut row_builder = SharedRow::get();
1308 let mut row_packer = row_builder.packer();
1309
1310 let mut source_iters = source
1311 .iter()
1312 .map(|(values, _cnt)| *values)
1313 .collect::<Vec<_>>();
1314 for func in aggrs.iter() {
1315 let column_iter =
1316 (0..source_iters.len()).map(|i| source_iters[i].next().unwrap());
1317 row_packer.push(func.eval(column_iter, &RowArena::new()));
1318 }
1319 target.reserve(source.len().saturating_add(1));
1325 target.push((Tr::ValOwn::ok(row_builder.clone()), Diff::MINUS_ONE));
1326 target.extend(source.iter().map(|(values, cnt)| {
1327 let mut cnt = *cnt;
1328 cnt.negate();
1329 (Tr::ValOwn::ok(values.to_row()), cnt)
1330 }));
1331 },
1332 );
1333 (arranged_input, reduced)
1334 }
1335
    /// Builds a min/max reduction for input that is expected to be monotonic.
    ///
    /// Each aggregate's input datum is packed into its own row and turned into a
    /// `ReductionMonoid` (via `get_monoid`). The vector of monoids is then carried as the
    /// record's difference, and the collection is arranged by key only, so accumulation in the
    /// arrangement maintains the running min/max per key.
    ///
    /// Returns the arranged output (key plus aggregates, after `mfp_after`) and an error
    /// collection. The error collection contains the errors produced by `ensure_monotonic`
    /// and, when `mfp_after` can error, errors from evaluating it.
    fn build_monotonic<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        MonotonicPlan {
            aggr_funcs,
            skips,
            must_consolidate,
        }: MonotonicPlan,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Extract one single-datum row per aggregate. `skips` is consumed against a single
        // iterator, so each entry is the number of datums to skip after the previously
        // extracted one.
        let collection = collection
            .map(move |(key, row)| {
                let mut row_builder = SharedRow::get();
                let mut values = Vec::with_capacity(skips.len());
                let mut row_iter = row.iter();
                for skip in skips.iter() {
                    values.push(
                        row_builder.pack_using(std::iter::once(row_iter.nth(*skip).unwrap())),
                    );
                }

                (key, values)
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated ReduceMonotonic input",
            );

        let error_logger = self.error_logger();
        // Updates rejected by `ensure_monotonic` (presumably those with non-positive diffs)
        // are logged and turned into internal errors.
        let (partial, validation_errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to ReduceMonotonic",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build a monotonic reduction on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the per-aggregate values into the difference as monoids; every aggregate
        // rendered here is required to have one.
        let partial = partial.explode_one(move |(key, values)| {
            let mut output = Vec::new();
            for (row, func) in values.into_iter().zip_eq(aggr_funcs.iter()) {
                output.push(monoids::get_monoid(row, func).expect(
                    "hierarchical aggregations are expected to have monoid implementations",
                ));
            }
            (key, output)
        });

        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only render the error-checking reduction when `mfp_after` can actually error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());

        let partial: KeyCollection<_, _, _> = partial.into();
        let arranged = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, Vec<ReductionMonoid>>>(
                "ArrangeMonotonic [val: empty]",
            );
        let output = arranged.mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "ReduceMonotonic",
            {
                move |key, input, output| {
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums1.borrow();
                    datums_local.extend(datum_iter);
                    let key_len = datums_local.len();
                    // Keys carry no values, so the first (presumably only) entry's difference
                    // is the accumulated monoid vector.
                    let accum = &input[0].1;
                    for monoid in accum.iter() {
                        datums_local.extend(monoid.finalize().iter());
                    }

                    if let Some(row) =
                        evaluate_mfp_after(&mfp_after1, &mut datums_local, &temp_storage, key_len)
                    {
                        output.push((row, Diff::ONE));
                    }
                }
            },
        );

        if let Some(mfp) = mfp_after2 {
            // Re-evaluate `mfp_after` over the same datums purely to collect its errors.
            let mfp_errs = arranged
                .mz_reduce_abelian::<_, RowErrBuilder<_, _>, RowErrSpine<_, _>>(
                    "ReduceMonotonic Error Check",
                    move |key, input, output| {
                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums2.borrow();
                        datums_local.extend(datum_iter);
                        let accum = &input[0].1;
                        for monoid in accum.iter() {
                            datums_local.extend(monoid.finalize().iter());
                        }
                        if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage)
                        {
                            output.push((e.into(), Diff::ONE));
                        }
                    },
                )
                .as_collection(|_k, v| v.clone());
            (output, validation_errs.concat(&mfp_errs))
        } else {
            (output, validation_errs)
        }
    }
1456
    /// Builds a reduction for accumulable aggregates (sums, counts, any/all, ...).
    ///
    /// Every input record is exploded into a difference of type `(Vec<Accum>, Diff)`: one
    /// accumulator per entry of `full_aggrs`, plus a record count. The records are then arranged
    /// by key, so differential accumulates them.
    /// - Non-distinct aggregates (`simple_aggrs`) are handled in a single pass.
    /// - Each distinct aggregate (`distinct_aggrs`) first deduplicates `(key, value)` pairs.
    ///
    /// A `reduce_pair` then produces two outputs:
    /// - `arranged_output`: the finalized output rows, after `mfp_after`;
    /// - an error collection, for inconsistent accumulations and `mfp_after` errors.
    fn build_accumulable<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        AccumulablePlan {
            full_aggrs,
            simple_aggrs,
            distinct_aggrs,
        }: AccumulablePlan,
        key_arity: usize,
        mfp_after: Option<SafeMfpPlan>,
    ) -> (RowRowArrangement<S>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Every aggregate must be either simple or distinct; anything else is a planning bug.
        if full_aggrs.len() == 0 || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
            self.error_logger().soft_panic_or_log(
                "Incorrect numbers of aggregates in accummulable reduction rendering",
                &format!(
                    "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
                    full_aggrs.len(),
                    simple_aggrs.len(),
                    distinct_aggrs.len(),
                ),
            );
        }

        // Template difference: a zero accumulator per aggregate and a zero record count.
        let zero_diffs: (Vec<_>, Diff) = (
            full_aggrs
                .iter()
                .map(|f| accumulable_zero(&f.func))
                .collect(),
            Diff::ZERO,
        );

        let mut to_aggregate = Vec::new();
        if simple_aggrs.len() > 0 {
            // All non-distinct aggregates share one pass over the input.
            let easy_cases = collection.explode_one({
                let zero_diffs = zero_diffs.clone();
                move |(key, row)| {
                    let mut diffs = zero_diffs.clone();
                    // Walk the row forward to each `datum_index`. This only works if
                    // `simple_aggrs` is ordered by ascending `datum_index`; otherwise the
                    // `unwrap` hits the end of the row. NOTE(review): presumably guaranteed by
                    // the plan; confirm.
                    let mut row_iter = row.iter().enumerate();
                    for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
                        let mut datum = row_iter.next().unwrap();
                        while datum_index != &datum.0 {
                            datum = row_iter.next().unwrap();
                        }
                        let datum = datum.1;
                        diffs.0[*accumulable_index] = datum_to_accumulator(&aggr.func, datum);
                        diffs.1 = Diff::ONE;
                    }
                    ((key, ()), diffs)
                }
            });
            to_aggregate.push(easy_cases);
        }

        // Each distinct aggregate deduplicates its (key, value) pairs before accumulating.
        for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
            let pairer = Pairer::new(key_arity);
            let collection = collection
                .map(move |(key, row)| {
                    let value = row.iter().nth(datum_index).unwrap();
                    (pairer.merge(&key, std::iter::once(value)), ())
                })
                .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Arranged Accumulable Distinct [val: empty]",
                )
                // Emit each distinct (key, value) pair exactly once.
                .mz_reduce_abelian::<_, RowBuilder<_, _>, RowSpine<_, _>>(
                    "Reduced Accumulable Distinct [val: empty]",
                    move |_k, _s, t| t.push(((), Diff::ONE)),
                )
                .as_collection(move |key_val_iter, _| pairer.split(key_val_iter))
                .explode_one({
                    let zero_diffs = zero_diffs.clone();
                    move |(key, row)| {
                        let datum = row.iter().next().unwrap();
                        let mut diffs = zero_diffs.clone();
                        diffs.0[accumulable_index] = datum_to_accumulator(&aggr.func, datum);
                        // NOTE(review): this adds one to the record count per distinct value,
                        // so when simple and distinct aggregates (or several distinct ones)
                        // coexist, `total` mixes input-record and distinct-value counts.
                        // `finalize_accum` compares `total` exactly for Any/All; confirm this
                        // is intended.
                        diffs.1 = Diff::ONE;
                        ((key, ()), diffs)
                    }
                });
            to_aggregate.push(collection);
        }

        let collection = if to_aggregate.len() == 1 {
            to_aggregate.remove(0)
        } else {
            differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
        };

        let mut datums1 = DatumVec::new();
        let mut datums2 = DatumVec::new();
        let mfp_after1 = mfp_after.clone();
        // Only evaluate `mfp_after` in the error path when it can actually error.
        let mfp_after2 = mfp_after.filter(|mfp| mfp.could_error());
        let full_aggrs2 = full_aggrs.clone();

        let error_logger = self.error_logger();
        let err_full_aggrs = full_aggrs.clone();
        let (arranged_output, arranged_errs) = collection
            .mz_arrange::<RowBatcher<_,_>, RowBuilder<_,_>, RowSpine<_, (Vec<Accum>, Diff)>>("ArrangeAccumulable [val: empty]")
            .reduce_pair::<_, RowRowBuilder<_,_>, RowRowSpine<_, _>, _, RowErrBuilder<_,_>, RowErrSpine<_, _>>(
                "ReduceAccumulable",
                "AccumulableErrorCheck",
                {
                    // Output path: finalize every accumulator and apply `mfp_after`.
                    move |key, input, output| {
                        let (ref accums, total) = input[0].1;

                        let temp_storage = RowArena::new();
                        let datum_iter = key.to_datum_iter();
                        let mut datums_local = datums1.borrow();
                        datums_local.extend(datum_iter);
                        let key_len = datums_local.len();
                        for (aggr, accum) in full_aggrs.iter().zip_eq(accums) {
                            datums_local.push(finalize_accum(&aggr.func, accum, total));
                        }

                        if let Some(row) = evaluate_mfp_after(
                            &mfp_after1,
                            &mut datums_local,
                            &temp_storage,
                            key_len,
                        ) {
                            output.push((row, Diff::ONE));
                        }
                    }
                },
                // Error path: report inconsistent accumulations and `mfp_after` errors.
                move |key, input, output| {
                    let (ref accums, total) = input[0].1;
                    for (aggr, accum) in err_full_aggrs.iter().zip_eq(accums) {
                        // Records cancelled out but their accumulation did not.
                        if total == Diff::ZERO && !accum.is_zero() {
                            error_logger.log(
                                "Net-zero records with non-zero accumulation in ReduceAccumulable",
                                &format!("aggr={aggr:?}, accum={accum:?}"),
                            );
                            let key = key.to_row();
                            let message = format!(
                                "Invalid data in source, saw net-zero records for key {key} \
                                 with non-zero accumulation in accumulable aggregate"
                            );
                            output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                        }
                        // Unsigned sums cannot legitimately be negative (`finalize_accum`
                        // outputs NULL for them).
                        match (&aggr.func, &accum) {
                            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
                            | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                                if accum.is_negative() {
                                    error_logger.log(
                                    "Invalid negative unsigned aggregation in ReduceAccumulable",
                                    &format!("aggr={aggr:?}, accum={accum:?}"),
                                );
                                    let key = key.to_row();
                                    let message = format!(
                                        "Invalid data in source, saw negative accumulation with \
                                         unsigned type for key {key}"
                                    );
                                    output.push((EvalError::Internal(message.into()).into(), Diff::ONE));
                                }
                            }
                            _ => (),
                        }
                    }

                    // Re-evaluate `mfp_after` purely to surface its errors.
                    let Some(mfp) = &mfp_after2 else { return };
                    let temp_storage = RowArena::new();
                    let datum_iter = key.to_datum_iter();
                    let mut datums_local = datums2.borrow();
                    datums_local.extend(datum_iter);
                    for (aggr, accum) in full_aggrs2.iter().zip_eq(accums) {
                        datums_local.push(finalize_accum(&aggr.func, accum, total));
                    }

                    if let Result::Err(e) = mfp.evaluate_inner(&mut datums_local, &temp_storage) {
                        output.push((e.into(), Diff::ONE));
                    }
                },
            );
        (
            arranged_output,
            arranged_errs.as_collection(|_key, error| error.clone()),
        )
    }
1668}
1669
1670fn evaluate_mfp_after<'a, 'b>(
1674 mfp_after: &'a Option<SafeMfpPlan>,
1675 datums_local: &'b mut mz_repr::DatumVecBorrow<'a>,
1676 temp_storage: &'a RowArena,
1677 key_len: usize,
1678) -> Option<Row> {
1679 let mut row_builder = SharedRow::get();
1680 if let Some(mfp) = mfp_after {
1683 if let Ok(Some(iter)) = mfp.evaluate_iter(datums_local, temp_storage) {
1686 Some(row_builder.pack_using(iter.skip(key_len)))
1689 } else {
1690 None
1691 }
1692 } else {
1693 Some(row_builder.pack_using(&datums_local[key_len..]))
1694 }
1695}
1696
1697fn accumulable_zero(aggr_func: &AggregateFunc) -> Accum {
1698 match aggr_func {
1699 AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
1700 trues: Diff::ZERO,
1701 falses: Diff::ZERO,
1702 },
1703 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
1704 accum: AccumCount::ZERO,
1705 pos_infs: Diff::ZERO,
1706 neg_infs: Diff::ZERO,
1707 nans: Diff::ZERO,
1708 non_nulls: Diff::ZERO,
1709 },
1710 AggregateFunc::SumNumeric => Accum::Numeric {
1711 accum: OrderedDecimal(NumericAgg::zero()),
1712 pos_infs: Diff::ZERO,
1713 neg_infs: Diff::ZERO,
1714 nans: Diff::ZERO,
1715 non_nulls: Diff::ZERO,
1716 },
1717 _ => Accum::SimpleNumber {
1718 accum: AccumCount::ZERO,
1719 non_nulls: Diff::ZERO,
1720 },
1721 }
1722}
1723
/// Fixed-point scale (2^24) for float sums. Finite float inputs are multiplied by this value
/// and cast to `i128` before accumulation, and the accumulated sum is divided by it again
/// when finalized.
static FLOAT_SCALE: LazyLock<f64> = LazyLock::new(|| f64::from(1_u32 << 24));
1725
1726fn datum_to_accumulator(aggregate_func: &AggregateFunc, datum: Datum) -> Accum {
1727 match aggregate_func {
1728 AggregateFunc::Count => Accum::SimpleNumber {
1729 accum: AccumCount::ZERO, non_nulls: if datum.is_null() {
1731 Diff::ZERO
1732 } else {
1733 Diff::ONE
1734 },
1735 },
1736 AggregateFunc::Any | AggregateFunc::All => match datum {
1737 Datum::True => Accum::Bool {
1738 trues: Diff::ONE,
1739 falses: Diff::ZERO,
1740 },
1741 Datum::Null => Accum::Bool {
1742 trues: Diff::ZERO,
1743 falses: Diff::ZERO,
1744 },
1745 Datum::False => Accum::Bool {
1746 trues: Diff::ZERO,
1747 falses: Diff::ONE,
1748 },
1749 x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
1750 },
1751 AggregateFunc::Dummy => match datum {
1752 Datum::Dummy => Accum::SimpleNumber {
1753 accum: AccumCount::ZERO,
1754 non_nulls: Diff::ZERO,
1755 },
1756 x => panic!("Invalid argument to AggregateFunc::Dummy: {x:?}"),
1757 },
1758 AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
1759 let n = match datum {
1760 Datum::Float32(n) => f64::from(*n),
1761 Datum::Float64(n) => *n,
1762 Datum::Null => 0f64,
1763 x => panic!("Invalid argument to AggregateFunc::{aggregate_func:?}: {x:?}"),
1764 };
1765
1766 let nans = Diff::from(n.is_nan());
1767 let pos_infs = Diff::from(n == f64::INFINITY);
1768 let neg_infs = Diff::from(n == f64::NEG_INFINITY);
1769 let non_nulls = Diff::from(datum != Datum::Null);
1770
1771 let accum = if nans.is_positive() || pos_infs.is_positive() || neg_infs.is_positive() {
1774 AccumCount::ZERO
1775 } else {
1776 #[allow(clippy::as_conversions)]
1779 { (n * *FLOAT_SCALE) as i128 }.into()
1780 };
1781
1782 Accum::Float {
1783 accum,
1784 pos_infs,
1785 neg_infs,
1786 nans,
1787 non_nulls,
1788 }
1789 }
1790 AggregateFunc::SumNumeric => match datum {
1791 Datum::Numeric(n) => {
1792 let (accum, pos_infs, neg_infs, nans) = if n.0.is_infinite() {
1793 if n.0.is_negative() {
1794 (NumericAgg::zero(), Diff::ZERO, Diff::ONE, Diff::ZERO)
1795 } else {
1796 (NumericAgg::zero(), Diff::ONE, Diff::ZERO, Diff::ZERO)
1797 }
1798 } else if n.0.is_nan() {
1799 (NumericAgg::zero(), Diff::ZERO, Diff::ZERO, Diff::ONE)
1800 } else {
1801 let mut cx_agg = numeric::cx_agg();
1804 (cx_agg.to_width(n.0), Diff::ZERO, Diff::ZERO, Diff::ZERO)
1805 };
1806
1807 Accum::Numeric {
1808 accum: OrderedDecimal(accum),
1809 pos_infs,
1810 neg_infs,
1811 nans,
1812 non_nulls: Diff::ONE,
1813 }
1814 }
1815 Datum::Null => Accum::Numeric {
1816 accum: OrderedDecimal(NumericAgg::zero()),
1817 pos_infs: Diff::ZERO,
1818 neg_infs: Diff::ZERO,
1819 nans: Diff::ZERO,
1820 non_nulls: Diff::ZERO,
1821 },
1822 x => panic!("Invalid argument to AggregateFunc::SumNumeric: {x:?}"),
1823 },
1824 _ => {
1825 match datum {
1829 Datum::Int16(i) => Accum::SimpleNumber {
1830 accum: i.into(),
1831 non_nulls: Diff::ONE,
1832 },
1833 Datum::Int32(i) => Accum::SimpleNumber {
1834 accum: i.into(),
1835 non_nulls: Diff::ONE,
1836 },
1837 Datum::Int64(i) => Accum::SimpleNumber {
1838 accum: i.into(),
1839 non_nulls: Diff::ONE,
1840 },
1841 Datum::UInt16(u) => Accum::SimpleNumber {
1842 accum: u.into(),
1843 non_nulls: Diff::ONE,
1844 },
1845 Datum::UInt32(u) => Accum::SimpleNumber {
1846 accum: u.into(),
1847 non_nulls: Diff::ONE,
1848 },
1849 Datum::UInt64(u) => Accum::SimpleNumber {
1850 accum: u.into(),
1851 non_nulls: Diff::ONE,
1852 },
1853 Datum::MzTimestamp(t) => Accum::SimpleNumber {
1854 accum: u64::from(t).into(),
1855 non_nulls: Diff::ONE,
1856 },
1857 Datum::Null => Accum::SimpleNumber {
1858 accum: AccumCount::ZERO,
1859 non_nulls: Diff::ZERO,
1860 },
1861 x => panic!("Accumulating non-integer data: {x:?}"),
1862 }
1863 }
1864 }
1865}
1866
/// Converts an accumulated `Accum` into the output datum of `aggr_func`.
///
/// `total` is the accumulated record count that was carried next to the accumulators.
/// - If there are records (`total` positive) but the accumulator is entirely zero, the
///   result is NULL for every function except `Count`.
/// - `Count` returns its non-null tally.
///
/// # Panics
///
/// Panics if `accum` is not the variant expected for `aggr_func`.
fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Diff) -> Datum<'a> {
    // Only NULL inputs were seen: aggregating nothing but NULLs yields NULL, except for Count.
    if total.is_positive() && accum.is_zero() && *aggr_func != AggregateFunc::Count {
        Datum::Null
    } else {
        match (&aggr_func, &accum) {
            (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
                Datum::Int64(non_nulls.into_inner())
            }
            (AggregateFunc::All, Accum::Bool { falses, trues }) => {
                // Any false wins; all-true (every record) is true; otherwise NULLs were seen.
                if falses.is_positive() {
                    Datum::False
                } else if *trues == total {
                    Datum::True
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Any, Accum::Bool { falses, trues }) => {
                // Any true wins; all-false (every record) is false; otherwise NULLs were seen.
                if trues.is_positive() {
                    Datum::True
                } else if *falses == total {
                    Datum::False
                } else {
                    Datum::Null
                }
            }
            (AggregateFunc::Dummy, _) => Datum::Dummy,
            (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
                // `as` silently truncates if the i128 sum does not fit in an i64.
                #[allow(clippy::as_conversions)]
                Datum::Int64(accum.into_inner() as i64)
            }
            (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => Datum::from(*accum),
            (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
            | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    // `as` silently truncates if the non-negative sum exceeds u64.
                    #[allow(clippy::as_conversions)]
                    Datum::UInt64(accum.into_inner() as u64)
                } else {
                    // A negative unsigned sum is invalid; the error itself is reported by the
                    // accumulable error-check path, and the value becomes NULL here.
                    Datum::Null
                }
            }
            (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
                if !accum.is_negative() {
                    Datum::from(*accum)
                } else {
                    // See the SumUInt16/SumUInt32 arm: negative unsigned sums become NULL.
                    Datum::Null
                }
            }
            (
                AggregateFunc::SumFloat32,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // NaN if any NaN was seen or both infinities were seen (+inf + -inf = NaN).
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f32::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f32::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f32::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied on input.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from(((accum.into_inner() as f64) / *FLOAT_SCALE) as f32)
                    }
                }
            }
            (
                AggregateFunc::SumFloat64,
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Same special-value handling as SumFloat32.
                if nans.is_positive() || (pos_infs.is_positive() && neg_infs.is_positive()) {
                    Datum::from(f64::NAN)
                } else if pos_infs.is_positive() {
                    Datum::from(f64::INFINITY)
                } else if neg_infs.is_positive() {
                    Datum::from(f64::NEG_INFINITY)
                } else {
                    // Undo the fixed-point scaling applied on input.
                    #[allow(clippy::as_conversions)]
                    {
                        Datum::from((accum.into_inner() as f64) / *FLOAT_SCALE)
                    }
                }
            }
            (
                AggregateFunc::SumNumeric,
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls: _,
                },
            ) => {
                // Narrow the aggregation-width sum to the datum width. If that conversion
                // yields an infinity, it is folded into the infinity flags below.
                let mut cx_datum = numeric::cx_datum();
                let d = cx_datum.to_width(accum.0);
                let inf_d = d.is_infinite();
                let neg_d = d.is_negative();
                let pos_inf = pos_infs.is_positive() || (inf_d && !neg_d);
                let neg_inf = neg_infs.is_positive() || (inf_d && neg_d);
                if nans.is_positive() || (pos_inf && neg_inf) {
                    Datum::from(Numeric::nan())
                } else if pos_inf {
                    Datum::from(Numeric::infinity())
                } else if neg_inf {
                    let mut cx = numeric::cx_datum();
                    let mut d = Numeric::infinity();
                    cx.neg(&mut d);
                    Datum::from(d)
                } else {
                    Datum::from(d)
                }
            }
            _ => panic!(
                "Unexpected accumulation (aggr={:?}, accum={accum:?})",
                aggr_func
            ),
        }
    }
}
2032
/// Integer type of accumulated sums: a 128-bit signed integer wrapped in
/// `mz_ore::Overflowing`.
// NOTE(review): presumably `Overflowing` detects overflow in its operators rather than
// silently wrapping (float sums call `checked_add`/`wrapping_add` explicitly); confirm in mz_ore.
type AccumCount = mz_ore::Overflowing<i128>;
2035
/// Accumulation state for one accumulable aggregate, used as (part of) a differential
/// difference. There is one variant per family of aggregate functions; `accumulable_zero`
/// picks the variant for a given function.
///
/// The derived `Ord`/`PartialOrd` compare variants and fields in declaration order, and the
/// serialized form may depend on it too, so keep the declaration order stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Accum {
    /// Accumulator for `Any` and `All`.
    Bool {
        /// Number of `true` inputs.
        trues: Diff,
        /// Number of `false` inputs.
        falses: Diff,
    },
    /// Accumulator for integer-like sums, `Count`, and `Dummy`.
    SimpleNumber {
        /// Sum of the inputs (left at zero for `Count`).
        accum: AccumCount,
        /// Number of non-null inputs.
        non_nulls: Diff,
    },
    /// Accumulator for `SumFloat32` and `SumFloat64`.
    Float {
        /// Fixed-point sum of the finite inputs, scaled by `FLOAT_SCALE`.
        accum: AccumCount,
        /// Number of `+inf` inputs.
        pos_infs: Diff,
        /// Number of `-inf` inputs.
        neg_infs: Diff,
        /// Number of NaN inputs.
        nans: Diff,
        /// Number of non-null inputs.
        non_nulls: Diff,
    },
    /// Accumulator for `SumNumeric`.
    Numeric {
        /// Sum of the finite inputs, at aggregation width.
        accum: OrderedDecimal<NumericAgg>,
        /// Number of `+inf` inputs.
        pos_infs: Diff,
        /// Number of `-inf` inputs.
        neg_infs: Diff,
        /// Number of NaN inputs.
        nans: Diff,
        /// Number of non-null inputs.
        non_nulls: Diff,
    },
}
2090
2091impl IsZero for Accum {
2092 fn is_zero(&self) -> bool {
2093 match self {
2094 Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
2095 Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
2096 Accum::Float {
2097 accum,
2098 pos_infs,
2099 neg_infs,
2100 nans,
2101 non_nulls,
2102 } => {
2103 accum.is_zero()
2104 && pos_infs.is_zero()
2105 && neg_infs.is_zero()
2106 && nans.is_zero()
2107 && non_nulls.is_zero()
2108 }
2109 Accum::Numeric {
2110 accum,
2111 pos_infs,
2112 neg_infs,
2113 nans,
2114 non_nulls,
2115 } => {
2116 accum.0.is_zero()
2117 && pos_infs.is_zero()
2118 && neg_infs.is_zero()
2119 && nans.is_zero()
2120 && non_nulls.is_zero()
2121 }
2122 }
2123 }
2124}
2125
/// Componentwise addition of accumulators; both operands must be the same variant.
impl Semigroup for Accum {
    /// Adds `other` into `self`, field by field.
    ///
    /// # Panics
    ///
    /// Panics if the variants differ, or if a numeric addition had to round. Float sums that
    /// overflow the accumulator log a warning and wrap instead of panicking.
    fn plus_equals(&mut self, other: &Accum) {
        match (&mut *self, other) {
            (
                Accum::Bool { trues, falses },
                Accum::Bool {
                    trues: other_trues,
                    falses: other_falses,
                },
            ) => {
                *trues += other_trues;
                *falses += other_falses;
            }
            (
                Accum::SimpleNumber { accum, non_nulls },
                Accum::SimpleNumber {
                    accum: other_accum,
                    non_nulls: other_non_nulls,
                },
            ) => {
                *accum += other_accum;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Float {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Float {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                // Best effort: on overflow, warn and fall back to wrapping addition.
                *accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
                    warn!("Float accumulator overflow. Incorrect results possible");
                    accum.wrapping_add(*other_accum)
                });
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (
                Accum::Numeric {
                    accum,
                    pos_infs,
                    neg_infs,
                    nans,
                    non_nulls,
                },
                Accum::Numeric {
                    accum: other_accum,
                    pos_infs: other_pos_infs,
                    neg_infs: other_neg_infs,
                    nans: other_nans,
                    non_nulls: other_non_nulls,
                },
            ) => {
                let mut cx_agg = numeric::cx_agg();
                cx_agg.add(&mut accum.0, &other_accum.0);
                // Rounding means the aggregation context could not represent the exact sum;
                // this is treated as fatal.
                assert!(!cx_agg.status().rounded(), "Accum::Numeric overflow");
                // Reduce the sum after every addition (presumably to normalize the
                // representation and reclaim unused precision; confirm against decnumber's
                // `reduce`).
                cx_agg.reduce(&mut accum.0);
                *pos_infs += other_pos_infs;
                *neg_infs += other_neg_infs;
                *nans += other_nans;
                *non_nulls += other_non_nulls;
            }
            (l, r) => unreachable!(
                "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
            ),
        }
    }
}
2228
2229impl Multiply<Diff> for Accum {
2230 type Output = Accum;
2231
2232 fn multiply(self, factor: &Diff) -> Accum {
2233 let factor = *factor;
2234 match self {
2235 Accum::Bool { trues, falses } => Accum::Bool {
2236 trues: trues * factor,
2237 falses: falses * factor,
2238 },
2239 Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
2240 accum: accum * AccumCount::from(factor),
2241 non_nulls: non_nulls * factor,
2242 },
2243 Accum::Float {
2244 accum,
2245 pos_infs,
2246 neg_infs,
2247 nans,
2248 non_nulls,
2249 } => Accum::Float {
2250 accum: accum
2251 .checked_mul(AccumCount::from(factor))
2252 .unwrap_or_else(|| {
2253 warn!("Float accumulator overflow. Incorrect results possible");
2254 accum.wrapping_mul(AccumCount::from(factor))
2255 }),
2256 pos_infs: pos_infs * factor,
2257 neg_infs: neg_infs * factor,
2258 nans: nans * factor,
2259 non_nulls: non_nulls * factor,
2260 },
2261 Accum::Numeric {
2262 accum,
2263 pos_infs,
2264 neg_infs,
2265 nans,
2266 non_nulls,
2267 } => {
2268 let mut cx = numeric::cx_agg();
2269 let mut f = NumericAgg::from(factor.into_inner());
2270 cx.mul(&mut f, &accum.0);
2274 assert!(!cx.status().rounded(), "Accum::Numeric multiply overflow");
2280 Accum::Numeric {
2281 accum: OrderedDecimal(f),
2282 pos_infs: pos_infs * factor,
2283 neg_infs: neg_infs * factor,
2284 nans: nans * factor,
2285 non_nulls: non_nulls * factor,
2286 }
2287 }
2288 }
2289 }
2290}
2291
2292impl Columnation for Accum {
2293 type InnerRegion = CopyRegion<Self>;
2294}
2295
/// Min/max monoids used to render hierarchical reductions over monotonic input. The aggregate
/// state is carried in the difference field of the collection.
mod monoids {

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::AggregateFunc;
    use mz_ore::soft_panic_or_log;
    use mz_repr::{Datum, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A running min or max. The wrapped row holds the current extreme value as its first
    /// datum.
    #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
    pub enum ReductionMonoid {
        /// Keeps the smallest value seen, preferring non-null values over NULL.
        Min(Row),
        /// Keeps the largest value seen, preferring non-null values over NULL.
        Max(Row),
    }

    impl ReductionMonoid {
        /// Returns the row holding the current aggregate value.
        pub fn finalize(&self) -> &Row {
            use ReductionMonoid::*;
            match self {
                Min(row) | Max(row) => row,
            }
        }
    }

    /// Hand-written so that `clone_from` can go through `Row::clone_from` on `self`'s existing
    /// row, which may reuse its allocation.
    impl Clone for ReductionMonoid {
        fn clone(&self) -> Self {
            use ReductionMonoid::*;
            match self {
                Min(row) => Min(row.clone()),
                Max(row) => Max(row.clone()),
            }
        }

        fn clone_from(&mut self, source: &Self) {
            use ReductionMonoid::*;

            // Take our row out, leaving an empty one behind temporarily.
            let mut row = std::mem::take(match self {
                Min(row) | Max(row) => row,
            });

            let source_row = match source {
                Min(row) | Max(row) => row,
            };

            row.clone_from(source_row);

            // Rebuild with the source's variant, which may differ from ours.
            match source {
                Min(_) => *self = Min(row),
                Max(_) => *self = Max(row),
            }
        }
    }

    impl Multiply<Diff> for ReductionMonoid {
        type Output = Self;

        /// Min/max is unchanged by a positive multiplicity. There is no zero element to
        /// produce, so non-positive factors are rejected.
        ///
        /// # Panics
        ///
        /// Panics if `factor` is not positive.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Semigroup for ReductionMonoid {
        /// Combines two monoids of the same variant, keeping the smaller (`Min`) or larger
        /// (`Max`) first datum. A non-null value always replaces NULL. For mismatched variants,
        /// this soft-panics (or logs) and leaves `self` unchanged.
        fn plus_equals(&mut self, rhs: &Self) {
            match (self, rhs) {
                (ReductionMonoid::Min(lhs), ReductionMonoid::Min(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            // NULL never displaces a value; a value always displaces NULL.
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs < lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (ReductionMonoid::Max(lhs), ReductionMonoid::Max(rhs)) => {
                    let swap = {
                        let lhs_val = lhs.unpack_first();
                        let rhs_val = rhs.unpack_first();
                        match (lhs_val, rhs_val) {
                            // NULL never displaces a value; a value always displaces NULL.
                            (_, Datum::Null) => false,
                            (Datum::Null, _) => true,
                            (lhs, rhs) => rhs > lhs,
                        }
                    };
                    if swap {
                        lhs.clone_from(rhs);
                    }
                }
                (lhs, rhs) => {
                    soft_panic_or_log!(
                        "Mismatched monoid variants in reduction! lhs: {lhs:?} rhs: {rhs:?}"
                    );
                }
            }
        }
    }

    impl IsZero for ReductionMonoid {
        /// Min/max has no zero element, so an accumulated monoid is never reported as zero.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for ReductionMonoid {
        type InnerRegion = ReductionMonoidRegion;
    }

    /// Columnar region for `ReductionMonoid`. The rows are stored in `Row`'s own region; the
    /// variant tag is rebuilt on copy.
    #[derive(Default)]
    pub struct ReductionMonoidRegion {
        inner: <Row as Columnation>::InnerRegion,
    }

    impl Region for ReductionMonoidRegion {
        type Item = ReductionMonoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            use ReductionMonoid::*;
            match item {
                // SAFETY: forwards to the inner `Row` region under the same contract the caller
                // of `Region::copy` upholds for this region.
                Min(row) => Min(unsafe { self.inner.copy(row) }),
                // SAFETY: as above.
                Max(row) => Max(unsafe { self.inner.copy(row) }),
            }
        }

        fn clear(&mut self) {
            self.inner.clear();
        }

        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Only the rows occupy region storage.
            self.inner
                .reserve_items(items.map(ReductionMonoid::finalize));
        }

        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            self.inner.reserve_regions(regions.map(|r| &r.inner));
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.inner.heap_size(callback);
        }
    }

    /// Wraps `row` in the monoid for `func`: `Max` for the `Max*` functions, `Min` for the
    /// `Min*` functions, and `None` for every other aggregate function.
    pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
        match func {
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime => Some(ReductionMonoid::Max(row)),
            AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime => Some(ReductionMonoid::Min(row)),
            // Listed explicitly (no wildcard) so that adding an aggregate function forces a
            // decision here.
            AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::StringAgg { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => None,
        }
    }
}
2550
2551mod window_agg_helpers {
2552 use crate::render::reduce::*;
2553
2554 pub enum OneByOneAggrImpls {
2559 Accumulable(AccumulableOneByOneAggr),
2560 Hierarchical(HierarchicalOneByOneAggr),
2561 Basic(mz_expr::NaiveOneByOneAggr),
2562 }
2563
2564 impl mz_expr::OneByOneAggr for OneByOneAggrImpls {
2565 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
2566 match reduction_type(agg) {
2567 ReductionType::Basic => {
2568 OneByOneAggrImpls::Basic(mz_expr::NaiveOneByOneAggr::new(agg, reverse))
2569 }
2570 ReductionType::Accumulable => {
2571 OneByOneAggrImpls::Accumulable(AccumulableOneByOneAggr::new(agg))
2572 }
2573 ReductionType::Hierarchical => {
2574 OneByOneAggrImpls::Hierarchical(HierarchicalOneByOneAggr::new(agg))
2575 }
2576 }
2577 }
2578
2579 fn give(&mut self, d: &Datum) {
2580 match self {
2581 OneByOneAggrImpls::Basic(i) => i.give(d),
2582 OneByOneAggrImpls::Accumulable(i) => i.give(d),
2583 OneByOneAggrImpls::Hierarchical(i) => i.give(d),
2584 }
2585 }
2586
2587 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2588 match self {
2590 OneByOneAggrImpls::Basic(i) => i.get_current_aggregate(temp_storage),
2591 OneByOneAggrImpls::Accumulable(i) => i.get_current_aggregate(temp_storage),
2592 OneByOneAggrImpls::Hierarchical(i) => i.get_current_aggregate(temp_storage),
2593 }
2594 }
2595 }
2596
2597 pub struct AccumulableOneByOneAggr {
2598 aggr_func: AggregateFunc,
2599 accum: Accum,
2600 total: Diff,
2601 }
2602
2603 impl AccumulableOneByOneAggr {
2604 fn new(aggr_func: &AggregateFunc) -> Self {
2605 AccumulableOneByOneAggr {
2606 aggr_func: aggr_func.clone(),
2607 accum: accumulable_zero(aggr_func),
2608 total: Diff::ZERO,
2609 }
2610 }
2611
2612 fn give(&mut self, d: &Datum) {
2613 self.accum
2614 .plus_equals(&datum_to_accumulator(&self.aggr_func, d.clone()));
2615 self.total += Diff::ONE;
2616 }
2617
2618 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2619 temp_storage.make_datum(|packer| {
2620 packer.push(finalize_accum(&self.aggr_func, &self.accum, self.total));
2621 })
2622 }
2623 }
2624
2625 pub struct HierarchicalOneByOneAggr {
2626 aggr_func: AggregateFunc,
2627 monoid: ReductionMonoid,
2630 }
2631
2632 impl HierarchicalOneByOneAggr {
2633 fn new(aggr_func: &AggregateFunc) -> Self {
2634 let mut row_buf = Row::default();
2635 row_buf.packer().push(Datum::Null);
2636 HierarchicalOneByOneAggr {
2637 aggr_func: aggr_func.clone(),
2638 monoid: get_monoid(row_buf, aggr_func)
2639 .expect("aggr_func should be a hierarchical aggregation function"),
2640 }
2641 }
2642
2643 fn give(&mut self, d: &Datum) {
2644 let mut row_buf = Row::default();
2645 row_buf.packer().push(d);
2646 let m = get_monoid(row_buf, &self.aggr_func)
2647 .expect("aggr_func should be a hierarchical aggregation function");
2648 self.monoid.plus_equals(&m);
2649 }
2650
2651 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
2652 temp_storage.make_datum(|packer| packer.extend(self.monoid.finalize().iter()))
2653 }
2654 }
2655}