1use differential_dataflow::Data;
15use differential_dataflow::IntoOwned;
16use differential_dataflow::containers::Columnation;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
19use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
20use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
21use mz_expr::MirScalarExpr;
22use mz_repr::Diff;
23use timely::Container;
24use timely::container::PushInto;
25use timely::dataflow::Scope;
26use timely::progress::Timestamp;
27use timely::progress::timestamp::Refines;
28
29use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
30use crate::extensions::reduce::MzReduce;
31use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
32use crate::row_spine::RowRowBuilder;
33use crate::typedefs::{ErrBatcher, ErrBuilder};
34
35fn threshold_arrangement<G, K, V, T1, Bu2, T2, L>(
37 arrangement: &Arranged<G, T1>,
38 name: &str,
39 logic: L,
40) -> Arranged<G, TraceAgent<T2>>
41where
42 G: Scope,
43 G::Timestamp: Lattice + Columnation,
44 V: Data + Columnation,
45 T1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
46 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
47 for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V>,
48 K: Columnation + Data,
49 Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
50 Bu2::Input: Container + PushInto<((K, V), G::Timestamp, Diff)>,
51 T2: for<'a> Trace<
52 Key<'a> = T1::Key<'a>,
53 Val<'a> = T1::Val<'a>,
54 Time = G::Timestamp,
55 Diff = Diff,
56 > + 'static,
57 T2::Batch: Batch,
58 L: Fn(&Diff) -> bool + 'static,
59 Arranged<G, TraceAgent<T2>>: ArrangementSize,
60{
61 arrangement.mz_reduce_abelian::<_, _, _, Bu2, T2>(name, move |_key, s, t| {
62 for (record, count) in s.iter() {
63 if logic(count) {
64 t.push(((*record).into_owned(), *count));
65 }
66 }
67 })
68}
69
70pub fn build_threshold_basic<G, T>(
75 input: CollectionBundle<G, T>,
76 key: Vec<MirScalarExpr>,
77) -> CollectionBundle<G, T>
78where
79 G: Scope,
80 G::Timestamp: Lattice + Refines<T> + Columnation,
81 T: Timestamp + Lattice + Columnation,
82{
83 let arrangement = input
84 .arrangement(&key)
85 .expect("Arrangement ensured to exist");
86 match arrangement {
87 ArrangementFlavor::Local(oks, errs) => {
88 let oks = threshold_arrangement::<_, _, _, _, RowRowBuilder<_, _>, _, _>(
89 &oks,
90 "Threshold local",
91 |count| count.is_positive(),
92 );
93 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
94 }
95 ArrangementFlavor::Trace(_, oks, errs) => {
96 let oks = threshold_arrangement::<_, _, _, _, RowRowBuilder<_, _>, _, _>(
97 &oks,
98 "Threshold trace",
99 |count| count.is_positive(),
100 );
101 let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
102 let errs = errs
103 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange threshold basic err");
104 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
105 }
106 }
107}
108
109impl<G, T> Context<G, T>
110where
111 G: Scope,
112 G::Timestamp: Lattice + Refines<T> + Columnation,
113 T: Timestamp + Lattice + Columnation,
114{
115 pub(crate) fn render_threshold(
116 &self,
117 input: CollectionBundle<G, T>,
118 threshold_plan: ThresholdPlan,
119 ) -> CollectionBundle<G, T> {
120 match threshold_plan {
121 ThresholdPlan::Basic(BasicThresholdPlan {
122 ensure_arrangement: (key, _, _),
123 }) => {
124 build_threshold_basic(input, key)
128 }
129 }
130 }
131}