pub trait Aggregate<S: Scope, K: ExchangeData + Hash, V: ExchangeData> {
    // Required method
    fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, E: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(
        &self,
        fold: F,
        emit: E,
        hash: H,
    ) -> Stream<S, R>
       where S::Timestamp: Eq;
}
Generic intra-timestamp aggregation
Extension method supporting aggregation of keyed data within each timestamp.
For inter-timestamp aggregation, consider StateMachine.
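To make the intra/inter distinction concrete, here is a std-only sketch that sums values per key over two timestamps, once with the state reset at every timestamp and once with the state carried forward. The function names are hypothetical, and the sketch only illustrates the semantics; it is not how StateMachine is implemented or called.

```rust
use std::collections::HashMap;

/// Hypothetical helper: per-timestamp totals. State starts empty at each
/// timestamp and is reported and discarded at its end (intra-timestamp).
fn per_timestamp_sums(batches: &[Vec<(&'static str, i64)>]) -> Vec<HashMap<&'static str, i64>> {
    batches
        .iter()
        .map(|batch| {
            let mut state: HashMap<&'static str, i64> = HashMap::new();
            for &(key, val) in batch {
                *state.entry(key).or_default() += val;
            }
            state
        })
        .collect()
}

/// Hypothetical helper: running totals. State survives from one timestamp
/// to the next (the inter-timestamp behaviour a state machine provides).
fn running_sums(batches: &[Vec<(&'static str, i64)>]) -> Vec<HashMap<&'static str, i64>> {
    let mut state: HashMap<&'static str, i64> = HashMap::new();
    batches
        .iter()
        .map(|batch| {
            for &(key, val) in batch {
                *state.entry(key).or_default() += val;
            }
            // Snapshot of the accumulated state at the end of this timestamp.
            state.clone()
        })
        .collect()
}
```

With batches `[("a", 1), ("a", 2)]` at timestamp 0 and `[("a", 3)]` at timestamp 1, the intra-timestamp version reports 3 for "a" at timestamp 1, while the running version reports 6.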
Required Methods
fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, E: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(
    &self,
    fold: F,
    emit: E,
    hash: H,
) -> Stream<S, R>
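Aggregates data of the form (key, val), using user-supplied logic.

The aggregate method is implemented for streams of (K, V) data and takes three functions: fold, which combines new V data with existing D state; emit, which produces R output from D state; and hash, which routes K keys to workers, so that all records with the same key are aggregated in the same place. The state for a key that has not yet been seen in the current timestamp starts as D::default().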
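A rough model of how the three functions fit together, written against the standard library only: `aggregate_batch` is a hypothetical helper that processes a single timestamp's records on a pretend set of workers. It assumes keys are routed with `hash(&key) % workers`, which captures the idea of hash-based routing but may not match timely's exact exchange rule.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical single-timestamp model of `aggregate`: route each record to a
/// worker by `hash(&key) % workers` (an assumed rule), fold values into
/// per-key `D` state starting from `D::default()`, then emit once per key.
fn aggregate_batch<K, V, D, R, F, E, H>(
    records: Vec<(K, V)>,
    workers: usize,
    fold: F,
    emit: E,
    hash: H,
) -> Vec<Vec<R>>
where
    K: Hash + Eq + Clone,
    D: Default,
    F: Fn(&K, V, &mut D),
    E: Fn(K, D) -> R,
    H: Fn(&K) -> u64,
{
    assert!(workers > 0, "need at least one worker");
    // One key -> state table per pretend worker.
    let mut tables: Vec<HashMap<K, D>> = (0..workers).map(|_| HashMap::new()).collect();
    for (key, val) in records {
        // Every record with a given key lands on the same worker.
        let worker = (hash(&key) % workers as u64) as usize;
        let state = tables[worker].entry(key.clone()).or_insert_with(D::default);
        fold(&key, val, state);
    }
    // The timestamp is over: turn each worker's states into outputs.
    tables
        .into_iter()
        .map(|table| table.into_iter().map(|(k, d)| emit(k, d)).collect::<Vec<R>>())
        .collect()
}
```

Fed the same input and closures as the first example in the Examples section, with three pretend workers, the outputs gathered from all workers are (0, 20) and (1, 25). The `K: Clone` bound is needed because the key is both stored in the map and passed to fold; in the real trait, clonability comes with ExchangeData.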
Aggregation happens separately within each timestamp, and the results for a timestamp are produced only once that timestamp is complete, that is, once no further input can arrive for it.
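The sketch below models the "once the time is complete" rule with std-only types. It assumes totally ordered u64 timestamps and a frontier given as a single u64; timely's frontiers are antichains, and the real operator relies on notifications rather than an explicit call like the hypothetical `advance_to` used here. State for each open timestamp is kept apart and dropped as soon as it is emitted.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical model of per-timestamp aggregation state.
struct PerTimeAggregator {
    // timestamp -> (key -> running sum)
    pending: BTreeMap<u64, HashMap<u32, i64>>,
}

impl PerTimeAggregator {
    fn new() -> Self {
        PerTimeAggregator { pending: BTreeMap::new() }
    }

    /// Fold a batch of (key, val) records that all carry timestamp `time`.
    /// A real operator would never receive data for an already-complete time.
    fn on_data(&mut self, time: u64, batch: &[(u32, i64)]) {
        let table = self.pending.entry(time).or_default();
        for &(key, val) in batch {
            *table.entry(key).or_default() += val;
        }
    }

    /// The frontier has advanced to `frontier`: no more data can arrive for
    /// any time strictly below it, so those times are complete. Emits
    /// (time, key, sum) for each of them and drops their state.
    fn advance_to(&mut self, frontier: u64) -> Vec<(u64, u32, i64)> {
        // split_off returns times >= frontier; times < frontier stay behind.
        let still_open = self.pending.split_off(&frontier);
        let complete = std::mem::replace(&mut self.pending, still_open);
        let mut out = Vec::new();
        for (time, table) in complete {
            for (key, sum) in table {
                out.push((time, key, sum));
            }
        }
        // HashMap iteration order is arbitrary; sort for a stable result.
        out.sort();
        out
    }
}
```

Data for timestamp 1 that arrives while timestamp 0 is still open does not affect timestamp 0's result; each timestamp is reported in full, exactly once, when the frontier moves past it.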
Examples
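use timely::dataflow::operators::{ToStream, Map, Inspect};
use timely::dataflow::operators::aggregation::Aggregate;

timely::example(|scope| {
    // Key each number by its parity: (x % 2, x).
    (0..10).to_stream(scope)
        .map(|x| (x % 2, x))
        .aggregate(
            // fold: add each value into the per-key running sum.
            |_key, val, agg| { *agg += val; },
            // emit: report (key, sum) once the timestamp is complete.
            |key, agg: i32| (key, agg),
            // hash: route records by key.
            |key| *key as u64
        )
        // Evens sum to 0+2+4+6+8 = 20, odds to 1+3+5+7+9 = 25.
        .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25)));
});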
By changing the type of the aggregate value, one can accumulate into different types.
Here we accumulate the data into a Vec<i32> and report its length (which we could obviously do more efficiently; imagine we were computing a hash of the values instead).
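One way to read the "hash instead" remark: keep the same Vec<i32> state and have emit report a hash of the collected values. The std-only sketch below writes fold and emit as plain functions whose signatures match the fold and emit bounds of aggregate with K = i32; plugging them into a timely dataflow is an untested assumption. It sorts before hashing, since the order in which records for one timestamp are folded should not be relied on.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// fold: collect every value seen for the key, as in the Vec<i32> example.
fn fold(_key: &i32, val: i32, agg: &mut Vec<i32>) {
    agg.push(val);
}

/// emit: report (key, number of values, hash of the values).
/// Sorting first makes the hash independent of the order in which the
/// values happened to be folded.
fn emit(key: i32, mut agg: Vec<i32>) -> (i32, usize, u64) {
    agg.sort_unstable();
    let mut hasher = DefaultHasher::new();
    agg.hash(&mut hasher);
    (key, agg.len(), hasher.finish())
}
```

An unstable sort is enough here because equal i32 values are indistinguishable, so folding the same values in any order yields the same (key, count, hash) triple.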
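use timely::dataflow::operators::{ToStream, Map, Inspect};
use timely::dataflow::operators::aggregation::Aggregate;

timely::example(|scope| {
    (0..10).to_stream(scope)
        .map(|x| (x % 2, x))
        // Type parameters are <R, D, F, E, H>; only D = Vec<i32> needs naming.
        .aggregate::<_,Vec<i32>,_,_,_>(
            // fold: collect every value for the key.
            |_key, val, agg| { agg.push(val); },
            // emit: report how many values were collected.
            |key, agg| (key, agg.len()),
            |key| *key as u64
        )
        // Each parity class holds five of the numbers 0..10.
        .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
});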