// timely/dataflow/operators/branch.rs
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::OutputBuilder;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, Stream, StreamCore};
use crate::{Container, Data};

9pub trait Branch<S: Scope, D: Data> {
11 fn branch(
33 &self,
34 condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
35 ) -> (Stream<S, D>, Stream<S, D>);
36}
37
38impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
39 fn branch(
40 &self,
41 condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
42 ) -> (Stream<S, D>, Stream<S, D>) {
43 let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
44 builder.set_notify(false);
45
46 let mut input = builder.new_input(self, Pipeline);
47 let (output1, stream1) = builder.new_output();
48 let (output2, stream2) = builder.new_output();
49
50 let mut output1 = OutputBuilder::from(output1);
51 let mut output2 = OutputBuilder::from(output2);
52
53 builder.build(move |_| {
54 move |_frontiers| {
55 let mut output1_handle = output1.activate();
56 let mut output2_handle = output2.activate();
57
58 input.activate().for_each_time(|time, data| {
59 let mut out1 = output1_handle.session(&time);
60 let mut out2 = output2_handle.session(&time);
61 for datum in data.flat_map(|d| d.drain(..)) {
62 if condition(time.time(), &datum) {
63 out2.give(datum);
64 } else {
65 out1.give(datum);
66 }
67 }
68 });
69 }
70 });
71
72 (stream1, stream2)
73 }
74}
75
/// Extension trait for splitting a stream into two streams using a predicate
/// on timestamps only.
pub trait BranchWhen<T>: Sized {
    /// Takes one input stream and splits it into two output streams of the
    /// same type as `self`.
    ///
    /// `condition` is called with a timestamp. If it returns `true`, the data
    /// at that time is sent to the second returned stream; otherwise it is
    /// sent to the first.
    fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}

100impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
101 fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
102 let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
103 builder.set_notify(false);
104
105 let mut input = builder.new_input(self, Pipeline);
106 let (output1, stream1) = builder.new_output();
107 let (output2, stream2) = builder.new_output();
108
109 let mut output1 = OutputBuilder::from(output1);
110 let mut output2 = OutputBuilder::from(output2);
111
112 builder.build(move |_| {
113
114 move |_frontiers| {
115 let mut output1_handle = output1.activate();
116 let mut output2_handle = output2.activate();
117
118 input.activate().for_each_time(|time, data| {
119 let mut out = if condition(time.time()) {
120 output2_handle.session(&time)
121 } else {
122 output1_handle.session(&time)
123 };
124 out.give_containers(data);
125 });
126 }
127 });
128
129 (stream1, stream2)
130 }
131}