differential_dataflow/operators/
count.rs

1//! Count the number of occurrences of each element.
2
3use timely::order::TotalOrder;
4use timely::dataflow::*;
5use timely::dataflow::operators::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8use crate::lattice::Lattice;
9use crate::{IntoOwned, ExchangeData, Collection};
10use crate::difference::{IsZero, Semigroup};
11use crate::hashable::Hashable;
12use crate::collection::AsCollection;
13use crate::operators::arrange::{Arranged, ArrangeBySelf};
14use crate::trace::{BatchReader, Cursor, TraceReader};
15
16/// Extension trait for the `count` differential dataflow method.
17pub trait CountTotal<G: Scope, K: ExchangeData, R: Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
18    /// Counts the number of occurrences of each element.
19    ///
20    /// # Examples
21    ///
22    /// ```
23    /// use differential_dataflow::input::Input;
24    /// use differential_dataflow::operators::CountTotal;
25    ///
26    /// ::timely::example(|scope| {
27    ///     // report the number of occurrences of each key
28    ///     scope.new_collection_from(1 .. 10).1
29    ///          .map(|x| x / 3)
30    ///          .count_total();
31    /// });
32    /// ```
33    fn count_total(&self) -> Collection<G, (K, R), isize> {
34        self.count_total_core()
35    }
36
37    /// Count for general integer differences.
38    ///
39    /// This method allows `count_total` to produce collections whose difference
40    /// type is something other than an `isize` integer, for example perhaps an
41    /// `i32`.
42    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2>;
43}
44
45impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
46where G::Timestamp: TotalOrder+Lattice+Ord {
47    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
48        self.arrange_by_self_named("Arrange: CountTotal")
49            .count_total_core()
50    }
51}
52
53impl<G, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
54where
55    G: Scope<Timestamp=T1::Time>,
56    T1: for<'a> TraceReader<Val<'a>=&'a ()>+Clone+'static,
57    for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
58    for<'a> T1::Diff : Semigroup<T1::DiffGat<'a>>,
59    K: ExchangeData,
60    T1::Time: TotalOrder,
61    T1::Diff: ExchangeData,
62{
63    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, T1::Diff), R2> {
64
65        let mut trace = self.trace.clone();
66
67        self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
68
69            // tracks the lower and upper limit of received batches.
70            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
71            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
72
73            move |input, output| {
74
75                let mut batch_cursors = Vec::new();
76                let mut batch_storage = Vec::new();
77
78                // Downgrade previous upper limit to be current lower limit.
79                lower_limit.clear();
80                lower_limit.extend(upper_limit.borrow().iter().cloned());
81
82                let mut cap = None;
83                input.for_each(|capability, batches| {
84                    if cap.is_none() {                          // NB: Assumes batches are in-order
85                        cap = Some(capability.retain());
86                    }
87                    for batch in batches.drain(..) {
88                        upper_limit.clone_from(batch.upper());  // NB: Assumes batches are in-order
89                        batch_cursors.push(batch.cursor());
90                        batch_storage.push(batch);
91                    }
92                });
93
94                if let Some(capability) = cap {
95
96                    let mut session = output.session(&capability);
97
98                    use crate::trace::cursor::CursorList;
99                    let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
100                    let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
101
102                    while let Some(key) = batch_cursor.get_key(&batch_storage) {
103                        let mut count: Option<T1::Diff> = None;
104
105                        trace_cursor.seek_key(&trace_storage, key);
106                        if trace_cursor.get_key(&trace_storage) == Some(key) {
107                            trace_cursor.map_times(&trace_storage, |_, diff| {
108                                count.as_mut().map(|c| c.plus_equals(&diff));
109                                if count.is_none() { count = Some(diff.into_owned()); }
110                            });
111                        }
112
113                        batch_cursor.map_times(&batch_storage, |time, diff| {
114
115                            if let Some(count) = count.as_ref() {
116                                if !count.is_zero() {
117                                    session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
118                                }
119                            }
120                            count.as_mut().map(|c| c.plus_equals(&diff));
121                            if count.is_none() { count = Some(diff.into_owned()); }
122                            if let Some(count) = count.as_ref() {
123                                if !count.is_zero() {
124                                    session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
125                                }
126                            }
127                        });
128
129                        batch_cursor.step_key(&batch_storage);
130                    }
131                }
132
133                // tidy up the shared input trace.
134                trace.advance_upper(&mut upper_limit);
135                trace.set_logical_compaction(upper_limit.borrow());
136                trace.set_physical_compaction(upper_limit.borrow());
137            }
138        })
139        .as_collection()
140    }
141}