1use differential_dataflow::Data;
15use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
16use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
17use differential_dataflow::trace::{Builder, Trace, TraceReader};
18use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
19use mz_expr::MirScalarExpr;
20use mz_repr::Diff;
21use timely::Container;
22use timely::container::PushInto;
23use timely::dataflow::Scope;
24use timely::progress::timestamp::Refines;
25
26use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
27use crate::extensions::reduce::MzReduce;
28use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
29use crate::row_spine::RowRowBuilder;
30use crate::typedefs::{ErrBatcher, ErrBuilder, MzData, MzTimestamp};
31
32fn threshold_arrangement<G, T1, Bu2, T2, L>(
34 arrangement: &Arranged<G, T1>,
35 name: &str,
36 logic: L,
37) -> Arranged<G, TraceAgent<T2>>
38where
39 G: Scope,
40 G::Timestamp: MzTimestamp,
41 T1: TraceReader<KeyOwn: MzData + Data, ValOwn: MzData + Data, Time = G::Timestamp, Diff = Diff>
42 + Clone
43 + 'static,
44 Bu2: Builder<
45 Time = G::Timestamp,
46 Input: Container
47 + MergerChunk
48 + PushInto<((T1::KeyOwn, T1::ValOwn), G::Timestamp, Diff)>,
49 Output = T2::Batch,
50 >,
51 T2: for<'a> Trace<
52 Key<'a> = T1::Key<'a>,
53 Val<'a> = T1::Val<'a>,
54 KeyOwn = T1::KeyOwn,
55 ValOwn = T1::ValOwn,
56 Time = G::Timestamp,
57 Diff = Diff,
58 > + 'static,
59 L: Fn(&Diff) -> bool + 'static,
60 Arranged<G, TraceAgent<T2>>: ArrangementSize,
61{
62 arrangement.mz_reduce_abelian::<_, Bu2, T2>(name, move |_key, s, t| {
63 for (record, count) in s.iter() {
64 if logic(count) {
65 t.push((T1::owned_val(*record), *count));
66 }
67 }
68 })
69}
70
71pub fn build_threshold_basic<G, T>(
76 input: CollectionBundle<G, T>,
77 key: Vec<MirScalarExpr>,
78) -> CollectionBundle<G, T>
79where
80 G: Scope,
81 G::Timestamp: MzTimestamp + Refines<T>,
82 T: MzTimestamp,
83{
84 let arrangement = input
85 .arrangement(&key)
86 .expect("Arrangement ensured to exist");
87 match arrangement {
88 ArrangementFlavor::Local(oks, errs) => {
89 let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
90 &oks,
91 "Threshold local",
92 |count| count.is_positive(),
93 );
94 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
95 }
96 ArrangementFlavor::Trace(_, oks, errs) => {
97 let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
98 &oks,
99 "Threshold trace",
100 |count| count.is_positive(),
101 );
102 let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
103 let errs = errs
104 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange threshold basic err");
105 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
106 }
107 }
108}
109
110impl<G, T> Context<G, T>
111where
112 G: Scope,
113 G::Timestamp: MzTimestamp + Refines<T>,
114 T: MzTimestamp,
115{
116 pub(crate) fn render_threshold(
117 &self,
118 input: CollectionBundle<G, T>,
119 threshold_plan: ThresholdPlan,
120 ) -> CollectionBundle<G, T> {
121 match threshold_plan {
122 ThresholdPlan::Basic(BasicThresholdPlan {
123 ensure_arrangement: (key, _, _),
124 }) => {
125 build_threshold_basic(input, key)
129 }
130 }
131 }
132}