1use differential_dataflow::Data;
15use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
16use differential_dataflow::trace::implementations::BatchContainer;
17use differential_dataflow::trace::{Builder, Trace, TraceReader};
18use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
19use mz_expr::MirScalarExpr;
20use mz_repr::Diff;
21use mz_timely_util::columnation::ColumnationChunker;
22use timely::Container;
23use timely::container::PushInto;
24
25use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
26use crate::extensions::reduce::{ClearContainer, MzReduce};
27use crate::render::RenderTimestamp;
28use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
29use crate::typedefs::{ErrBatcher, ErrBuilder, MzData, MzTimestamp};
30use mz_row_spine::RowRowBuilder;
31
32fn threshold_arrangement<'scope, Ts, T1, Bu2, T2, L>(
34 arrangement: Arranged<'scope, T1>,
35 name: &str,
36 logic: L,
37) -> Arranged<'scope, TraceAgent<T2>>
38where
39 Ts: MzTimestamp,
40 T1: TraceReader<
41 KeyContainer: BatchContainer<Owned: MzData + Data>,
42 ValOwn: MzData + Data,
43 Time = Ts,
44 Diff = Diff,
45 > + Clone
46 + 'static,
47 Bu2: Builder<
48 Time = Ts,
49 Input: Container
50 + ClearContainer
51 + PushInto<(
52 (<T1::KeyContainer as BatchContainer>::Owned, T1::ValOwn),
53 Ts,
54 Diff,
55 )>,
56 Output = T2::Batch,
57 >,
58 T2: for<'a> Trace<
59 Key<'a> = T1::Key<'a>,
60 Val<'a> = T1::Val<'a>,
61 KeyContainer: BatchContainer<Owned = <T1::KeyContainer as BatchContainer>::Owned>,
62 ValOwn = T1::ValOwn,
63 Time = Ts,
64 Diff = Diff,
65 > + 'static,
66 L: Fn(&Diff) -> bool + 'static,
67 Arranged<'scope, TraceAgent<T2>>: ArrangementSize,
68{
69 arrangement.mz_reduce_abelian::<_, Bu2, T2>(name, move |_key, s, t| {
70 for (record, count) in s.iter() {
71 if logic(count) {
72 t.push((T1::owned_val(*record), *count));
73 }
74 }
75 })
76}
77
78pub fn build_threshold_basic<'scope, T: RenderTimestamp>(
83 input: CollectionBundle<'scope, T>,
84 key: Vec<MirScalarExpr>,
85) -> CollectionBundle<'scope, T> {
86 let arrangement = input
87 .arrangement(&key)
88 .expect("Arrangement ensured to exist");
89 match arrangement {
90 ArrangementFlavor::Local(oks, errs) => {
91 let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
92 oks,
93 "Threshold local",
94 |count| count.is_positive(),
95 );
96 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
97 }
98 ArrangementFlavor::Trace(_, oks, errs) => {
99 let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
100 oks,
101 "Threshold trace",
102 |count| count.is_positive(),
103 );
104 let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
105 let errs = errs
106 .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
107 "Arrange threshold basic err",
108 );
109 CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
110 }
111 }
112}
113
114impl<'scope, T: RenderTimestamp> Context<'scope, T> {
115 pub(crate) fn render_threshold(
116 &self,
117 input: CollectionBundle<'scope, T>,
118 threshold_plan: ThresholdPlan,
119 ) -> CollectionBundle<'scope, T> {
120 match threshold_plan {
121 ThresholdPlan::Basic(BasicThresholdPlan {
122 ensure_arrangement: (key, _, _),
123 }) => {
124 build_threshold_basic(input, key)
128 }
129 }
130 }
131}