timely/dataflow/operators/core/
ok_err.rs1use crate::Container;
4use crate::container::{DrainContainer, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::operators::generic::OutputBuilder;
8use crate::dataflow::{Scope, Stream};
9
10pub trait OkErr<S: Scope, C: DrainContainer> {
12 fn ok_err<C1, D1, C2, D2, L>(
34 self,
35 logic: L,
36 ) -> (Stream<S, C1>, Stream<S, C2>)
37 where
38 C1: Container + SizableContainer + PushInto<D1>,
39 C2: Container + SizableContainer + PushInto<D2>,
40 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
41 ;
42}
43
44impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C> {
45 fn ok_err<C1, D1, C2, D2, L>(
46 self,
47 mut logic: L,
48 ) -> (Stream<S, C1>, Stream<S, C2>)
49 where
50 C1: Container + SizableContainer + PushInto<D1>,
51 C2: Container + SizableContainer + PushInto<D2>,
52 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
53 {
54 let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
55 builder.set_notify(false);
56
57 let mut input = builder.new_input(self, Pipeline);
58 let (output1, stream1) = builder.new_output();
59 let (output2, stream2) = builder.new_output();
60
61 let mut output1 = OutputBuilder::from(output1);
62 let mut output2 = OutputBuilder::from(output2);
63
64 builder.build(move |_| {
65 move |_frontiers| {
66 let mut output1_handle = output1.activate();
67 let mut output2_handle = output2.activate();
68
69 input.for_each_time(|time, data| {
70 let mut out1 = output1_handle.session(&time);
71 let mut out2 = output2_handle.session(&time);
72 for datum in data.flat_map(|d| d.drain()) {
73 match logic(datum) {
74 Ok(datum) => out1.give(datum),
75 Err(datum) => out2.give(datum),
76 }
77 }
78 });
79 }
80 });
81
82 (stream1, stream2)
83 }
84}