timely/dataflow/operators/core/
reclock.rs

//! Extension method for reclocking a `StreamCore`: buffering its data until a clock stream reports a time.
2
3use crate::{Container, Data};
4use crate::order::PartialOrder;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for reclocking a stream.
10pub trait Reclock<S: Scope> {
11    /// Delays records until an input is observed on the `clock` input.
12    ///
13    /// The source stream is buffered until a record is seen on the clock input,
14    /// at which point a notification is requested and all data with time less
15    /// or equal to the clock time are sent. This method does not ensure that all
16    /// workers receive the same clock records, which can be accomplished with
17    /// `broadcast`.
18    ///
19    /// # Examples
20    ///
21    /// ```
22    /// use timely::dataflow::operators::{ToStream, Delay, Map, Reclock, Capture};
23    /// use timely::dataflow::operators::capture::Extract;
24    ///
25    /// let captured = timely::example(|scope| {
26    ///
27    ///     // produce data 0..10 at times 0..10.
28    ///     let data = (0..10).to_stream(scope)
29    ///                       .delay(|x,t| *x);
30    ///
31    ///     // product clock ticks at three times.
32    ///     let clock = vec![3, 5, 8].into_iter()
33    ///                              .to_stream(scope)
34    ///                              .delay(|x,t| *x)
35    ///                              .map(|_| ());
36    ///
37    ///     // reclock the data.
38    ///     data.reclock(&clock)
39    ///         .capture()
40    /// });
41    ///
42    /// let extracted = captured.extract();
43    /// assert_eq!(extracted.len(), 3);
44    /// assert_eq!(extracted[0], (3, vec![0,1,2,3]));
45    /// assert_eq!(extracted[1], (5, vec![4,5]));
46    /// assert_eq!(extracted[2], (8, vec![6,7,8]));
47    /// ```
48    fn reclock<TC: Container + Data>(&self, clock: &StreamCore<S, TC>) -> Self;
49}
50
51impl<S: Scope, C: Container + Data> Reclock<S> for StreamCore<S, C> {
52    fn reclock<TC: Container + Data>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {
53
54        let mut stash = vec![];
55
56        self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| {
57
58            // stash each data input with its timestamp.
59            input1.for_each(|cap, data| {
60                stash.push((cap.time().clone(), std::mem::take(data)));
61            });
62
63            // request notification at time, to flush stash.
64            input2.for_each(|time, _data| {
65                notificator.notify_at(time.retain());
66            });
67
68            // each time with complete stash can be flushed.
69            notificator.for_each(|cap,_,_| {
70                let mut session = output.session(&cap);
71                for &mut (ref t, ref mut data) in &mut stash {
72                    if t.less_equal(cap.time()) {
73                        session.give_container(data);
74                    }
75                }
76                stash.retain(|x| !x.0.less_equal(cap.time()));
77            });
78        })
79    }
80}