differential_dataflow/operators/
threshold.rs

//! Reduce the collection to one occurrence of each distinct element.
//!
//! The `threshold_total` and `distinct_total` operators are optimizations of the more general
//! `threshold` and `distinct` operators for the case in which time is totally ordered.
5
6use timely::order::TotalOrder;
7use timely::dataflow::*;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, Collection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::{Arranged, ArrangeBySelf};
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19/// Extension trait for the `distinct` differential dataflow method.
20pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
21    /// Reduces the collection to one occurrence of each distinct element.
22    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
23    where
24        R2: Semigroup+'static,
25        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26        ;
27    /// Reduces the collection to one occurrence of each distinct element.
28    ///
29    /// # Examples
30    ///
31    /// ```
32    /// use differential_dataflow::input::Input;
33    /// use differential_dataflow::operators::ThresholdTotal;
34    ///
35    /// ::timely::example(|scope| {
36    ///     // report the number of occurrences of each key
37    ///     scope.new_collection_from(1 .. 10).1
38    ///          .map(|x| x / 3)
39    ///          .threshold_total(|_,c| c % 2);
40    /// });
41    /// ```
42    fn threshold_total<R2: Abelian+'static, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
43        self.threshold_semigroup(move |key, new, old| {
44            let mut new = thresh(key, new);
45            if let Some(old) = old { 
46                let mut add = thresh(key, old);
47                add.negate();
48                new.plus_equals(&add); 
49            }
50            if !new.is_zero() { Some(new) } else { None }
51        })
52    }
53    /// Reduces the collection to one occurrence of each distinct element.
54    ///
55    /// This reduction only tests whether the weight associated with a record is non-zero, and otherwise
56    /// ignores its specific value. To take more general actions based on the accumulated weight, consider
57    /// the `threshold` method.
58    ///
59    /// # Examples
60    ///
61    /// ```
62    /// use differential_dataflow::input::Input;
63    /// use differential_dataflow::operators::ThresholdTotal;
64    ///
65    /// ::timely::example(|scope| {
66    ///     // report the number of occurrences of each key
67    ///     scope.new_collection_from(1 .. 10).1
68    ///          .map(|x| x / 3)
69    ///          .distinct_total();
70    /// });
71    /// ```
72    fn distinct_total(&self) -> Collection<G, K, isize> {
73        self.distinct_total_core()
74    }
75
76    /// Distinct for general integer differences.
77    ///
78    /// This method allows `distinct` to produce collections whose difference
79    /// type is something other than an `isize` integer, for example perhaps an
80    /// `i32`.
81    fn distinct_total_core<R2: Abelian+From<i8>+'static>(&self) -> Collection<G, K, R2> {
82        self.threshold_total(|_,_| R2::from(1i8))
83    }
84
85}
86
87impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
88where G::Timestamp: TotalOrder+Lattice+Ord {
89    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
90    where
91        R2: Semigroup+'static,
92        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
93    {
94        self.arrange_by_self_named("Arrange: ThresholdTotal")
95            .threshold_semigroup(thresh)
96    }
97}
98
99impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
100where
101    G: Scope<Timestamp=T1::Time>,
102    T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a ()>+Clone+'static,
103    for<'a> T1::Diff : Semigroup<T1::DiffGat<'a>>,
104    K: ExchangeData,
105    T1::Time: TotalOrder,
106    T1::Diff: ExchangeData,
107{
108    fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
109    where
110        R2: Semigroup+'static,
111        F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
112    {
113
114        let mut trace = self.trace.clone();
115
116        self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
117
118            // tracks the lower and upper limit of received batches.
119            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
120            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
121
122            move |input, output| {
123
124                let mut batch_cursors = Vec::new();
125                let mut batch_storage = Vec::new();
126
127                // Downgrde previous upper limit to be current lower limit.
128                lower_limit.clear();
129                lower_limit.extend(upper_limit.borrow().iter().cloned());
130
131                let mut cap = None;
132                input.for_each(|capability, batches| {
133                    if cap.is_none() {                          // NB: Assumes batches are in-order
134                        cap = Some(capability.retain());
135                    }
136                    for batch in batches.drain(..) {
137                        upper_limit.clone_from(batch.upper());  // NB: Assumes batches are in-order
138                        batch_cursors.push(batch.cursor());
139                        batch_storage.push(batch);
140                    }
141                });
142
143                use crate::IntoOwned;
144                if let Some(capability) = cap {
145
146                    let mut session = output.session(&capability);
147
148                    use crate::trace::cursor::CursorList;
149                    let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
150                    let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
151
152                    while let Some(key) = batch_cursor.get_key(&batch_storage) {
153                        let mut count: Option<T1::Diff> = None;
154
155                        // Compute the multiplicity of this key before the current batch.
156                        trace_cursor.seek_key(&trace_storage, key);
157                        if trace_cursor.get_key(&trace_storage) == Some(key) {
158                            trace_cursor.map_times(&trace_storage, |_, diff| {
159                                count.as_mut().map(|c| c.plus_equals(&diff));
160                                if count.is_none() { count = Some(diff.into_owned()); }
161                            });
162                        }
163
164                        // Apply `thresh` both before and after `diff` is applied to `count`.
165                        // If the result is non-zero, send it along.
166                        batch_cursor.map_times(&batch_storage, |time, diff| {
167
168                            let difference =
169                            match &count {
170                                Some(old) => {
171                                    let mut temp = old.clone();
172                                    temp.plus_equals(&diff);
173                                    thresh(key, &temp, Some(old))
174                                },
175                                None => { thresh(key, &diff.into_owned(), None) },
176                            };
177
178                            // Either add or assign `diff` to `count`.
179                            if let Some(count) = &mut count {
180                                count.plus_equals(&diff);
181                            }
182                            else {
183                                count = Some(diff.into_owned());
184                            }
185
186                            if let Some(difference) = difference {
187                                if !difference.is_zero() {
188                                    session.give((key.clone(), time.into_owned(), difference));
189                                }
190                            }
191                        });
192
193                        batch_cursor.step_key(&batch_storage);
194                    }
195                }
196
197                // tidy up the shared input trace.
198                trace.advance_upper(&mut upper_limit);
199                trace.set_logical_compaction(upper_limit.borrow());
200                trace.set_physical_compaction(upper_limit.borrow());
201            }
202        })
203        .as_collection()
204    }
205}