pub trait StateMachine<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> {
    // Required method
    fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>(
        &self,
        fold: F,
        hash: H,
    ) -> Stream<S, R>
       where S::Timestamp: Hash + Eq;
}

Generic state-transition machinery: each key has a state, and receives a sequence of events. Events are applied in time-order, but no other promises are made. Each state transition can produce output, which is sent.

state_machine buffers inputs while earlier inputs may still arrive, but it directly applies updates for the current time reflected in the notificator. With partially ordered times, the only guarantee is that updates are not applied out of order; there is no promise of a single total order on times that respects the partial order, so updates at incomparable times may be interleaved.

Provides the state_machine method.
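The buffering behaviour can be pictured with a small standard-library model. This is only a sketch under simplifying assumptions, not timely's implementation: times are plain u64 values (so totally ordered), and the hypothetical TimeBuffer type and its frontier argument stand in for the notificator's knowledge that no earlier input can still arrive.

```rust
use std::collections::BTreeMap;

/// Hypothetical buffer (not part of timely): holds events until no
/// earlier-timestamped input can still arrive.
pub struct TimeBuffer<E> {
    pending: BTreeMap<u64, Vec<E>>,
}

impl<E> TimeBuffer<E> {
    pub fn new() -> Self {
        TimeBuffer { pending: BTreeMap::new() }
    }

    /// Stores an event under its timestamp; arrival order may be arbitrary.
    pub fn push(&mut self, time: u64, event: E) {
        self.pending.entry(time).or_default().push(event);
    }

    /// Releases every event with a timestamp strictly below `frontier`,
    /// in increasing time order. Events at or beyond the frontier stay buffered.
    /// Within one timestamp this model keeps arrival order; a real system
    /// is free to reorder there.
    pub fn release(&mut self, frontier: u64) -> Vec<(u64, E)> {
        // `split_off` returns the entries with key >= frontier.
        let later = self.pending.split_off(&frontier);
        let ready = std::mem::replace(&mut self.pending, later);
        ready
            .into_iter()
            .flat_map(|(time, events)| events.into_iter().map(move |e| (time, e)))
            .collect()
    }
}
```

Pushing events at times 2, 0, 1, 0 and then calling release(2) yields the two time-0 events followed by the time-1 event, while the time-2 event stays buffered until the frontier moves past it.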

Required Methods§

fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>( &self, fold: F, hash: H, ) -> Stream<S, R>
where S::Timestamp: Hash + Eq,

Tracks a state for each presented key, using user-supplied state transition logic.

The transition logic fold may mutate the state. It returns the output records to emit together with a bool; returning true indicates that the key's state should be deregistered, so that it is cleaned up once it is no longer needed.
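The contract of fold can be illustrated without a dataflow at all. The sketch below is a simplified model, not the operator itself: apply_events and threshold_demo are hypothetical helpers, there are no timestamps or workers, and it assumes that an event arriving for a key after deregistration starts again from D::default().

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper modelling the fold contract with a plain HashMap.
/// It is not timely's operator: no timestamps, workers, or data exchange.
pub fn apply_events<K, V, D, R, I, F>(
    states: &mut HashMap<K, D>,
    events: Vec<(K, V)>,
    fold: F,
) -> Vec<R>
where
    K: Hash + Eq,
    D: Default,
    I: IntoIterator<Item = R>,
    F: Fn(&K, V, &mut D) -> (bool, I),
{
    let mut output = Vec::new();
    for (key, val) in events {
        // Take the key's state out of the map, or start from D::default().
        let mut state = states.remove(&key).unwrap_or_default();
        let (deregister, produced) = fold(&key, val, &mut state);
        output.extend(produced);
        // Put the state back only if the fold did not ask to deregister it.
        if !deregister {
            states.insert(key, state);
        }
    }
    output
}

/// Sums values per key; once a sum reaches 10 it is emitted and the state dropped.
pub fn threshold_demo() -> (Vec<(u32, u64)>, HashMap<u32, u64>) {
    let mut states = HashMap::new();
    let events = vec![(0, 4), (1, 3), (0, 7), (1, 3), (0, 1)];
    let output = apply_events(&mut states, events, |key: &u32, val: u64, sum: &mut u64| {
        *sum += val;
        if *sum >= 10 {
            (true, Some((*key, *sum)))
        } else {
            (false, None)
        }
    });
    (output, states)
}
```

In threshold_demo, key 0 reaches 11 on its second event, so (0, 11) is emitted and its state is removed; the final event for key 0 then starts from a fresh state, leaving the map with 1 for key 0 and 6 for key 1.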

§Examples
use timely::dataflow::operators::{ToStream, Map, Inspect};
use timely::dataflow::operators::aggregation::StateMachine;

timely::example(|scope| {

    // these results happen to be right, but aren't guaranteed.
    // the system is at liberty to re-order within a timestamp.
    let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
                      (1,1), (1,4), (1,9), (1,16), (1,25)];

    (0..10).to_stream(scope)
           .map(|x| (x % 2, x))
           .state_machine(
               |key, val, agg| { *agg += val; (false, Some((*key, *agg))) },
               |key| *key as u64
           )
           .inspect(move |x| assert!(result.contains(x)));
});

Object Safety§

This trait is not object safe.

Implementors§

impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)>