timely/dataflow/operators/vec/aggregation/aggregate.rs
1//! General purpose intra-timestamp aggregation
2use std::hash::Hash;
3use std::collections::HashMap;
4
5use crate::ExchangeData;
6use crate::progress::Timestamp;
7use crate::dataflow::StreamVec;
8use crate::dataflow::operators::generic::operator::Operator;
9use crate::dataflow::channels::pact::Exchange;
10
11/// Generic intra-timestamp aggregation
12///
13/// Extension method supporting aggregation of keyed data within timestamp.
14/// For inter-timestamp aggregation, consider `StateMachine`.
15pub trait Aggregate<'scope, T: Timestamp, K: ExchangeData+Hash, V: ExchangeData> {
16 /// Aggregates data of the form `(key, val)`, using user-supplied logic.
17 ///
18 /// The `aggregate` method is implemented for streams of `(K, V)` data,
19 /// and takes functions `fold`, `emit`, and `hash`; used to combine new `V`
20 /// data with existing `D` state, to produce `R` output from `D` state, and
21 /// to route `K` keys, respectively.
22 ///
23 /// Aggregation happens within each time, and results are produced once the
24 /// time is complete.
25 ///
26 /// # Examples
27 /// ```
28 /// use timely::dataflow::operators::{ToStream, Inspect};
29 /// use timely::dataflow::operators::vec::{Map, aggregation::Aggregate};
30 ///
31 /// timely::example(|scope| {
32 ///
33 /// (0..10).to_stream(scope)
34 /// .map(|x| (x % 2, x))
35 /// .aggregate(
36 /// |_key, val, agg| { *agg += val; },
37 /// |key, agg: i32| (key, agg),
38 /// |key| *key as u64
39 /// )
40 /// .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25)));
41 /// });
42 /// ```
43 ///
44 /// By changing the type of the aggregate value, one can accumulate into different types.
45 /// Here we accumulate the data into a `Vec<i32>` and report its length (which we could
46 /// obviously do more efficiently; imagine we were doing a hash instead).
47 ///
48 /// ```
49 /// use timely::dataflow::operators::{ToStream, Inspect};
50 /// use timely::dataflow::operators::vec::{Map, aggregation::Aggregate};
51 ///
52 /// timely::example(|scope| {
53 ///
54 /// (0..10)
55 /// .to_stream(scope)
56 /// .map(|x| (x % 2, x))
57 /// .aggregate::<_,Vec<i32>,_,_,_>(
58 /// |_key, val, agg| { agg.push(val); },
59 /// |key, agg| (key, agg.len()),
60 /// |key| *key as u64
61 /// )
62 /// .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
63 /// });
64 /// ```
65 fn aggregate<R: 'static, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
66 self,
67 fold: F,
68 emit: E,
69 hash: H) -> StreamVec<'scope, T, R> where T: Eq;
70}
71
72impl<'scope, T: Timestamp + Hash, K: ExchangeData+Clone+Hash+Eq, V: ExchangeData> Aggregate<'scope, T, K, V> for StreamVec<'scope, T, (K, V)> {
73
74 fn aggregate<R: 'static, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
75 self,
76 fold: F,
77 emit: E,
78 hash: H) -> StreamVec<'scope, T, R> where T: Eq {
79
80 let mut aggregates = HashMap::new();
81 self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
82
83 // read each input, fold into aggregates
84 input.for_each_time(|time, data| {
85 let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
86 for (key, val) in data.flat_map(|d| d.drain(..)) {
87 let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
88 fold(&key, val, agg);
89 }
90 notificator.notify_at(time.retain(output.output_index()));
91 });
92
93 // pop completed aggregates, send along whatever
94 notificator.for_each(|time,_,_| {
95 if let Some(aggs) = aggregates.remove(time.time()) {
96 let mut session = output.session(&time);
97 for (key, agg) in aggs {
98 session.give(emit(key, agg));
99 }
100 }
101 });
102 })
103
104 }
105}