timely/dataflow/operators/vec/delay.rs
1//! Operators acting on timestamps to logically delay records
2
3use std::collections::HashMap;
4
5use crate::order::TotalOrder;
6use crate::progress::Timestamp;
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::StreamVec;
9use crate::dataflow::operators::generic::operator::Operator;
10
11/// Methods to advance the timestamps of records or batches of records.
12pub trait Delay<T: Timestamp, D: 'static> {
13
14 /// Advances the timestamp of records using a supplied function.
15 ///
16 /// The function *must* advance the timestamp; the operator will test that the
17 /// new timestamp is greater or equal to the old timestamp, and will assert if
18 /// it is not.
19 ///
20 /// # Examples
21 ///
22 /// The following example takes the sequence `0..10` at time `0`
23 /// and delays each element `i` to time `i`.
24 ///
25 /// ```
26 /// use timely::dataflow::operators::{ToStream, Operator};
27 /// use timely::dataflow::operators::vec::Delay;
28 /// use timely::dataflow::channels::pact::Pipeline;
29 ///
30 /// timely::example(|scope| {
31 /// (0..10).to_stream(scope)
32 /// .delay(|data, time| *data)
33 /// .sink(Pipeline, "example", |(input, frontier)| {
34 /// input.for_each_time(|time, data| {
35 /// println!("data at time: {:?}", time);
36 /// });
37 /// });
38 /// });
39 /// ```
40 fn delay<L: FnMut(&D, &T)->T+'static>(self, func: L) -> Self;
41
42 /// Advances the timestamp of records using a supplied function.
43 ///
44 /// This method is a specialization of `delay` for when the timestamp is totally
45 /// ordered. In this case, we can use a priority queue rather than an unsorted
46 /// list to manage the potentially available timestamps.
47 ///
48 /// # Examples
49 ///
50 /// The following example takes the sequence `0..10` at time `0`
51 /// and delays each element `i` to time `i`.
52 ///
53 /// ```
54 /// use timely::dataflow::operators::{ToStream, Operator};
55 /// use timely::dataflow::operators::vec::Delay;
56 /// use timely::dataflow::channels::pact::Pipeline;
57 ///
58 /// timely::example(|scope| {
59 /// (0..10).to_stream(scope)
60 /// .delay(|data, time| *data)
61 /// .sink(Pipeline, "example", |(input, frontier)| {
62 /// input.for_each_time(|time, data| {
63 /// println!("data at time: {:?}", time);
64 /// });
65 /// });
66 /// });
67 /// ```
68 fn delay_total<L: FnMut(&D, &T)->T+'static>(self, func: L) -> Self
69 where T: TotalOrder;
70
71 /// Advances the timestamp of batches of records using a supplied function.
72 ///
73 /// The operator will test that the new timestamp is greater or equal to the
74 /// old timestamp, and will assert if it is not. The batch version does not
75 /// consult the data, and may only view the timestamp itself.
76 ///
77 /// # Examples
78 ///
79 /// The following example takes the sequence `0..10` at time `0`
80 /// and delays each batch (there is just one) to time `1`.
81 ///
82 /// ```
83 /// use timely::dataflow::operators::{ToStream, Operator};
84 /// use timely::dataflow::operators::vec::Delay;
85 /// use timely::dataflow::channels::pact::Pipeline;
86 ///
87 /// timely::example(|scope| {
88 /// (0..10).to_stream(scope)
89 /// .delay_batch(|time| time + 1)
90 /// .sink(Pipeline, "example", |(input, frontier)| {
91 /// input.for_each_time(|time, data| {
92 /// println!("data at time: {:?}", time);
93 /// });
94 /// });
95 /// });
96 /// ```
97 fn delay_batch<L: FnMut(&T)->T+'static>(self, func: L) -> Self;
98}
99
100impl<T: Timestamp + ::std::hash::Hash, D: 'static> Delay<T, D> for StreamVec<'_, T, D> {
101 fn delay<L: FnMut(&D, &T)->T+'static>(self, mut func: L) -> Self {
102 let mut elements = HashMap::new();
103 self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
104 input.for_each_time(|time, data| {
105 for datum in data.flat_map(|d| d.drain(..)) {
106 let new_time = func(&datum, &time);
107 assert!(time.time().less_equal(&new_time));
108 elements.entry(new_time.clone())
109 .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
110 .push(datum);
111 }
112 });
113
114 // for each available notification, send corresponding set
115 notificator.for_each(|time,_,_| {
116 if let Some(mut data) = elements.remove(&time) {
117 output.session(&time).give_iterator(data.drain(..));
118 }
119 });
120 })
121 }
122
123 fn delay_total<L: FnMut(&D, &T)->T+'static>(self, func: L) -> Self
124 where T: TotalOrder
125 {
126 self.delay(func)
127 }
128
129 fn delay_batch<L: FnMut(&T)->T+'static>(self, mut func: L) -> Self {
130 let mut elements = HashMap::new();
131 self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
132 input.for_each_time(|time, data| {
133 let new_time = func(&time);
134 assert!(time.time().less_equal(&new_time));
135 elements.entry(new_time.clone())
136 .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
137 .extend(data.map(std::mem::take));
138 });
139
140 // for each available notification, send corresponding set
141 notificator.for_each(|time,_,_| {
142 if let Some(mut datas) = elements.remove(&time) {
143 for mut data in datas.drain(..) {
144 output.session(&time).give_container(&mut data);
145 }
146 }
147 });
148 })
149 }
150}