Skip to main content

timely/dataflow/operators/vec/
delay.rs

1//! Operators acting on timestamps to logically delay records
2
3use std::collections::HashMap;
4
5use crate::order::TotalOrder;
6use crate::progress::Timestamp;
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::StreamVec;
9use crate::dataflow::operators::generic::operator::Operator;
10
11/// Methods to advance the timestamps of records or batches of records.
12pub trait Delay<T: Timestamp, D: 'static> {
13
14    /// Advances the timestamp of records using a supplied function.
15    ///
16    /// The function *must* advance the timestamp; the operator will test that the
17    /// new timestamp is greater or equal to the old timestamp, and will assert if
18    /// it is not.
19    ///
20    /// # Examples
21    ///
22    /// The following example takes the sequence `0..10` at time `0`
23    /// and delays each element `i` to time `i`.
24    ///
25    /// ```
26    /// use timely::dataflow::operators::{ToStream, Operator};
27    /// use timely::dataflow::operators::vec::Delay;
28    /// use timely::dataflow::channels::pact::Pipeline;
29    ///
30    /// timely::example(|scope| {
31    ///     (0..10).to_stream(scope)
32    ///            .delay(|data, time| *data)
33    ///            .sink(Pipeline, "example", |(input, frontier)| {
34    ///                input.for_each_time(|time, data| {
35    ///                    println!("data at time: {:?}", time);
36    ///                });
37    ///            });
38    /// });
39    /// ```
40    fn delay<L: FnMut(&D, &T)->T+'static>(self, func: L) -> Self;
41
42    /// Advances the timestamp of records using a supplied function.
43    ///
44    /// This method is a specialization of `delay` for when the timestamp is totally
45    /// ordered. In this case, we can use a priority queue rather than an unsorted
46    /// list to manage the potentially available timestamps.
47    ///
48    /// # Examples
49    ///
50    /// The following example takes the sequence `0..10` at time `0`
51    /// and delays each element `i` to time `i`.
52    ///
53    /// ```
54    /// use timely::dataflow::operators::{ToStream, Operator};
55    /// use timely::dataflow::operators::vec::Delay;
56    /// use timely::dataflow::channels::pact::Pipeline;
57    ///
58    /// timely::example(|scope| {
59    ///     (0..10).to_stream(scope)
60    ///            .delay(|data, time| *data)
61    ///            .sink(Pipeline, "example", |(input, frontier)| {
62    ///                input.for_each_time(|time, data| {
63    ///                    println!("data at time: {:?}", time);
64    ///                });
65    ///            });
66    /// });
67    /// ```
68    fn delay_total<L: FnMut(&D, &T)->T+'static>(self, func: L) -> Self
69    where T: TotalOrder;
70
71    /// Advances the timestamp of batches of records using a supplied function.
72    ///
73    /// The operator will test that the new timestamp is greater or equal to the
74    /// old timestamp, and will assert if it is not. The batch version does not
75    /// consult the data, and may only view the timestamp itself.
76    ///
77    /// # Examples
78    ///
79    /// The following example takes the sequence `0..10` at time `0`
80    /// and delays each batch (there is just one) to time `1`.
81    ///
82    /// ```
83    /// use timely::dataflow::operators::{ToStream, Operator};
84    /// use timely::dataflow::operators::vec::Delay;
85    /// use timely::dataflow::channels::pact::Pipeline;
86    ///
87    /// timely::example(|scope| {
88    ///     (0..10).to_stream(scope)
89    ///            .delay_batch(|time| time + 1)
90    ///            .sink(Pipeline, "example", |(input, frontier)| {
91    ///                input.for_each_time(|time, data| {
92    ///                    println!("data at time: {:?}", time);
93    ///                });
94    ///            });
95    /// });
96    /// ```
97    fn delay_batch<L: FnMut(&T)->T+'static>(self, func: L) -> Self;
98}
99
100impl<T: Timestamp + ::std::hash::Hash, D: 'static> Delay<T, D> for StreamVec<'_, T, D> {
101    fn delay<L: FnMut(&D, &T)->T+'static>(self, mut func: L) -> Self {
102        let mut elements = HashMap::new();
103        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
104            input.for_each_time(|time, data| {
105                for datum in data.flat_map(|d| d.drain(..)) {
106                    let new_time = func(&datum, &time);
107                    assert!(time.time().less_equal(&new_time));
108                    elements.entry(new_time.clone())
109                            .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
110                            .push(datum);
111                }
112            });
113
114            // for each available notification, send corresponding set
115            notificator.for_each(|time,_,_| {
116                if let Some(mut data) = elements.remove(&time) {
117                    output.session(&time).give_iterator(data.drain(..));
118                }
119            });
120        })
121    }
122
123    fn delay_total<L: FnMut(&D, &T)->T+'static>(self, func: L) -> Self
124    where T: TotalOrder
125    {
126        self.delay(func)
127    }
128
129    fn delay_batch<L: FnMut(&T)->T+'static>(self, mut func: L) -> Self {
130        let mut elements = HashMap::new();
131        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
132            input.for_each_time(|time, data| {
133                let new_time = func(&time);
134                assert!(time.time().less_equal(&new_time));
135                elements.entry(new_time.clone())
136                        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
137                        .extend(data.map(std::mem::take));
138            });
139
140            // for each available notification, send corresponding set
141            notificator.for_each(|time,_,_| {
142                if let Some(mut datas) = elements.remove(&time) {
143                    for mut data in datas.drain(..) {
144                        output.session(&time).give_container(&mut data);
145                    }
146                }
147            });
148        })
149    }
150}