// timely/dataflow/operators/vec/delay.rs
1//! Operators acting on timestamps to logically delay records
2
3use std::collections::HashMap;
4
5use crate::order::{PartialOrder, TotalOrder};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::{StreamVec, Scope};
8use crate::dataflow::operators::generic::operator::Operator;
9
10/// Methods to advance the timestamps of records or batches of records.
11pub trait Delay<G: Scope, D: 'static> {
12
13 /// Advances the timestamp of records using a supplied function.
14 ///
15 /// The function *must* advance the timestamp; the operator will test that the
16 /// new timestamp is greater or equal to the old timestamp, and will assert if
17 /// it is not.
18 ///
19 /// # Examples
20 ///
21 /// The following example takes the sequence `0..10` at time `0`
22 /// and delays each element `i` to time `i`.
23 ///
24 /// ```
25 /// use timely::dataflow::operators::{ToStream, Operator};
26 /// use timely::dataflow::operators::vec::Delay;
27 /// use timely::dataflow::channels::pact::Pipeline;
28 ///
29 /// timely::example(|scope| {
30 /// (0..10).to_stream(scope)
31 /// .delay(|data, time| *data)
32 /// .sink(Pipeline, "example", |(input, frontier)| {
33 /// input.for_each_time(|time, data| {
34 /// println!("data at time: {:?}", time);
35 /// });
36 /// });
37 /// });
38 /// ```
39 fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self;
40
41 /// Advances the timestamp of records using a supplied function.
42 ///
43 /// This method is a specialization of `delay` for when the timestamp is totally
44 /// ordered. In this case, we can use a priority queue rather than an unsorted
45 /// list to manage the potentially available timestamps.
46 ///
47 /// # Examples
48 ///
49 /// The following example takes the sequence `0..10` at time `0`
50 /// and delays each element `i` to time `i`.
51 ///
52 /// ```
53 /// use timely::dataflow::operators::{ToStream, Operator};
54 /// use timely::dataflow::operators::vec::Delay;
55 /// use timely::dataflow::channels::pact::Pipeline;
56 ///
57 /// timely::example(|scope| {
58 /// (0..10).to_stream(scope)
59 /// .delay(|data, time| *data)
60 /// .sink(Pipeline, "example", |(input, frontier)| {
61 /// input.for_each_time(|time, data| {
62 /// println!("data at time: {:?}", time);
63 /// });
64 /// });
65 /// });
66 /// ```
67 fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self
68 where G::Timestamp: TotalOrder;
69
70 /// Advances the timestamp of batches of records using a supplied function.
71 ///
72 /// The operator will test that the new timestamp is greater or equal to the
73 /// old timestamp, and will assert if it is not. The batch version does not
74 /// consult the data, and may only view the timestamp itself.
75 ///
76 /// # Examples
77 ///
78 /// The following example takes the sequence `0..10` at time `0`
79 /// and delays each batch (there is just one) to time `1`.
80 ///
81 /// ```
82 /// use timely::dataflow::operators::{ToStream, Operator};
83 /// use timely::dataflow::operators::vec::Delay;
84 /// use timely::dataflow::channels::pact::Pipeline;
85 ///
86 /// timely::example(|scope| {
87 /// (0..10).to_stream(scope)
88 /// .delay_batch(|time| time + 1)
89 /// .sink(Pipeline, "example", |(input, frontier)| {
90 /// input.for_each_time(|time, data| {
91 /// println!("data at time: {:?}", time);
92 /// });
93 /// });
94 /// });
95 /// ```
96 fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self;
97}
98
impl<G: Scope<Timestamp: ::std::hash::Hash>, D: 'static> Delay<G, D> for StreamVec<G, D> {
    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, mut func: L) -> Self {
        // Records stashed until their (delayed) output time becomes available,
        // keyed by the new timestamp. Hash keys require `G::Timestamp: Hash`.
        let mut elements = HashMap::new();
        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
            input.for_each_time(|time, data| {
                // Drain every record out of every container at this input time.
                for datum in data.flat_map(|d| d.drain(..)) {
                    let new_time = func(&datum, &time);
                    // The supplied function must not move a record backwards in time.
                    assert!(time.time().less_equal(&new_time));
                    // Stash the record under its new time. The first record seen
                    // for a given new time also registers a notification, holding
                    // a capability delayed from the current input time so output
                    // at `new_time` remains permitted.
                    elements.entry(new_time.clone())
                        .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
                        .push(datum);
                }
            });

            // for each available notification, send corresponding set
            notificator.for_each(|time,_,_| {
                if let Some(mut data) = elements.remove(&time) {
                    output.session(&time).give_iterator(data.drain(..));
                }
            });
        })
    }

    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(self, func: L) -> Self
    where G::Timestamp: TotalOrder
    {
        // NOTE(review): despite the doc's suggestion of a priority-queue
        // specialization, this currently just forwards to `delay`.
        self.delay(func)
    }

    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(self, mut func: L) -> Self {
        // Whole containers stashed per delayed timestamp; unlike `delay`, the
        // function sees only the timestamp, never the data.
        let mut elements = HashMap::new();
        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
            input.for_each_time(|time, data| {
                let new_time = func(&time);
                // The supplied function must not move a batch backwards in time.
                assert!(time.time().less_equal(&new_time));
                // Take ownership of each container (leaving an empty one behind)
                // and stash it; first sighting of `new_time` registers a
                // notification carrying a suitably delayed capability.
                elements.entry(new_time.clone())
                    .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time, output.output_index())); Vec::new() })
                    .extend(data.map(std::mem::take));
            });

            // for each available notification, send corresponding set
            notificator.for_each(|time,_,_| {
                if let Some(mut datas) = elements.remove(&time) {
                    for mut data in datas.drain(..) {
                        output.session(&time).give_container(&mut data);
                    }
                }
            });
        })
    }
}
149}