differential_dataflow/operators/
count.rs1use timely::order::TotalOrder;
4use timely::dataflow::*;
5use timely::dataflow::operators::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8use crate::lattice::Lattice;
9use crate::{ExchangeData, Collection};
10use crate::difference::{IsZero, Semigroup};
11use crate::hashable::Hashable;
12use crate::collection::AsCollection;
13use crate::operators::arrange::{Arranged, ArrangeBySelf};
14use crate::trace::{BatchReader, Cursor, TraceReader};
15
16pub trait CountTotal<G: Scope<Timestamp: TotalOrder+Lattice+Ord>, K: ExchangeData, R: Semigroup> {
18    fn count_total(&self) -> Collection<G, (K, R), isize> {
34        self.count_total_core()
35    }
36
37    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2>;
43}
44
45impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
46where
47    G: Scope<Timestamp: TotalOrder+Lattice+Ord>,
48{
49    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
50        self.arrange_by_self_named("Arrange: CountTotal")
51            .count_total_core()
52    }
53}
54
55impl<G, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
56where
57    G: Scope<Timestamp=T1::Time>,
58    T1: for<'a> TraceReader<
59        Key<'a> = &'a K,
60        Val<'a>=&'a (),
61        Time: TotalOrder,
62        Diff: ExchangeData+Semigroup<T1::DiffGat<'a>>
63    >+Clone+'static,
64    K: ExchangeData,
65{
66    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, T1::Diff), R2> {
67
68        let mut trace = self.trace.clone();
69
70        self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
71
72            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
74            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
75
76            move |input, output| {
77
78                let mut batch_cursors = Vec::new();
79                let mut batch_storage = Vec::new();
80
81                lower_limit.clear();
83                lower_limit.extend(upper_limit.borrow().iter().cloned());
84
85                let mut cap = None;
86                input.for_each(|capability, batches| {
87                    if cap.is_none() {                          cap = Some(capability.retain());
89                    }
90                    for batch in batches.drain(..) {
91                        upper_limit.clone_from(batch.upper());  batch_cursors.push(batch.cursor());
93                        batch_storage.push(batch);
94                    }
95                });
96
97                if let Some(capability) = cap {
98
99                    let mut session = output.session(&capability);
100
101                    use crate::trace::cursor::CursorList;
102                    let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
103                    let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
104
105                    while let Some(key) = batch_cursor.get_key(&batch_storage) {
106                        let mut count: Option<T1::Diff> = None;
107
108                        trace_cursor.seek_key(&trace_storage, key);
109                        if trace_cursor.get_key(&trace_storage) == Some(key) {
110                            trace_cursor.map_times(&trace_storage, |_, diff| {
111                                count.as_mut().map(|c| c.plus_equals(&diff));
112                                if count.is_none() { count = Some(T1::owned_diff(diff)); }
113                            });
114                        }
115
116                        batch_cursor.map_times(&batch_storage, |time, diff| {
117
118                            if let Some(count) = count.as_ref() {
119                                if !count.is_zero() {
120                                    session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(-1i8)));
121                                }
122                            }
123                            count.as_mut().map(|c| c.plus_equals(&diff));
124                            if count.is_none() { count = Some(T1::owned_diff(diff)); }
125                            if let Some(count) = count.as_ref() {
126                                if !count.is_zero() {
127                                    session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(1i8)));
128                                }
129                            }
130                        });
131
132                        batch_cursor.step_key(&batch_storage);
133                    }
134                }
135
136                trace.advance_upper(&mut upper_limit);
138                trace.set_logical_compaction(upper_limit.borrow());
139                trace.set_physical_compaction(upper_limit.borrow());
140            }
141        })
142        .as_collection()
143    }
144}