1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
//! General purpose state transition operator.
use std::hash::Hash;
use std::collections::HashMap;
use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;
/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
/// Events are applied in time-order, but no other promises are made. Each state transition can
/// produce output, which is sent downstream.
///
/// `state_machine` will buffer inputs if earlier inputs may still arrive. It will directly apply
/// updates for the current time reflected in the notificator, though. In the case of partially
/// ordered times, the only guarantee is that updates are not applied out of order, not that there
/// is some total order on times respecting the partial order (updates may be interleaved).
/// Provides the `state_machine` method.
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
    /// Tracks a state for each presented key, using user-supplied state transition logic.
    ///
    /// A key's state starts as `D::default()` the first time a record for that key is seen.
    /// For every `(key, val)` record, `fold` is called with the key, the value, and a mutable
    /// reference to the key's state. It returns a `bool` together with the output records for
    /// this transition:
    ///
    /// * the output records are emitted at the time of the input record;
    /// * `true` deregisters the key's state, cleaning up once the state is no longer helpful.
    ///   A later record for the same key then starts again from `D::default()`.
    ///
    /// `hash` maps each key to a `u64` used to exchange records between workers. Records whose
    /// keys hash to the same value are routed to the same worker, which keeps their state.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    /// use timely::dataflow::operators::aggregation::StateMachine;
    ///
    /// timely::example(|scope| {
    ///
    ///     // these results happen to be right, but aren't guaranteed.
    ///     // the system is at liberty to re-order within a timestamp.
    ///     let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
    ///                       (1,1), (1,4), (1,9), (1,16), (1,25)];
    ///
    ///     (0..10).to_stream(scope)
    ///            .map(|x| (x % 2, x))
    ///            .state_machine(
    ///                |key, val, agg| { *agg += val; (false, Some((*key, *agg))) },
    ///                |key| *key as u64
    ///            )
    ///            .inspect(move |x| assert!(result.contains(x)));
    /// });
    /// ```
    fn state_machine<
        R: Data,                                    // output type
        D: Default+'static,                         // per-key state (data)
        I: IntoIterator<Item=R>,                    // type of output iterator
        F: Fn(&K, V, &mut D)->(bool, I)+'static,    // state update logic
        H: Fn(&K)->u64+'static,                     // "hash" function for keys
    >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
}
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
    fn state_machine<
        R: Data,                                    // output type
        D: Default+'static,                         // per-key state (data)
        I: IntoIterator<Item=R>,                    // type of output iterator
        F: Fn(&K, V, &mut D)->(bool, I)+'static,    // state update logic
        H: Fn(&K)->u64+'static,                     // "hash" function for keys
    >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
        // Records that arrived while strictly earlier times could still show up, grouped by
        // their time (time -> [(key, val)]) and held until that time is notified.
        let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new();
        // Current state of every key that has not been deregistered (key -> state).
        let mut states: HashMap<K, D> = HashMap::new();

        self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
            // Apply the stashed records of every time the notificator reports as ready.
            notificator.for_each(|time, _, _| {
                if let Some(pend) = pending.remove(time.time()) {
                    let mut session = output.session(&time);
                    for (key, val) in pend {
                        session.give_iterator(apply_transition(&mut states, &fold, key, val).into_iter());
                    }
                }
            });
            // Stash new input if some frontier element is strictly earlier than its time (so
            // earlier records may still arrive); otherwise apply it immediately.
            input.for_each(|time, data| {
                if notificator.frontier(0).less_than(time.time()) {
                    pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
                    notificator.notify_at(time.retain());
                } else {
                    let mut session = output.session(&time);
                    for (key, val) in data.drain(..) {
                        session.give_iterator(apply_transition(&mut states, &fold, key, val).into_iter());
                    }
                }
            });
        })
    }
}

/// Applies one state transition: runs `fold` on `(key, val)` against the state stored for
/// `key` in `states`, and returns the output records `fold` produced.
///
/// A key with no stored state starts from `D::default()`. If `fold` returns `true`, the key's
/// state is discarded, so the next record for that key starts from `D::default()` again.
/// Looking the key up by reference avoids cloning it. A fresh state that `fold` immediately
/// deregisters is never inserted into the map.
fn apply_transition<K, V, D, I, F>(states: &mut HashMap<K, D>, fold: &F, key: K, val: V) -> I
where
    K: Hash + Eq,
    D: Default,
    F: Fn(&K, V, &mut D) -> (bool, I),
{
    if let Some(state) = states.get_mut(&key) {
        let (remove, output) = fold(&key, val, state);
        if remove {
            states.remove(&key);
        }
        output
    } else {
        // Fold into a fresh default state and only store it if it should be kept.
        let mut state = D::default();
        let (remove, output) = fold(&key, val, &mut state);
        if !remove {
            states.insert(key, state);
        }
        output
    }
}