timely/dataflow/operators/core/
ok_err.rs1use crate::Container;
4use crate::container::{DrainContainer, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::operators::generic::OutputBuilder;
8use crate::dataflow::{Scope, StreamCore};
9
10pub trait OkErr<S: Scope, C: DrainContainer> {
12 fn ok_err<C1, D1, C2, D2, L>(
33 &self,
34 logic: L,
35 ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
36 where
37 C1: Container + SizableContainer + PushInto<D1>,
38 C2: Container + SizableContainer + PushInto<D2>,
39 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
40 ;
41}
42
43impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for StreamCore<S, C> {
44 fn ok_err<C1, D1, C2, D2, L>(
45 &self,
46 mut logic: L,
47 ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
48 where
49 C1: Container + SizableContainer + PushInto<D1>,
50 C2: Container + SizableContainer + PushInto<D2>,
51 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
52 {
53 let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
54 builder.set_notify(false);
55
56 let mut input = builder.new_input(self, Pipeline);
57 let (output1, stream1) = builder.new_output();
58 let (output2, stream2) = builder.new_output();
59
60 let mut output1 = OutputBuilder::from(output1);
61 let mut output2 = OutputBuilder::from(output2);
62
63 builder.build(move |_| {
64 move |_frontiers| {
65 let mut output1_handle = output1.activate();
66 let mut output2_handle = output2.activate();
67
68 input.activate().for_each_time(|time, data| {
69 let mut out1 = output1_handle.session(&time);
70 let mut out2 = output2_handle.session(&time);
71 for datum in data.flat_map(|d| d.drain()) {
72 match logic(datum) {
73 Ok(datum) => out1.give(datum),
74 Err(datum) => out2.give(datum),
75 }
76 }
77 });
78 }
79 });
80
81 (stream1, stream2)
82 }
83}