timely/dataflow/operators/
delay.rs

//! Operators acting on timestamps to logically delay records.
2
3use std::collections::HashMap;
4
5use crate::Data;
6use crate::order::{PartialOrder, TotalOrder};
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::{Stream, Scope};
9use crate::dataflow::operators::generic::operator::Operator;
10
11/// Methods to advance the timestamps of records or batches of records.
12pub trait Delay<G: Scope, D: Data> {
13
14    /// Advances the timestamp of records using a supplied function.
15    ///
16    /// The function *must* advance the timestamp; the operator will test that the
17    /// new timestamp is greater or equal to the old timestamp, and will assert if
18    /// it is not.
19    ///
20    /// # Examples
21    ///
22    /// The following example takes the sequence `0..10` at time `0`
23    /// and delays each element `i` to time `i`.
24    ///
25    /// ```
26    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
27    /// use timely::dataflow::channels::pact::Pipeline;
28    ///
29    /// timely::example(|scope| {
30    ///     (0..10).to_stream(scope)
31    ///            .delay(|data, time| *data)
32    ///            .sink(Pipeline, "example", |input| {
33    ///                input.for_each(|time, data| {
34    ///                    println!("data at time: {:?}", time);
35    ///                });
36    ///            });
37    /// });
38    /// ```
39    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
40
41    /// Advances the timestamp of records using a supplied function.
42    ///
43    /// This method is a specialization of `delay` for when the timestamp is totally
44    /// ordered. In this case, we can use a priority queue rather than an unsorted
45    /// list to manage the potentially available timestamps.
46    ///
47    /// # Examples
48    ///
49    /// The following example takes the sequence `0..10` at time `0`
50    /// and delays each element `i` to time `i`.
51    ///
52    /// ```
53    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
54    /// use timely::dataflow::channels::pact::Pipeline;
55    ///
56    /// timely::example(|scope| {
57    ///     (0..10).to_stream(scope)
58    ///            .delay(|data, time| *data)
59    ///            .sink(Pipeline, "example", |input| {
60    ///                input.for_each(|time, data| {
61    ///                    println!("data at time: {:?}", time);
62    ///                });
63    ///            });
64    /// });
65    /// ```
66    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
67    where G::Timestamp: TotalOrder;
68
69    /// Advances the timestamp of batches of records using a supplied function.
70    ///
71    /// The operator will test that the new timestamp is greater or equal to the
72    /// old timestamp, and will assert if it is not. The batch version does not
73    /// consult the data, and may only view the timestamp itself.
74    ///
75    /// # Examples
76    ///
77    /// The following example takes the sequence `0..10` at time `0`
78    /// and delays each batch (there is just one) to time `1`.
79    ///
80    /// ```
81    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
82    /// use timely::dataflow::channels::pact::Pipeline;
83    ///
84    /// timely::example(|scope| {
85    ///     (0..10).to_stream(scope)
86    ///            .delay_batch(|time| time + 1)
87    ///            .sink(Pipeline, "example", |input| {
88    ///                input.for_each(|time, data| {
89    ///                    println!("data at time: {:?}", time);
90    ///                });
91    ///            });
92    /// });
93    /// ```
94    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
95}
96
97impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
98    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
99        let mut elements = HashMap::new();
100        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
101            input.for_each(|time, data| {
102                for datum in data.drain(..) {
103                    let new_time = func(&datum, &time);
104                    assert!(time.time().less_equal(&new_time));
105                    elements.entry(new_time.clone())
106                            .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
107                            .push(datum);
108                }
109            });
110
111            // for each available notification, send corresponding set
112            notificator.for_each(|time,_,_| {
113                if let Some(mut data) = elements.remove(&time) {
114                    output.session(&time).give_iterator(data.drain(..));
115                }
116            });
117        })
118    }
119
120    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
121    where G::Timestamp: TotalOrder
122    {
123        self.delay(func)
124    }
125
126    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
127        let mut elements = HashMap::new();
128        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
129            input.for_each(|time, data| {
130                let new_time = func(&time);
131                assert!(time.time().less_equal(&new_time));
132                elements.entry(new_time.clone())
133                        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
134                        .push(std::mem::take(data));
135            });
136
137            // for each available notification, send corresponding set
138            notificator.for_each(|time,_,_| {
139                if let Some(mut datas) = elements.remove(&time) {
140                    for mut data in datas.drain(..) {
141                        output.session(&time).give_container(&mut data);
142                    }
143                }
144            });
145        })
146    }
147}