timely/dataflow/operators/aggregation/aggregate.rs
1//! General purpose intra-timestamp aggregation
2use std::hash::Hash;
3use std::collections::HashMap;
4
5use crate::{Data, ExchangeData};
6use crate::dataflow::{Stream, Scope};
7use crate::dataflow::operators::generic::operator::Operator;
8use crate::dataflow::channels::pact::Exchange;
9
10/// Generic intra-timestamp aggregation
11///
12/// Extension method supporting aggregation of keyed data within timestamp.
13/// For inter-timestamp aggregation, consider `StateMachine`.
14pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
15    /// Aggregates data of the form `(key, val)`, using user-supplied logic.
16    ///
17    /// The `aggregate` method is implemented for streams of `(K, V)` data,
18    /// and takes functions `fold`, `emit`, and `hash`; used to combine new `V`
19    /// data with existing `D` state, to produce `R` output from `D` state, and
20    /// to route `K` keys, respectively.
21    ///
22    /// Aggregation happens within each time, and results are produced once the
23    /// time is complete.
24    ///
25    /// # Examples
26    /// ```
27    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
28    /// use timely::dataflow::operators::aggregation::Aggregate;
29    ///
30    /// timely::example(|scope| {
31    ///
32    ///     (0..10).to_stream(scope)
33    ///         .map(|x| (x % 2, x))
34    ///         .aggregate(
35    ///             |_key, val, agg| { *agg += val; },
36    ///             |key, agg: i32| (key, agg),
37    ///             |key| *key as u64
38    ///         )
39    ///         .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25)));
40    /// });
41    /// ```
42    ///
43    /// By changing the type of the aggregate value, one can accumulate into different types.
44    /// Here we accumulate the data into a `Vec<i32>` and report its length (which we could
45    /// obviously do more efficiently; imagine we were doing a hash instead).
46    ///
47    /// ```
48    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
49    /// use timely::dataflow::operators::aggregation::Aggregate;
50    ///
51    /// timely::example(|scope| {
52    ///
53    ///     (0..10).to_stream(scope)
54    ///         .map(|x| (x % 2, x))
55    ///         .aggregate::<_,Vec<i32>,_,_,_>(
56    ///             |_key, val, agg| { agg.push(val); },
57    ///             |key, agg| (key, agg.len()),
58    ///             |key| *key as u64
59    ///         )
60    ///         .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
61    /// });
62    /// ```
63    fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
64        &self,
65        fold: F,
66        emit: E,
67        hash: H) -> Stream<S, R> where S::Timestamp: Eq;
68}
69
70impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
71
72    fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
73        &self,
74        fold: F,
75        emit: E,
76        hash: H) -> Stream<S, R> where S::Timestamp: Eq {
77
78        let mut aggregates = HashMap::new();
79        self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
80
81            // read each input, fold into aggregates
82            input.for_each(|time, data| {
83                let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
84                for (key, val) in data.drain(..) {
85                    let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
86                    fold(&key, val, agg);
87                }
88                notificator.notify_at(time.retain());
89            });
90
91            // pop completed aggregates, send along whatever
92            notificator.for_each(|time,_,_| {
93                if let Some(aggs) = aggregates.remove(time.time()) {
94                    let mut session = output.session(&time);
95                    for (key, agg) in aggs {
96                        session.give(emit(key, agg));
97                    }
98                }
99            });
100        })
101
102    }
103}