Skip to main content

timely/dataflow/operators/core/
reclock.rs

//! Extension methods for `Stream` that hold records back until a clock stream releases them.
2
3use crate::Container;
4use crate::order::PartialOrder;
5use crate::dataflow::{Scope, Stream};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for reclocking a stream.
10pub trait Reclock<S: Scope> {
11    /// Delays records until an input is observed on the `clock` input.
12    ///
13    /// The source stream is buffered until a record is seen on the clock input,
14    /// at which point a notification is requested and all data with time less
15    /// or equal to the clock time are sent. This method does not ensure that all
16    /// workers receive the same clock records, which can be accomplished with
17    /// `broadcast`.
18    ///
19    /// # Examples
20    ///
21    /// ```
22    /// use timely::dataflow::operators::{ToStream, Reclock, Capture};
23    /// use timely::dataflow::operators::vec::{Delay, Map};
24    /// use timely::dataflow::operators::capture::Extract;
25    ///
26    /// let captured = timely::example(|scope| {
27    ///
28    ///     // produce data 0..10 at times 0..10.
29    ///     let data = (0..10).to_stream(scope)
30    ///                       .delay(|x,t| *x);
31    ///
32    ///     // product clock ticks at three times.
33    ///     let clock = vec![3, 5, 8].into_iter()
34    ///                              .to_stream(scope)
35    ///                              .delay(|x,t| *x)
36    ///                              .map(|_| ());
37    ///
38    ///     // reclock the data.
39    ///     data.reclock(clock)
40    ///         .capture()
41    /// });
42    ///
43    /// let extracted = captured.extract();
44    /// assert_eq!(extracted.len(), 3);
45    /// assert_eq!(extracted[0], (3, vec![0,1,2,3]));
46    /// assert_eq!(extracted[1], (5, vec![4,5]));
47    /// assert_eq!(extracted[2], (8, vec![6,7,8]));
48    /// ```
49    fn reclock<TC: Container>(self, clock: Stream<S, TC>) -> Self;
50}
51
52impl<S: Scope, C: Container> Reclock<S> for Stream<S, C> {
53    fn reclock<TC: Container>(self, clock: Stream<S, TC>) -> Stream<S, C> {
54
55        let mut stash = vec![];
56
57        self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| {
58
59            // stash each data input with its timestamp.
60            input1.for_each_time(|cap, data| {
61                for data in data {
62                    stash.push((cap.time().clone(), std::mem::take(data)));
63                }
64            });
65
66            // request notification at time, to flush stash.
67            input2.for_each_time(|time, _data| {
68                notificator.notify_at(time.retain(output.output_index()));
69            });
70
71            // each time with complete stash can be flushed.
72            notificator.for_each(|cap,_,_| {
73                let mut session = output.session(&cap);
74                for &mut (ref t, ref mut data) in &mut stash {
75                    if t.less_equal(cap.time()) {
76                        session.give_container(data);
77                    }
78                }
79                stash.retain(|x| !x.0.less_equal(cap.time()));
80            });
81        })
82    }
83}