Skip to main content

timely/dataflow/operators/vec/aggregation/
state_machine.rs

1//! General purpose state transition operator.
2use std::hash::Hash;
3use std::collections::HashMap;
4
5use crate::ExchangeData;
6use crate::progress::Timestamp;
7use crate::dataflow::StreamVec;
8use crate::dataflow::operators::generic::operator::Operator;
9use crate::dataflow::channels::pact::Exchange;
10
/// Provides the `state_machine` method.
///
/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
/// Events are applied in time-order, but no other promises are made. Each state transition can
/// produce output, which is sent.
///
/// `state_machine` will buffer inputs if earlier inputs may still arrive. It will directly apply
/// updates for the current time reflected in the notificator, though. In the case of partially
/// ordered times, the only guarantee is that updates are not applied out of order, not that there
/// is some total order on times respecting the partial order (updates may be interleaved).
pub trait StateMachine<'scope, T: Timestamp, K: ExchangeData+Hash+Eq, V: ExchangeData> {
    /// Tracks a state for each presented key, using user-supplied state transition logic.
    ///
    /// The transition logic `fold` may mutate the state, and produce both output records and
    /// a `bool` indicating that it is appropriate to deregister the state, cleaning up once
    /// the state is no longer helpful.
    ///
    /// # Arguments
    /// * `fold` - invoked as `fold(&key, val, &mut state)` once per `(key, val)` record. It
    ///   returns `(remove, outputs)`: every item of `outputs` is emitted at the record's time,
    ///   and when `remove` is `true` the state held for `key` is dropped after the call. In the
    ///   implementation for `StreamVec`, a key with no registered state (never seen, or
    ///   previously removed) starts from `D::default()`.
    /// * `hash` - maps a key to the `u64` handed to the `Exchange` pact that distributes the
    ///   input records by key.
    ///
    /// # Returns
    /// A stream of the `R` records produced by `fold`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use timely::dataflow::operators::vec::{Map, aggregation::StateMachine};
    ///
    /// timely::example(|scope| {
    ///
    ///     // these results happen to be right, but aren't guaranteed.
    ///     // the system is at liberty to re-order within a timestamp.
    ///     let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
    ///                       (1,1), (1,4), (1,9), (1,16), (1,25)];
    ///
    ///         (0..10).to_stream(scope)
    ///                .map(|x| (x % 2, x))
    ///                .state_machine(
    ///                    |_key, val, agg| { *agg += val; (false, Some((*_key, *agg))) },
    ///                    |key| *key as u64
    ///                )
    ///                .inspect(move |x| assert!(result.contains(x)));
    /// });
    /// ```
    fn state_machine<
        R: 'static,                                 // output type
        D: Default+'static,                         // per-key state (data)
        I: IntoIterator<Item=R>,                    // type of output iterator
        F: Fn(&K, V, &mut D)->(bool, I)+'static,    // state update logic
        H: Fn(&K)->u64+'static,                     // "hash" function for keys
    >(self, fold: F, hash: H) -> StreamVec<'scope, T, R> where T : Hash+Eq ;
}
57
58impl<'scope, T: Timestamp, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> StateMachine<'scope, T, K, V> for StreamVec<'scope, T, (K, V)> {
59    fn state_machine<
60            R: 'static,                                 // output type
61            D: Default+'static,                         // per-key state (data)
62            I: IntoIterator<Item=R>,                    // type of output iterator
63            F: Fn(&K, V, &mut D)->(bool, I)+'static,    // state update logic
64            H: Fn(&K)->u64+'static,                     // "hash" function for keys
65        >(self, fold: F, hash: H) -> StreamVec<'scope, T, R> where T : Hash+Eq {
66
67        let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new();   // times -> (keys -> state)
68        let mut states = HashMap::new();    // keys -> state
69
70        self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
71
72            // go through each time with data, process each (key, val) pair.
73            notificator.for_each(|time,_,_| {
74                if let Some(pend) = pending.remove(time.time()) {
75                    let mut session = output.session(&time);
76                    for (key, val) in pend {
77                        let (remove, output) = {
78                            let state = states.entry(key.clone()).or_insert_with(Default::default);
79                            fold(&key, val, state)
80                        };
81                        if remove { states.remove(&key); }
82                        session.give_iterator(output.into_iter());
83                    }
84                }
85            });
86
87            // stash each input and request a notification when ready
88            input.for_each_time(|time, data| {
89
90                // stash if not time yet
91                if notificator.frontier(0).less_than(time.time()) {
92                    for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); }
93                    notificator.notify_at(time.retain(output.output_index()));
94                }
95                else {
96                    // else we can process immediately
97                    let mut session = output.session(&time);
98                    for (key, val) in data.flat_map(|d| d.drain(..)) {
99                        let (remove, output) = {
100                            let state = states.entry(key.clone()).or_insert_with(Default::default);
101                            fold(&key, val, state)
102                        };
103                        if remove { states.remove(&key); }
104                        session.give_iterator(output.into_iter());
105                    }
106                }
107            });
108        })
109    }
110}