timely/dataflow/operators/
branch.rs

1//! Operators that separate one stream into two streams based on some condition
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
5use crate::dataflow::{Scope, Stream, StreamCore};
6use crate::{Container, Data};
7
8/// Extension trait for `Stream`.
9pub trait Branch<S: Scope, D: Data> {
10    /// Takes one input stream and splits it into two output streams.
11    /// For each record, the supplied closure is called with a reference to
12    /// the data and its time. If it returns `true`, the record will be sent
13    /// to the second returned stream, otherwise it will be sent to the first.
14    ///
15    /// If the result of the closure only depends on the time, not the data,
16    /// `branch_when` should be used instead.
17    ///
18    /// # Examples
19    /// ```
20    /// use timely::dataflow::operators::{ToStream, Branch, Inspect};
21    ///
22    /// timely::example(|scope| {
23    ///     let (odd, even) = (0..10)
24    ///         .to_stream(scope)
25    ///         .branch(|_time, x| *x % 2 == 0);
26    ///
27    ///     even.inspect(|x| println!("even numbers: {:?}", x));
28    ///     odd.inspect(|x| println!("odd numbers: {:?}", x));
29    /// });
30    /// ```
31    fn branch(
32        &self,
33        condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
34    ) -> (Stream<S, D>, Stream<S, D>);
35}
36
37impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
38    fn branch(
39        &self,
40        condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
41    ) -> (Stream<S, D>, Stream<S, D>) {
42        let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
43        builder.set_notify(false);
44
45        let mut input = builder.new_input(self, Pipeline);
46        let (mut output1, stream1) = builder.new_output();
47        let (mut output2, stream2) = builder.new_output();
48
49        builder.build(move |_| {
50            move |_frontiers| {
51                let mut output1_handle = output1.activate();
52                let mut output2_handle = output2.activate();
53
54                input.for_each(|time, data| {
55                    let mut out1 = output1_handle.session(&time);
56                    let mut out2 = output2_handle.session(&time);
57                    for datum in data.drain(..) {
58                        if condition(time.time(), &datum) {
59                            out2.give(datum);
60                        } else {
61                            out1.give(datum);
62                        }
63                    }
64                });
65            }
66        });
67
68        (stream1, stream2)
69    }
70}
71
/// Extension trait for `StreamCore`.
pub trait BranchWhen<T>: Sized {
    /// Takes one input stream and splits it into two output streams.
    /// For each time, the supplied closure is called. If it returns `true`,
    /// the records for that time will be sent to the second returned stream,
    /// otherwise they will be sent to the first.
    ///
    /// The closure only sees the time, never the records themselves; when the
    /// decision needs to inspect the data, use [`Branch::branch`] instead.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, BranchWhen, Inspect, Delay};
    ///
    /// timely::example(|scope| {
    ///     let (before_five, after_five) = (0..10)
    ///         .to_stream(scope)
    ///         .delay(|x, _t| *x) // data 0..10 at time 0..10
    ///         .branch_when(|time| time >= &5);
    ///
    ///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
    ///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
    /// });
    /// ```
    fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}
95
96impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
97    fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
98        let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
99        builder.set_notify(false);
100
101        let mut input = builder.new_input(self, Pipeline);
102        let (mut output1, stream1) = builder.new_output();
103        let (mut output2, stream2) = builder.new_output();
104
105        builder.build(move |_| {
106
107            move |_frontiers| {
108                let mut output1_handle = output1.activate();
109                let mut output2_handle = output2.activate();
110
111                input.for_each(|time, data| {
112                    let mut out = if condition(time.time()) {
113                        output2_handle.session(&time)
114                    } else {
115                        output1_handle.session(&time)
116                    };
117                    out.give_container(data);
118                });
119            }
120        });
121
122        (stream1, stream2)
123    }
124}