differential_dataflow/operators/
threshold.rs1use timely::order::TotalOrder;
7use timely::dataflow::*;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, Collection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::{Arranged, ArrangeBySelf};
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19pub trait ThresholdTotal<G: Scope<Timestamp: TotalOrder+Lattice+Ord>, K: ExchangeData, R: ExchangeData+Semigroup> {
21    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
23    where
24        R2: Semigroup+'static,
25        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26        ;
27    fn threshold_total<R2: Abelian+'static, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
43        self.threshold_semigroup(move |key, new, old| {
44            let mut new = thresh(key, new);
45            if let Some(old) = old {
46                let mut add = thresh(key, old);
47                add.negate();
48                new.plus_equals(&add);
49            }
50            if !new.is_zero() { Some(new) } else { None }
51        })
52    }
53    fn distinct_total(&self) -> Collection<G, K, isize> {
73        self.distinct_total_core()
74    }
75
76    fn distinct_total_core<R2: Abelian+From<i8>+'static>(&self) -> Collection<G, K, R2> {
82        self.threshold_total(|_,_| R2::from(1i8))
83    }
84
85}
86
87impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
88where
89    G: Scope<Timestamp: TotalOrder+Lattice+Ord>,
90{
91    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
92    where
93        R2: Semigroup+'static,
94        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
95    {
96        self.arrange_by_self_named("Arrange: ThresholdTotal")
97            .threshold_semigroup(thresh)
98    }
99}
100
101impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
102where
103    G: Scope<Timestamp=T1::Time>,
104    T1: for<'a> TraceReader<
105        Key<'a>=&'a K,
106        Val<'a>=&'a (),
107        Time: TotalOrder,
108        Diff : ExchangeData + Semigroup<T1::DiffGat<'a>>,
109    >+Clone+'static,
110    K: ExchangeData,
111{
112    fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
113    where
114        R2: Semigroup+'static,
115        F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
116    {
117
118        let mut trace = self.trace.clone();
119
120        self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
121
122            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
124            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
125
126            move |input, output| {
127
128                let mut batch_cursors = Vec::new();
129                let mut batch_storage = Vec::new();
130
131                lower_limit.clear();
133                lower_limit.extend(upper_limit.borrow().iter().cloned());
134
135                let mut cap = None;
136                input.for_each(|capability, batches| {
137                    if cap.is_none() {                          cap = Some(capability.retain());
139                    }
140                    for batch in batches.drain(..) {
141                        upper_limit.clone_from(batch.upper());  batch_cursors.push(batch.cursor());
143                        batch_storage.push(batch);
144                    }
145                });
146
147                if let Some(capability) = cap {
148
149                    let mut session = output.session(&capability);
150
151                    use crate::trace::cursor::CursorList;
152                    let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
153                    let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
154
155                    while let Some(key) = batch_cursor.get_key(&batch_storage) {
156                        let mut count: Option<T1::Diff> = None;
157
158                        trace_cursor.seek_key(&trace_storage, key);
160                        if trace_cursor.get_key(&trace_storage) == Some(key) {
161                            trace_cursor.map_times(&trace_storage, |_, diff| {
162                                count.as_mut().map(|c| c.plus_equals(&diff));
163                                if count.is_none() { count = Some(T1::owned_diff(diff)); }
164                            });
165                        }
166
167                        batch_cursor.map_times(&batch_storage, |time, diff| {
170
171                            let difference =
172                            match &count {
173                                Some(old) => {
174                                    let mut temp = old.clone();
175                                    temp.plus_equals(&diff);
176                                    thresh(key, &temp, Some(old))
177                                },
178                                None => { thresh(key, &T1::owned_diff(diff), None) },
179                            };
180
181                            if let Some(count) = &mut count {
183                                count.plus_equals(&diff);
184                            }
185                            else {
186                                count = Some(T1::owned_diff(diff));
187                            }
188
189                            if let Some(difference) = difference {
190                                if !difference.is_zero() {
191                                    session.give((key.clone(), T1::owned_time(time), difference));
192                                }
193                            }
194                        });
195
196                        batch_cursor.step_key(&batch_storage);
197                    }
198                }
199
200                trace.advance_upper(&mut upper_limit);
202                trace.set_logical_compaction(upper_limit.borrow());
203                trace.set_physical_compaction(upper_limit.borrow());
204            }
205        })
206        .as_collection()
207    }
208}