timely/dataflow/operators/aggregation/state_machine.rs
1//! General purpose state transition operator.
2use std::hash::Hash;
3use std::collections::HashMap;
4
5use crate::{Data, ExchangeData};
6use crate::dataflow::{Stream, Scope};
7use crate::dataflow::operators::generic::operator::Operator;
8use crate::dataflow::channels::pact::Exchange;
9
10/// Provides the `state_machine` method.
11///
12/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
13/// Events are applied in time-order, but no other promises are made. Each state transition can
14/// produce output, which is sent.
15///
16/// `state_machine` will buffer inputs if earlier inputs may still arrive. it will directly apply
17/// updates for the current time reflected in the notificator, though. In the case of partially
18/// ordered times, the only guarantee is that updates are not applied out of order, not that there
19/// is some total order on times respecting the total order (updates may be interleaved).
20pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
21 /// Tracks a state for each presented key, using user-supplied state transition logic.
22 ///
23 /// The transition logic `fold` may mutate the state, and produce both output records and
24 /// a `bool` indicating that it is appropriate to deregister the state, cleaning up once
25 /// the state is no longer helpful.
26 ///
27 /// # Examples
28 /// ```
29 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
30 /// use timely::dataflow::operators::aggregation::StateMachine;
31 ///
32 /// timely::example(|scope| {
33 ///
34 /// // these results happen to be right, but aren't guaranteed.
35 /// // the system is at liberty to re-order within a timestamp.
36 /// let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
37 /// (1,1), (1,4), (1,9), (1,16), (1,25)];
38 ///
39 /// (0..10).to_stream(scope)
40 /// .map(|x| (x % 2, x))
41 /// .state_machine(
42 /// |_key, val, agg| { *agg += val; (false, Some((*_key, *agg))) },
43 /// |key| *key as u64
44 /// )
45 /// .inspect(move |x| assert!(result.contains(x)));
46 /// });
47 /// ```
48 fn state_machine<
49 R: Data, // output type
50 D: Default+'static, // per-key state (data)
51 I: IntoIterator<Item=R>, // type of output iterator
52 F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
53 H: Fn(&K)->u64+'static, // "hash" function for keys
54 >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
55}
56
57impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
58 fn state_machine<
59 R: Data, // output type
60 D: Default+'static, // per-key state (data)
61 I: IntoIterator<Item=R>, // type of output iterator
62 F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
63 H: Fn(&K)->u64+'static, // "hash" function for keys
64 >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
65
66 let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
67 let mut states = HashMap::new(); // keys -> state
68
69 self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
70
71 // go through each time with data, process each (key, val) pair.
72 notificator.for_each(|time,_,_| {
73 if let Some(pend) = pending.remove(time.time()) {
74 let mut session = output.session(&time);
75 for (key, val) in pend {
76 let (remove, output) = {
77 let state = states.entry(key.clone()).or_insert_with(Default::default);
78 fold(&key, val, state)
79 };
80 if remove { states.remove(&key); }
81 session.give_iterator(output.into_iter());
82 }
83 }
84 });
85
86 // stash each input and request a notification when ready
87 input.for_each(|time, data| {
88
89 // stash if not time yet
90 if notificator.frontier(0).less_than(time.time()) {
91 pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data);
92 notificator.notify_at(time.retain());
93 }
94 else {
95 // else we can process immediately
96 let mut session = output.session(&time);
97 for (key, val) in data.drain(..) {
98 let (remove, output) = {
99 let state = states.entry(key.clone()).or_insert_with(Default::default);
100 fold(&key, val, state)
101 };
102 if remove { states.remove(&key); }
103 session.give_iterator(output.into_iter());
104 }
105 }
106 });
107 })
108 }
109}