timely/dataflow/operators/core/
ok_err.rs1use crate::Container;
4use crate::progress::Timestamp;
5use crate::container::{DrainContainer, SizableContainer, PushInto};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
8use crate::dataflow::operators::generic::OutputBuilder;
9use crate::dataflow::Stream;
10
11pub trait OkErr<'scope, T: Timestamp, C: DrainContainer> {
13 fn ok_err<C1, D1, C2, D2, L>(
35 self,
36 logic: L,
37 ) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)
38 where
39 C1: Container + SizableContainer + PushInto<D1>,
40 C2: Container + SizableContainer + PushInto<D2>,
41 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
42 ;
43}
44
45impl<'scope, T: Timestamp, C: Container + DrainContainer> OkErr<'scope, T, C> for Stream<'scope, T, C> {
46 fn ok_err<C1, D1, C2, D2, L>(
47 self,
48 mut logic: L,
49 ) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)
50 where
51 C1: Container + SizableContainer + PushInto<D1>,
52 C2: Container + SizableContainer + PushInto<D2>,
53 L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
54 {
55 let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
56
57 let mut input = builder.new_input(self, Pipeline);
58 builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
59 let (output1, stream1) = builder.new_output();
60 let (output2, stream2) = builder.new_output();
61
62 let mut output1 = OutputBuilder::from(output1);
63 let mut output2 = OutputBuilder::from(output2);
64
65 builder.build(move |_| {
66 move |_frontiers| {
67 let mut output1_handle = output1.activate();
68 let mut output2_handle = output2.activate();
69
70 input.for_each_time(|time, data| {
71 let mut out1 = output1_handle.session(&time);
72 let mut out2 = output2_handle.session(&time);
73 for datum in data.flat_map(|d| d.drain()) {
74 match logic(datum) {
75 Ok(datum) => out1.give(datum),
76 Err(datum) => out2.give(datum),
77 }
78 }
79 });
80 }
81 });
82
83 (stream1, stream2)
84 }
85}