1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! Operators acting on timestamps to logically delay records

use std::collections::HashMap;

use crate::Data;
use crate::order::{PartialOrder, TotalOrder};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;

/// Methods to advance the timestamps of records or batches of records.
pub trait Delay<G: Scope, D: Data> {

    /// Advances the timestamp of records using a supplied function.
    ///
    /// The function *must* advance the timestamp; the operator will test that the
    /// new timestamp is greater or equal to the old timestamp, and will assert if
    /// it is not.
    ///
    /// # Examples
    ///
    /// The following example takes the sequence `0..10` at time `0`
    /// and delays each element `i` to time `i`.
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .delay(|data, time| *data)
    ///            .sink(Pipeline, "example", |input| {
    ///                input.for_each(|time, data| {
    ///                    println!("data at time: {:?}", time);
    ///                });
    ///            });
    /// });
    /// ```
    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;

    /// Advances the timestamp of records using a supplied function.
    ///
    /// This method is a specialization of `delay` for when the timestamp is totally
    /// ordered. In this case, we can use a priority queue rather than an unsorted
    /// list to manage the potentially available timestamps.
    ///
    /// # Examples
    ///
    /// The following example takes the sequence `0..10` at time `0`
    /// and delays each element `i` to time `i`.
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .delay(|data, time| *data)
    ///            .sink(Pipeline, "example", |input| {
    ///                input.for_each(|time, data| {
    ///                    println!("data at time: {:?}", time);
    ///                });
    ///            });
    /// });
    /// ```
    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
    where G::Timestamp: TotalOrder;

    /// Advances the timestamp of batches of records using a supplied function.
    ///
    /// The operator will test that the new timestamp is greater or equal to the
    /// old timestamp, and will assert if it is not. The batch version does not
    /// consult the data, and may only view the timestamp itself.
    ///
    /// # Examples
    ///
    /// The following example takes the sequence `0..10` at time `0`
    /// and delays each batch (there is just one) to time `1`.
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .delay_batch(|time| time + 1)
    ///            .sink(Pipeline, "example", |input| {
    ///                input.for_each(|time, data| {
    ///                    println!("data at time: {:?}", time);
    ///                });
    ///            });
    /// });
    /// ```
    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
}

impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
        let mut elements = HashMap::new();
        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
            input.for_each(|time, data| {
                for datum in data.drain(..) {
                    let new_time = func(&datum, &time);
                    assert!(time.time().less_equal(&new_time));
                    elements.entry(new_time.clone())
                            .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
                            .push(datum);
                }
            });

            // for each available notification, send corresponding set
            notificator.for_each(|time,_,_| {
                if let Some(mut data) = elements.remove(&time) {
                    output.session(&time).give_iterator(data.drain(..));
                }
            });
        })
    }

    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
    where G::Timestamp: TotalOrder
    {
        self.delay(func)
    }

    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
        let mut elements = HashMap::new();
        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
            input.for_each(|time, data| {
                let new_time = func(&time);
                assert!(time.time().less_equal(&new_time));
                elements.entry(new_time.clone())
                        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
                        .push(std::mem::take(data));
            });

            // for each available notification, send corresponding set
            notificator.for_each(|time,_,_| {
                if let Some(mut datas) = elements.remove(&time) {
                    for mut data in datas.drain(..) {
                        output.session(&time).give_container(&mut data);
                    }
                }
            });
        })
    }
}