timely/dataflow/operators/core/
ok_err.rs1use crate::container::{Container, SizableContainer, PushInto};
4use crate::Data;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8
9pub trait OkErr<S: Scope, C: Container> {
11 fn ok_err<C1, D1, C2, D2, L>(
32 &self,
33 logic: L,
34 ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
35 where
36 C1: SizableContainer + PushInto<D1> + Data,
37 C2: SizableContainer + PushInto<D2> + Data,
38 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
39 ;
40}
41
42impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C> {
43 fn ok_err<C1, D1, C2, D2, L>(
44 &self,
45 mut logic: L,
46 ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
47 where
48 C1: SizableContainer + PushInto<D1> + Data,
49 C2: SizableContainer + PushInto<D2> + Data,
50 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
51 {
52 let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
53 builder.set_notify(false);
54
55 let mut input = builder.new_input(self, Pipeline);
56 let (mut output1, stream1) = builder.new_output();
57 let (mut output2, stream2) = builder.new_output();
58
59 builder.build(move |_| {
60 move |_frontiers| {
61 let mut output1_handle = output1.activate();
62 let mut output2_handle = output2.activate();
63
64 input.for_each(|time, data| {
65 let mut out1 = output1_handle.session(&time);
66 let mut out2 = output2_handle.session(&time);
67 for datum in data.drain() {
68 match logic(datum) {
69 Ok(datum) => out1.give(datum),
70 Err(datum) => out2.give(datum),
71 }
72 }
73 });
74 }
75 });
76
77 (stream1, stream2)
78 }
79}