Trait timely::dataflow::operators::aggregation::state_machine::StateMachine
source · pub trait StateMachine<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> {
// Required method
fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
hash: H,
) -> Stream<S, R>
where S::Timestamp: Hash + Eq;
}
Expand description
Generic state-transition machinery: each key has a state, and receives a sequence of events. Events are applied in time-order, but no other promises are made. Each state transition can produce output, which is sent.
state_machine
will buffer inputs if earlier inputs may still arrive. it will directly apply
updates for the current time reflected in the notificator, though. In the case of partially
ordered times, the only guarantee is that updates are not applied out of order, not that there
is some total order on times respecting the total order (updates may be interleaved).
Provides the state_machine
method.
Required Methods§
sourcefn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
hash: H,
) -> Stream<S, R>
fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>( &self, fold: F, hash: H, ) -> Stream<S, R>
Tracks a state for each presented key, using user-supplied state transition logic.
The transition logic fold
may mutate the state, and produce both output records and
a bool
indicating that it is appropriate to deregister the state, cleaning up once
the state is no longer helpful.
§Examples
use timely::dataflow::operators::{ToStream, Map, Inspect};
use timely::dataflow::operators::aggregation::StateMachine;
timely::example(|scope| {
// these results happen to be right, but aren't guaranteed.
// the system is at liberty to re-order within a timestamp.
let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
(1,1), (1,4), (1,9), (1,16), (1,25)];
(0..10).to_stream(scope)
.map(|x| (x % 2, x))
.state_machine(
|_key, val, agg| { *agg += val; (false, Some((*_key, *agg))) },
|key| *key as u64
)
.inspect(move |x| assert!(result.contains(x)));
});