1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
//! Reduce the collection to one occurrence of each distinct element.
//!
//! The `distinct_total` and `distinct_total_u` operators are optimizations of the more general
//! `distinct` and `distinct_u` operators for the case in which time is totally ordered.

use timely::order::TotalOrder;
use timely::dataflow::*;
use timely::dataflow::operators::Operator;
use timely::dataflow::channels::pact::Pipeline;

use crate::lattice::Lattice;
use crate::{ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian};
use crate::hashable::Hashable;
use crate::collection::AsCollection;
use crate::operators::arrange::{Arranged, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor, TraceReader};

/// Extension trait for the `distinct` differential dataflow method.
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
    /// Reduces the collection to one occurrence of each distinct element.
    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
    where
        R2: Semigroup+'static,
        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
        ;
    /// Reduces the collection to one occurrence of each distinct element.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::ThresholdTotal;
    ///
    /// ::timely::example(|scope| {
    ///     // report the number of occurrences of each key
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .threshold_total(|_,c| c % 2);
    /// });
    /// ```
    fn threshold_total<R2: Abelian+'static, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
        self.threshold_semigroup(move |key, new, old| {
            let mut new = thresh(key, new);
            if let Some(old) = old { 
                let mut add = thresh(key, old);
                add.negate();
                new.plus_equals(&add); 
            }
            if !new.is_zero() { Some(new) } else { None }
        })
    }
    /// Reduces the collection to one occurrence of each distinct element.
    ///
    /// This reduction only tests whether the weight associated with a record is non-zero, and otherwise
    /// ignores its specific value. To take more general actions based on the accumulated weight, consider
    /// the `threshold` method.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::ThresholdTotal;
    ///
    /// ::timely::example(|scope| {
    ///     // report the number of occurrences of each key
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .distinct_total();
    /// });
    /// ```
    fn distinct_total(&self) -> Collection<G, K, isize> {
        self.distinct_total_core()
    }

    /// Distinct for general integer differences.
    ///
    /// This method allows `distinct` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn distinct_total_core<R2: Abelian+From<i8>+'static>(&self) -> Collection<G, K, R2> {
        self.threshold_total(|_,_| R2::from(1i8))
    }

}

impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
where G::Timestamp: TotalOrder+Lattice+Ord {
    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
    where
        R2: Semigroup+'static,
        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
    {
        self.arrange_by_self_named("Arrange: ThresholdTotal")
            .threshold_semigroup(thresh)
    }
}

impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
where
    G: Scope<Timestamp=T1::Time>,
    T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a ()>+Clone+'static,
    for<'a> T1::Diff : Semigroup<T1::DiffGat<'a>>,
    K: ExchangeData,
    T1::Time: TotalOrder,
    T1::Diff: ExchangeData,
{
    fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
    where
        R2: Semigroup+'static,
        F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
    {

        let mut trace = self.trace.clone();

        self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

            // tracks the lower and upper limit of received batches.
            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            move |input, output| {

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrde previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                let mut cap = None;
                input.for_each(|capability, batches| {
                    if cap.is_none() {                          // NB: Assumes batches are in-order
                        cap = Some(capability.retain());
                    }
                    for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());  // NB: Assumes batches are in-order
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }
                });

                use crate::trace::cursor::IntoOwned;
                if let Some(capability) = cap {

                    let mut session = output.session(&capability);

                    use crate::trace::cursor::CursorList;
                    let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
                    let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();

                    while let Some(key) = batch_cursor.get_key(&batch_storage) {
                        let mut count: Option<T1::Diff> = None;

                        // Compute the multiplicity of this key before the current batch.
                        trace_cursor.seek_key(&trace_storage, key);
                        if trace_cursor.get_key(&trace_storage) == Some(key) {
                            trace_cursor.map_times(&trace_storage, |_, diff| {
                                count.as_mut().map(|c| c.plus_equals(&diff));
                                if count.is_none() { count = Some(diff.into_owned()); }
                            });
                        }

                        // Apply `thresh` both before and after `diff` is applied to `count`.
                        // If the result is non-zero, send it along.
                        batch_cursor.map_times(&batch_storage, |time, diff| {

                            let difference =
                            match &count {
                                Some(old) => {
                                    let mut temp = old.clone();
                                    temp.plus_equals(&diff);
                                    thresh(key, &temp, Some(old))
                                },
                                None => { thresh(key, &diff.into_owned(), None) },
                            };

                            // Either add or assign `diff` to `count`.
                            if let Some(count) = &mut count {
                                count.plus_equals(&diff);
                            }
                            else {
                                count = Some(diff.into_owned());
                            }

                            if let Some(difference) = difference {
                                if !difference.is_zero() {
                                    session.give((key.clone(), time.into_owned(), difference));
                                }
                            }
                        });

                        batch_cursor.step_key(&batch_storage);
                    }
                }

                // tidy up the shared input trace.
                trace.advance_upper(&mut upper_limit);
                trace.set_logical_compaction(upper_limit.borrow());
                trace.set_physical_compaction(upper_limit.borrow());
            }
        })
        .as_collection()
    }
}