timely/dataflow/operators/vec/aggregation/state_machine.rs
1//! General purpose state transition operator.
2use std::hash::Hash;
3use std::collections::HashMap;
4
5use crate::ExchangeData;
6use crate::progress::Timestamp;
7use crate::dataflow::StreamVec;
8use crate::dataflow::operators::generic::operator::Operator;
9use crate::dataflow::channels::pact::Exchange;
10
11/// Provides the `state_machine` method.
12///
13/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
14/// Events are applied in time-order, but no other promises are made. Each state transition can
15/// produce output, which is sent.
16///
17/// `state_machine` will buffer inputs if earlier inputs may still arrive. it will directly apply
18/// updates for the current time reflected in the notificator, though. In the case of partially
19/// ordered times, the only guarantee is that updates are not applied out of order, not that there
20/// is some total order on times respecting the total order (updates may be interleaved).
21pub trait StateMachine<'scope, T: Timestamp, K: ExchangeData+Hash+Eq, V: ExchangeData> {
22 /// Tracks a state for each presented key, using user-supplied state transition logic.
23 ///
24 /// The transition logic `fold` may mutate the state, and produce both output records and
25 /// a `bool` indicating that it is appropriate to deregister the state, cleaning up once
26 /// the state is no longer helpful.
27 ///
28 /// # Examples
29 /// ```
30 /// use timely::dataflow::operators::{ToStream, Inspect};
31 /// use timely::dataflow::operators::vec::{Map, aggregation::StateMachine};
32 ///
33 /// timely::example(|scope| {
34 ///
35 /// // these results happen to be right, but aren't guaranteed.
36 /// // the system is at liberty to re-order within a timestamp.
37 /// let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
38 /// (1,1), (1,4), (1,9), (1,16), (1,25)];
39 ///
40 /// (0..10).to_stream(scope)
41 /// .map(|x| (x % 2, x))
42 /// .state_machine(
43 /// |_key, val, agg| { *agg += val; (false, Some((*_key, *agg))) },
44 /// |key| *key as u64
45 /// )
46 /// .inspect(move |x| assert!(result.contains(x)));
47 /// });
48 /// ```
49 fn state_machine<
50 R: 'static, // output type
51 D: Default+'static, // per-key state (data)
52 I: IntoIterator<Item=R>, // type of output iterator
53 F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
54 H: Fn(&K)->u64+'static, // "hash" function for keys
55 >(self, fold: F, hash: H) -> StreamVec<'scope, T, R> where T : Hash+Eq ;
56}
57
58impl<'scope, T: Timestamp, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> StateMachine<'scope, T, K, V> for StreamVec<'scope, T, (K, V)> {
59 fn state_machine<
60 R: 'static, // output type
61 D: Default+'static, // per-key state (data)
62 I: IntoIterator<Item=R>, // type of output iterator
63 F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
64 H: Fn(&K)->u64+'static, // "hash" function for keys
65 >(self, fold: F, hash: H) -> StreamVec<'scope, T, R> where T : Hash+Eq {
66
67 let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
68 let mut states = HashMap::new(); // keys -> state
69
70 self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
71
72 // go through each time with data, process each (key, val) pair.
73 notificator.for_each(|time,_,_| {
74 if let Some(pend) = pending.remove(time.time()) {
75 let mut session = output.session(&time);
76 for (key, val) in pend {
77 let (remove, output) = {
78 let state = states.entry(key.clone()).or_insert_with(Default::default);
79 fold(&key, val, state)
80 };
81 if remove { states.remove(&key); }
82 session.give_iterator(output.into_iter());
83 }
84 }
85 });
86
87 // stash each input and request a notification when ready
88 input.for_each_time(|time, data| {
89
90 // stash if not time yet
91 if notificator.frontier(0).less_than(time.time()) {
92 for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); }
93 notificator.notify_at(time.retain(output.output_index()));
94 }
95 else {
96 // else we can process immediately
97 let mut session = output.session(&time);
98 for (key, val) in data.flat_map(|d| d.drain(..)) {
99 let (remove, output) = {
100 let state = states.entry(key.clone()).or_insert_with(Default::default);
101 fold(&key, val, state)
102 };
103 if remove { states.remove(&key); }
104 session.give_iterator(output.into_iter());
105 }
106 }
107 });
108 })
109 }
110}