1use differential_dataflow::Data;
15use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
16use differential_dataflow::trace::{Builder, Trace, TraceReader};
17use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
18use mz_expr::MirScalarExpr;
19use mz_repr::Diff;
20use timely::Container;
21use timely::container::PushInto;
22use timely::dataflow::Scope;
23use timely::progress::timestamp::Refines;
24
25use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
26use crate::extensions::reduce::MzReduce;
27use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
28use crate::row_spine::RowRowBuilder;
29use crate::typedefs::{ErrBatcher, ErrBuilder, MzData, MzTimestamp};
30
31fn threshold_arrangement<G, T1, Bu2, T2, L>(
33 arrangement: &Arranged<G, T1>,
34 name: &str,
35 logic: L,
36) -> Arranged<G, TraceAgent<T2>>
37where
38 G: Scope,
39 G::Timestamp: MzTimestamp,
40 T1: TraceReader<KeyOwn: MzData + Data, ValOwn: MzData + Data, Time = G::Timestamp, Diff = Diff>
41 + Clone
42 + 'static,
43 Bu2: Builder<
44 Time = G::Timestamp,
45 Input: Container + PushInto<((T1::KeyOwn, T1::ValOwn), G::Timestamp, Diff)>,
46 Output = T2::Batch,
47 >,
48 T2: for<'a> Trace<
49 Key<'a> = T1::Key<'a>,
50 Val<'a> = T1::Val<'a>,
51 KeyOwn = T1::KeyOwn,
52 ValOwn = T1::ValOwn,
53 Time = G::Timestamp,
54 Diff = Diff,
55 > + 'static,
56 L: Fn(&Diff) -> bool + 'static,
57 Arranged<G, TraceAgent<T2>>: ArrangementSize,
58{
59 arrangement.mz_reduce_abelian::<_, Bu2, T2>(name, move |_key, s, t| {
60 for (record, count) in s.iter() {
61 if logic(count) {
62 t.push((T1::owned_val(*record), *count));
63 }
64 }
65 })
66}
67
68pub fn build_threshold_basic<G, T>(
73 input: CollectionBundle<G, T>,
74 key: Vec<MirScalarExpr>,
75) -> CollectionBundle<G, T>
76where
77 G: Scope,
78 G::Timestamp: MzTimestamp + Refines<T>,
79 T: MzTimestamp,
80{
81 let arrangement = input
82 .arrangement(&key)
83 .expect("Arrangement ensured to exist");
84 match arrangement {
85 ArrangementFlavor::Local(oks, errs) => {
86 let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
87 &oks,
88 "Threshold local",
89 |count| count.is_positive(),
90 );
91 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
92 }
93 ArrangementFlavor::Trace(_, oks, errs) => {
94 let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
95 &oks,
96 "Threshold trace",
97 |count| count.is_positive(),
98 );
99 let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
100 let errs = errs
101 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange threshold basic err");
102 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
103 }
104 }
105}
106
107impl<G, T> Context<G, T>
108where
109 G: Scope,
110 G::Timestamp: MzTimestamp + Refines<T>,
111 T: MzTimestamp,
112{
113 pub(crate) fn render_threshold(
114 &self,
115 input: CollectionBundle<G, T>,
116 threshold_plan: ThresholdPlan,
117 ) -> CollectionBundle<G, T> {
118 match threshold_plan {
119 ThresholdPlan::Basic(BasicThresholdPlan {
120 ensure_arrangement: (key, _, _),
121 }) => {
122 build_threshold_basic(input, key)
126 }
127 }
128 }
129}