timely/dataflow/operators/core/reclock.rs
//! Extension methods for `Stream` that delay records until a record is observed on a clock stream.
2
3use crate::Container;
4use crate::order::PartialOrder;
5use crate::dataflow::{Scope, Stream};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for reclocking a stream.
10pub trait Reclock<S: Scope> {
11 /// Delays records until an input is observed on the `clock` input.
12 ///
13 /// The source stream is buffered until a record is seen on the clock input,
14 /// at which point a notification is requested and all data with time less
15 /// or equal to the clock time are sent. This method does not ensure that all
16 /// workers receive the same clock records, which can be accomplished with
17 /// `broadcast`.
18 ///
19 /// # Examples
20 ///
21 /// ```
22 /// use timely::dataflow::operators::{ToStream, Reclock, Capture};
23 /// use timely::dataflow::operators::vec::{Delay, Map};
24 /// use timely::dataflow::operators::capture::Extract;
25 ///
26 /// let captured = timely::example(|scope| {
27 ///
28 /// // produce data 0..10 at times 0..10.
29 /// let data = (0..10).to_stream(scope)
30 /// .delay(|x,t| *x);
31 ///
32 /// // product clock ticks at three times.
33 /// let clock = vec![3, 5, 8].into_iter()
34 /// .to_stream(scope)
35 /// .delay(|x,t| *x)
36 /// .map(|_| ());
37 ///
38 /// // reclock the data.
39 /// data.reclock(clock)
40 /// .capture()
41 /// });
42 ///
43 /// let extracted = captured.extract();
44 /// assert_eq!(extracted.len(), 3);
45 /// assert_eq!(extracted[0], (3, vec![0,1,2,3]));
46 /// assert_eq!(extracted[1], (5, vec![4,5]));
47 /// assert_eq!(extracted[2], (8, vec![6,7,8]));
48 /// ```
49 fn reclock<TC: Container>(self, clock: Stream<S, TC>) -> Self;
50}
51
52impl<S: Scope, C: Container> Reclock<S> for Stream<S, C> {
53 fn reclock<TC: Container>(self, clock: Stream<S, TC>) -> Stream<S, C> {
54
55 let mut stash = vec![];
56
57 self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| {
58
59 // stash each data input with its timestamp.
60 input1.for_each_time(|cap, data| {
61 for data in data {
62 stash.push((cap.time().clone(), std::mem::take(data)));
63 }
64 });
65
66 // request notification at time, to flush stash.
67 input2.for_each_time(|time, _data| {
68 notificator.notify_at(time.retain(output.output_index()));
69 });
70
71 // each time with complete stash can be flushed.
72 notificator.for_each(|cap,_,_| {
73 let mut session = output.session(&cap);
74 for &mut (ref t, ref mut data) in &mut stash {
75 if t.less_equal(cap.time()) {
76 session.give_container(data);
77 }
78 }
79 stash.retain(|x| !x.0.less_equal(cap.time()));
80 });
81 })
82 }
83}