1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//! General purpose state transition operator.
use std::hash::Hash;
use std::collections::HashMap;

use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
/// Events are applied in time-order, but no other promises are made. Each state transition can
/// produce output, which is sent.
///
/// `state_machine` will buffer inputs if earlier inputs may still arrive. It will directly apply
/// updates for the current time reflected in the notificator, though. In the case of partially
/// ordered times, the only guarantee is that updates are not applied out of order, not that there
/// is some total order on times respecting the partial order (updates may be interleaved).

/// Provides the `state_machine` method.
///
/// The trait is parameterized by the scope `S` of the stream, the key type `K`, and the type `V`
/// of the events delivered to each key. In this file it is implemented for `Stream<S, (K, V)>`,
/// i.e. streams of `(key, event)` pairs.
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
    /// Tracks a state for each presented key, using user-supplied state transition logic.
    ///
    /// The transition logic `fold` may mutate the state, and produce both output records and
    /// a `bool` indicating that it is appropriate to deregister the state, cleaning up once
    /// the state is no longer helpful.
    ///
    /// # Arguments
    ///
    /// * `fold` - invoked once per `(key, value)` record with a reference to the key, the owned
    ///   value, and mutable access to that key's state of type `D`. It returns a pair: a `bool`
    ///   (`true` requests that the key's state be deregistered) and an iterator `I` of output
    ///   records of type `R`.
    /// * `hash` - maps a key to a `u64`. The implementation for `Stream<S, (K, V)>` in this file
    ///   passes it to the `Exchange` pact as the routing function for incoming records.
    ///
    /// # Returns
    ///
    /// A stream of the records produced by `fold`.
    ///
    /// # Notes
    ///
    /// In the implementation for `Stream<S, (K, V)>` in this file, a key that has no registered
    /// state is given `D::default()` before `fold` is called, so a key whose state was
    /// deregistered starts over from the default if it is presented again.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    /// use timely::dataflow::operators::aggregation::StateMachine;
    ///
    /// timely::example(|scope| {
    ///
    ///     // these results happen to be right, but aren't guaranteed.
    ///     // the system is at liberty to re-order within a timestamp.
    ///     let result = vec![(0,0), (0,2), (0,6), (0,12), (0,20),
    ///                       (1,1), (1,4), (1,9), (1,16), (1,25)];
    ///
    ///         (0..10).to_stream(scope)
    ///                .map(|x| (x % 2, x))
    ///                .state_machine(
    ///                    |_key, val, agg| { *agg += val; (false, Some((*_key, *agg))) },
    ///                    |key| *key as u64
    ///                )
    ///                .inspect(move |x| assert!(result.contains(x)));
    /// });
    /// ```
    fn state_machine<
        R: Data,                                    // output type
        D: Default+'static,                         // per-key state (data)
        I: IntoIterator<Item=R>,                    // type of output iterator
        F: Fn(&K, V, &mut D)->(bool, I)+'static,    // state update logic
        H: Fn(&K)->u64+'static,                     // "hash" function for keys
    >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
}

impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
    /// Builds a "StateMachine" operator that applies `fold` to every `(key, value)` record.
    ///
    /// Records are exchanged using `hash` on the key. A record is applied as soon as it is
    /// received unless the input frontier reports that an earlier time is still outstanding,
    /// in which case it is stashed under its timestamp and applied when the notification
    /// requested for that timestamp is delivered.
    fn state_machine<
            R: Data,                                    // output type
            D: Default+'static,                         // per-key state (data)
            I: IntoIterator<Item=R>,                    // type of output iterator
            F: Fn(&K, V, &mut D)->(bool, I)+'static,    // state update logic
            H: Fn(&K)->u64+'static,                     // "hash" function for keys
        >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {

        // time -> records received for that time which could not be applied yet.
        let mut stashed: HashMap<_, Vec<(K, V)>> = HashMap::new();
        // key -> current state; an entry is created from `D::default()` on demand.
        let mut states: HashMap<K, D> = HashMap::new();

        // A single state transition: look up (or default-create) the key's state, run the
        // user logic against it, drop the state if the logic asked for that, and hand back
        // whatever output the logic produced.
        let step = move |states: &mut HashMap<K, D>, key: K, val: V| -> I {
            let state = states.entry(key.clone()).or_default();
            let (retire, produced) = fold(&key, val, state);
            if retire {
                states.remove(&key);
            }
            produced
        };

        self.unary_notify(
            Exchange::new(move |record: &(K, V)| hash(&record.0)),
            "StateMachine",
            vec![],
            move |input, output, notificator| {

                // First, apply everything that was stashed for times we are now notified about.
                notificator.for_each(|time, _, _| {
                    if let Some(records) = stashed.remove(time.time()) {
                        let mut session = output.session(&time);
                        for (key, val) in records {
                            session.give_iterator(step(&mut states, key, val).into_iter());
                        }
                    }
                });

                // Then look at newly arrived batches: either park them or apply them right away.
                input.for_each(|time, data| {
                    let must_wait = notificator.frontier(0).less_than(time.time());

                    if must_wait {
                        // Earlier input may still show up: keep the records and ask to be
                        // notified for this time.
                        stashed.entry(time.time().clone()).or_default().extend(data.drain(..));
                        notificator.notify_at(time.retain());
                    }
                    else {
                        // Nothing earlier is outstanding: apply the batch immediately.
                        let mut session = output.session(&time);
                        for (key, val) in data.drain(..) {
                            session.give_iterator(step(&mut states, key, val).into_iter());
                        }
                    }
                });
            },
        )
    }
}