timely/dataflow/operators/core/reclock.rs
//! Extension methods for `StreamCore` that delay records until a `clock` input is observed.
2
3use crate::{Container, Data};
4use crate::order::PartialOrder;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for reclocking a stream.
10pub trait Reclock<S: Scope> {
11 /// Delays records until an input is observed on the `clock` input.
12 ///
13 /// The source stream is buffered until a record is seen on the clock input,
14 /// at which point a notification is requested and all data with time less
15 /// or equal to the clock time are sent. This method does not ensure that all
16 /// workers receive the same clock records, which can be accomplished with
17 /// `broadcast`.
18 ///
19 /// # Examples
20 ///
21 /// ```
22 /// use timely::dataflow::operators::{ToStream, Delay, Map, Reclock, Capture};
23 /// use timely::dataflow::operators::capture::Extract;
24 ///
25 /// let captured = timely::example(|scope| {
26 ///
27 /// // produce data 0..10 at times 0..10.
28 /// let data = (0..10).to_stream(scope)
29 /// .delay(|x,t| *x);
30 ///
31 /// // product clock ticks at three times.
32 /// let clock = vec![3, 5, 8].into_iter()
33 /// .to_stream(scope)
34 /// .delay(|x,t| *x)
35 /// .map(|_| ());
36 ///
37 /// // reclock the data.
38 /// data.reclock(&clock)
39 /// .capture()
40 /// });
41 ///
42 /// let extracted = captured.extract();
43 /// assert_eq!(extracted.len(), 3);
44 /// assert_eq!(extracted[0], (3, vec![0,1,2,3]));
45 /// assert_eq!(extracted[1], (5, vec![4,5]));
46 /// assert_eq!(extracted[2], (8, vec![6,7,8]));
47 /// ```
48 fn reclock<TC: Container + Data>(&self, clock: &StreamCore<S, TC>) -> Self;
49}
50
51impl<S: Scope, C: Container + Data> Reclock<S> for StreamCore<S, C> {
52 fn reclock<TC: Container + Data>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {
53
54 let mut stash = vec![];
55
56 self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| {
57
58 // stash each data input with its timestamp.
59 input1.for_each(|cap, data| {
60 stash.push((cap.time().clone(), std::mem::take(data)));
61 });
62
63 // request notification at time, to flush stash.
64 input2.for_each(|time, _data| {
65 notificator.notify_at(time.retain());
66 });
67
68 // each time with complete stash can be flushed.
69 notificator.for_each(|cap,_,_| {
70 let mut session = output.session(&cap);
71 for &mut (ref t, ref mut data) in &mut stash {
72 if t.less_equal(cap.time()) {
73 session.give_container(data);
74 }
75 }
76 stash.retain(|x| !x.0.less_equal(cap.time()));
77 });
78 })
79 }
80}