timely/dataflow/operators/core/
ok_err.rs
1use crate::container::{Container, SizableContainer, PushInto};
4use crate::Data;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8
9pub trait OkErr<S: Scope, C: Container> {
11 fn ok_err<C1, D1, C2, D2, L>(
32 &self,
33 logic: L,
34 ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
35 where
36 C1: SizableContainer + PushInto<D1> + Data,
37 C2: SizableContainer + PushInto<D2> + Data,
38 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
39 ;
40}
41
42impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C> {
43 fn ok_err<C1, D1, C2, D2, L>(
44 &self,
45 mut logic: L,
46 ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
47 where
48 C1: SizableContainer + PushInto<D1> + Data,
49 C2: SizableContainer + PushInto<D2> + Data,
50 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
51 {
52 let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
53
54 let mut input = builder.new_input(self, Pipeline);
55 let (mut output1, stream1) = builder.new_output();
56 let (mut output2, stream2) = builder.new_output();
57
58 builder.build(move |_| {
59 move |_frontiers| {
60 let mut output1_handle = output1.activate();
61 let mut output2_handle = output2.activate();
62
63 input.for_each(|time, data| {
64 let mut out1 = output1_handle.session(&time);
65 let mut out2 = output2_handle.session(&time);
66 for datum in data.drain() {
67 match logic(datum) {
68 Ok(datum) => out1.give(datum),
69 Err(datum) => out2.give(datum),
70 }
71 }
72 });
73 }
74 });
75
76 (stream1, stream2)
77 }
78}