Skip to main content

timely/dataflow/operators/vec/
branch.rs

1//! Operators that separate one stream into two streams based on some condition
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::generic::OutputBuilder;
5use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
6use crate::dataflow::{Scope, StreamVec, Stream};
7use crate::Container;
8
9/// Extension trait for `StreamVec`.
10pub trait Branch<S: Scope, D> {
11    /// Takes one input stream and splits it into two output streams.
12    /// For each record, the supplied closure is called with a reference to
13    /// the data and its time. If it returns `true`, the record will be sent
14    /// to the second returned stream, otherwise it will be sent to the first.
15    ///
16    /// If the result of the closure only depends on the time, not the data,
17    /// `branch_when` should be used instead.
18    ///
19    /// # Examples
20    /// ```
21    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Branch};
22    ///
23    /// timely::example(|scope| {
24    ///     let (odd, even) = (0..10)
25    ///         .to_stream(scope)
26    ///         .branch(|_time, x| *x % 2 == 0);
27    ///
28    ///     even.inspect(|x| println!("even numbers: {:?}", x));
29    ///     odd.inspect(|x| println!("odd numbers: {:?}", x));
30    /// });
31    /// ```
32    fn branch(
33        self,
34        condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
35    ) -> (StreamVec<S, D>, StreamVec<S, D>);
36}
37
38impl<S: Scope, D: 'static> Branch<S, D> for StreamVec<S, D> {
39    fn branch(
40        self,
41        condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
42    ) -> (StreamVec<S, D>, StreamVec<S, D>) {
43        let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
44        builder.set_notify(false);
45
46        let mut input = builder.new_input(self, Pipeline);
47        let (output1, stream1) = builder.new_output();
48        let (output2, stream2) = builder.new_output();
49
50        let mut output1 = OutputBuilder::from(output1);
51        let mut output2 = OutputBuilder::from(output2);
52
53        builder.build(move |_| {
54            move |_frontiers| {
55                let mut output1_handle = output1.activate();
56                let mut output2_handle = output2.activate();
57
58                input.for_each_time(|time, data| {
59                    let mut out1 = output1_handle.session(&time);
60                    let mut out2 = output2_handle.session(&time);
61                    for datum in data.flat_map(|d| d.drain(..)) {
62                        if condition(time.time(), &datum) {
63                            out2.give(datum);
64                        } else {
65                            out1.give(datum);
66                        }
67                    }
68                });
69            }
70        });
71
72        (stream1, stream2)
73    }
74}
75
/// Extension trait that adds time-level branching to `Stream`.
pub trait BranchWhen<T>: Sized {
    /// Splits a single input stream into two output streams, one time at a time.
    ///
    /// The supplied closure is invoked with a reference to each time. When it
    /// returns `false`, the records for that time are sent to the first returned
    /// stream; when it returns `true`, they are sent to the second returned
    /// stream.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use timely::dataflow::operators::vec::{BranchWhen, Delay};
    ///
    /// timely::example(|scope| {
    ///     let (before_five, after_five) = (0..10)
    ///         .to_stream(scope)
    ///         .container::<Vec<_>>()
    ///         .delay(|x,t| *x) // data 0..10 at time 0..10
    ///         .branch_when(|time| time >= &5);
    ///
    ///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
    ///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
    /// });
    /// ```
    fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}
101
102impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for Stream<S, C> {
103    fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
104        let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
105        builder.set_notify(false);
106
107        let mut input = builder.new_input(self, Pipeline);
108        let (output1, stream1) = builder.new_output();
109        let (output2, stream2) = builder.new_output();
110
111        let mut output1 = OutputBuilder::from(output1);
112        let mut output2 = OutputBuilder::from(output2);
113
114        builder.build(move |_| {
115
116            move |_frontiers| {
117                let mut output1_handle = output1.activate();
118                let mut output2_handle = output2.activate();
119
120                input.for_each_time(|time, data| {
121                    let mut out = if condition(time.time()) {
122                        output2_handle.session(&time)
123                    } else {
124                        output1_handle.session(&time)
125                    };
126                    out.give_containers(data);
127                });
128            }
129        });
130
131        (stream1, stream2)
132    }
133}