timely/dataflow/operators/delay.rs
1//! Operators acting on timestamps to logically delay records
2
3use std::collections::HashMap;
4
5use crate::Data;
6use crate::order::{PartialOrder, TotalOrder};
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::{Stream, Scope};
9use crate::dataflow::operators::generic::operator::Operator;
10
11/// Methods to advance the timestamps of records or batches of records.
12pub trait Delay<G: Scope, D: Data> {
13
14 /// Advances the timestamp of records using a supplied function.
15 ///
16 /// The function *must* advance the timestamp; the operator will test that the
17 /// new timestamp is greater or equal to the old timestamp, and will assert if
18 /// it is not.
19 ///
20 /// # Examples
21 ///
22 /// The following example takes the sequence `0..10` at time `0`
23 /// and delays each element `i` to time `i`.
24 ///
25 /// ```
26 /// use timely::dataflow::operators::{ToStream, Delay, Operator};
27 /// use timely::dataflow::channels::pact::Pipeline;
28 ///
29 /// timely::example(|scope| {
30 /// (0..10).to_stream(scope)
31 /// .delay(|data, time| *data)
32 /// .sink(Pipeline, "example", |input| {
33 /// input.for_each(|time, data| {
34 /// println!("data at time: {:?}", time);
35 /// });
36 /// });
37 /// });
38 /// ```
39 fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
40
41 /// Advances the timestamp of records using a supplied function.
42 ///
43 /// This method is a specialization of `delay` for when the timestamp is totally
44 /// ordered. In this case, we can use a priority queue rather than an unsorted
45 /// list to manage the potentially available timestamps.
46 ///
47 /// # Examples
48 ///
49 /// The following example takes the sequence `0..10` at time `0`
50 /// and delays each element `i` to time `i`.
51 ///
52 /// ```
53 /// use timely::dataflow::operators::{ToStream, Delay, Operator};
54 /// use timely::dataflow::channels::pact::Pipeline;
55 ///
56 /// timely::example(|scope| {
57 /// (0..10).to_stream(scope)
58 /// .delay(|data, time| *data)
59 /// .sink(Pipeline, "example", |input| {
60 /// input.for_each(|time, data| {
61 /// println!("data at time: {:?}", time);
62 /// });
63 /// });
64 /// });
65 /// ```
66 fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
67 where G::Timestamp: TotalOrder;
68
69 /// Advances the timestamp of batches of records using a supplied function.
70 ///
71 /// The operator will test that the new timestamp is greater or equal to the
72 /// old timestamp, and will assert if it is not. The batch version does not
73 /// consult the data, and may only view the timestamp itself.
74 ///
75 /// # Examples
76 ///
77 /// The following example takes the sequence `0..10` at time `0`
78 /// and delays each batch (there is just one) to time `1`.
79 ///
80 /// ```
81 /// use timely::dataflow::operators::{ToStream, Delay, Operator};
82 /// use timely::dataflow::channels::pact::Pipeline;
83 ///
84 /// timely::example(|scope| {
85 /// (0..10).to_stream(scope)
86 /// .delay_batch(|time| time + 1)
87 /// .sink(Pipeline, "example", |input| {
88 /// input.for_each(|time, data| {
89 /// println!("data at time: {:?}", time);
90 /// });
91 /// });
92 /// });
93 /// ```
94 fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
95}
96
97impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
98 fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
99 let mut elements = HashMap::new();
100 self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
101 input.for_each(|time, data| {
102 for datum in data.drain(..) {
103 let new_time = func(&datum, &time);
104 assert!(time.time().less_equal(&new_time));
105 elements.entry(new_time.clone())
106 .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
107 .push(datum);
108 }
109 });
110
111 // for each available notification, send corresponding set
112 notificator.for_each(|time,_,_| {
113 if let Some(mut data) = elements.remove(&time) {
114 output.session(&time).give_iterator(data.drain(..));
115 }
116 });
117 })
118 }
119
120 fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
121 where G::Timestamp: TotalOrder
122 {
123 self.delay(func)
124 }
125
126 fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
127 let mut elements = HashMap::new();
128 self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
129 input.for_each(|time, data| {
130 let new_time = func(&time);
131 assert!(time.time().less_equal(&new_time));
132 elements.entry(new_time.clone())
133 .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
134 .push(std::mem::take(data));
135 });
136
137 // for each available notification, send corresponding set
138 notificator.for_each(|time,_,_| {
139 if let Some(mut datas) = elements.remove(&time) {
140 for mut data in datas.drain(..) {
141 output.session(&time).give_container(&mut data);
142 }
143 }
144 });
145 })
146 }
147}