Skip to main content

timely/dataflow/operators/vec/
delay.rs

1//! Operators acting on timestamps to logically delay records
2
3use std::collections::HashMap;
4
5use crate::order::{PartialOrder, TotalOrder};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::{StreamVec, Scope};
8use crate::dataflow::operators::generic::operator::Operator;
9
10/// Methods to advance the timestamps of records or batches of records.
11pub trait Delay<G: Scope, D: 'static> {
12
13    /// Advances the timestamp of records using a supplied function.
14    ///
15    /// The function *must* advance the timestamp; the operator will test that the
16    /// new timestamp is greater or equal to the old timestamp, and will assert if
17    /// it is not.
18    ///
19    /// # Examples
20    ///
21    /// The following example takes the sequence `0..10` at time `0`
22    /// and delays each element `i` to time `i`.
23    ///
24    /// ```
25    /// use timely::dataflow::operators::{ToStream, Operator};
26    /// use timely::dataflow::operators::vec::Delay;
27    /// use timely::dataflow::channels::pact::Pipeline;
28    ///
29    /// timely::example(|scope| {
30    ///     (0..10).to_stream(scope)
31    ///            .delay(|data, time| *data)
32    ///            .sink(Pipeline, "example", |(input, frontier)| {
33    ///                input.for_each_time(|time, data| {
34    ///                    println!("data at time: {:?}", time);
35    ///                });
36    ///            });
37    /// });
38    /// ```
39    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self;
40
41    /// Advances the timestamp of records using a supplied function.
42    ///
43    /// This method is a specialization of `delay` for when the timestamp is totally
44    /// ordered. In this case, we can use a priority queue rather than an unsorted
45    /// list to manage the potentially available timestamps.
46    ///
47    /// # Examples
48    ///
49    /// The following example takes the sequence `0..10` at time `0`
50    /// and delays each element `i` to time `i`.
51    ///
52    /// ```
53    /// use timely::dataflow::operators::{ToStream, Operator};
54    /// use timely::dataflow::operators::vec::Delay;
55    /// use timely::dataflow::channels::pact::Pipeline;
56    ///
57    /// timely::example(|scope| {
58    ///     (0..10).to_stream(scope)
59    ///            .delay(|data, time| *data)
60    ///            .sink(Pipeline, "example", |(input, frontier)| {
61    ///                input.for_each_time(|time, data| {
62    ///                    println!("data at time: {:?}", time);
63    ///                });
64    ///            });
65    /// });
66    /// ```
67    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self
68    where G::Timestamp: TotalOrder;
69
70    /// Advances the timestamp of batches of records using a supplied function.
71    ///
72    /// The operator will test that the new timestamp is greater or equal to the
73    /// old timestamp, and will assert if it is not. The batch version does not
74    /// consult the data, and may only view the timestamp itself.
75    ///
76    /// # Examples
77    ///
78    /// The following example takes the sequence `0..10` at time `0`
79    /// and delays each batch (there is just one) to time `1`.
80    ///
81    /// ```
82    /// use timely::dataflow::operators::{ToStream, Operator};
83    /// use timely::dataflow::operators::vec::Delay;
84    /// use timely::dataflow::channels::pact::Pipeline;
85    ///
86    /// timely::example(|scope| {
87    ///     (0..10).to_stream(scope)
88    ///            .delay_batch(|time| time + 1)
89    ///            .sink(Pipeline, "example", |(input, frontier)| {
90    ///                input.for_each_time(|time, data| {
91    ///                    println!("data at time: {:?}", time);
92    ///                });
93    ///            });
94    /// });
95    /// ```
96    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self;
97}
98
99impl<G: Scope<Timestamp: ::std::hash::Hash>, D: 'static> Delay<G, D> for StreamVec<G, D> {
100    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, mut func: L) -> Self {
101        let mut elements = HashMap::new();
102        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
103            input.for_each_time(|time, data| {
104                for datum in data.flat_map(|d| d.drain(..)) {
105                    let new_time = func(&datum, &time);
106                    assert!(time.time().less_equal(&new_time));
107                    elements.entry(new_time.clone())
108                            .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
109                            .push(datum);
110                }
111            });
112
113            // for each available notification, send corresponding set
114            notificator.for_each(|time,_,_| {
115                if let Some(mut data) = elements.remove(&time) {
116                    output.session(&time).give_iterator(data.drain(..));
117                }
118            });
119        })
120    }
121
122    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self
123    where G::Timestamp: TotalOrder
124    {
125        self.delay(func)
126    }
127
128    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(self, mut func: L) -> Self {
129        let mut elements = HashMap::new();
130        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
131            input.for_each_time(|time, data| {
132                let new_time = func(&time);
133                assert!(time.time().less_equal(&new_time));
134                elements.entry(new_time.clone())
135                        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
136                        .extend(data.map(std::mem::take));
137            });
138
139            // for each available notification, send corresponding set
140            notificator.for_each(|time,_,_| {
141                if let Some(mut datas) = elements.remove(&time) {
142                    for mut data in datas.drain(..) {
143                        output.session(&time).give_container(&mut data);
144                    }
145                }
146            });
147        })
148    }
149}