differential_dataflow/operators/
threshold.rs
1use timely::order::TotalOrder;
7use timely::dataflow::*;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, Collection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::{Arranged, ArrangeBySelf};
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
21 fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
23 where
24 R2: Semigroup+'static,
25 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26 ;
27 fn threshold_total<R2: Abelian+'static, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
43 self.threshold_semigroup(move |key, new, old| {
44 let mut new = thresh(key, new);
45 if let Some(old) = old {
46 let mut add = thresh(key, old);
47 add.negate();
48 new.plus_equals(&add);
49 }
50 if !new.is_zero() { Some(new) } else { None }
51 })
52 }
53 fn distinct_total(&self) -> Collection<G, K, isize> {
73 self.distinct_total_core()
74 }
75
76 fn distinct_total_core<R2: Abelian+From<i8>+'static>(&self) -> Collection<G, K, R2> {
82 self.threshold_total(|_,_| R2::from(1i8))
83 }
84
85}
86
87impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
88where G::Timestamp: TotalOrder+Lattice+Ord {
89 fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
90 where
91 R2: Semigroup+'static,
92 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
93 {
94 self.arrange_by_self_named("Arrange: ThresholdTotal")
95 .threshold_semigroup(thresh)
96 }
97}
98
99impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
100where
101 G: Scope<Timestamp=T1::Time>,
102 T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a ()>+Clone+'static,
103 for<'a> T1::Diff : Semigroup<T1::DiffGat<'a>>,
104 K: ExchangeData,
105 T1::Time: TotalOrder,
106 T1::Diff: ExchangeData,
107{
108 fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
109 where
110 R2: Semigroup+'static,
111 F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
112 {
113
114 let mut trace = self.trace.clone();
115
116 self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
117
118 let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
120 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
121
122 move |input, output| {
123
124 let mut batch_cursors = Vec::new();
125 let mut batch_storage = Vec::new();
126
127 lower_limit.clear();
129 lower_limit.extend(upper_limit.borrow().iter().cloned());
130
131 let mut cap = None;
132 input.for_each(|capability, batches| {
133 if cap.is_none() { cap = Some(capability.retain());
135 }
136 for batch in batches.drain(..) {
137 upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor());
139 batch_storage.push(batch);
140 }
141 });
142
143 use crate::IntoOwned;
144 if let Some(capability) = cap {
145
146 let mut session = output.session(&capability);
147
148 use crate::trace::cursor::CursorList;
149 let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
150 let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
151
152 while let Some(key) = batch_cursor.get_key(&batch_storage) {
153 let mut count: Option<T1::Diff> = None;
154
155 trace_cursor.seek_key(&trace_storage, key);
157 if trace_cursor.get_key(&trace_storage) == Some(key) {
158 trace_cursor.map_times(&trace_storage, |_, diff| {
159 count.as_mut().map(|c| c.plus_equals(&diff));
160 if count.is_none() { count = Some(diff.into_owned()); }
161 });
162 }
163
164 batch_cursor.map_times(&batch_storage, |time, diff| {
167
168 let difference =
169 match &count {
170 Some(old) => {
171 let mut temp = old.clone();
172 temp.plus_equals(&diff);
173 thresh(key, &temp, Some(old))
174 },
175 None => { thresh(key, &diff.into_owned(), None) },
176 };
177
178 if let Some(count) = &mut count {
180 count.plus_equals(&diff);
181 }
182 else {
183 count = Some(diff.into_owned());
184 }
185
186 if let Some(difference) = difference {
187 if !difference.is_zero() {
188 session.give((key.clone(), time.into_owned(), difference));
189 }
190 }
191 });
192
193 batch_cursor.step_key(&batch_storage);
194 }
195 }
196
197 trace.advance_upper(&mut upper_limit);
199 trace.set_logical_compaction(upper_limit.borrow());
200 trace.set_physical_compaction(upper_limit.borrow());
201 }
202 })
203 .as_collection()
204 }
205}