Skip to main content

timely/dataflow/operators/core/
ok_err.rs

1//! Operators that separate one stream into two streams based on some condition
2
3use crate::Container;
4use crate::progress::Timestamp;
5use crate::container::{DrainContainer, SizableContainer, PushInto};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
8use crate::dataflow::operators::generic::OutputBuilder;
9use crate::dataflow::Stream;
10
11/// Extension trait for `Stream`.
12pub trait OkErr<'scope, T: Timestamp, C: DrainContainer> {
13    /// Takes one input stream and splits it into two output streams.
14    /// For each record, the supplied closure is called with the data.
15    /// If it returns `Ok(x)`, then `x` will be sent
16    /// to the first returned stream; otherwise, if it returns `Err(e)`,
17    /// then `e` will be sent to the second.
18    ///
19    /// # Examples
20    /// ```
21    /// use timely::dataflow::operators::{ToStream, Inspect};
22    /// use timely::dataflow::operators::core::OkErr;
23    ///
24    /// timely::example(|scope| {
25    ///     let (odd, even) = (0..10)
26    ///         .to_stream(scope)
27    ///         .container::<Vec<_>>()
28    ///         .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) });
29    ///
30    ///     even.container::<Vec<_>>().inspect(|x| println!("even: {:?}", x));
31    ///     odd.container::<Vec<_>>().inspect(|x| println!("odd: {:?}", x));
32    /// });
33    /// ```
34    fn ok_err<C1, D1, C2, D2, L>(
35        self,
36        logic: L,
37    ) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)
38    where
39        C1: Container + SizableContainer + PushInto<D1>,
40        C2: Container + SizableContainer + PushInto<D2>,
41        L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
42    ;
43}
44
45impl<'scope, T: Timestamp, C: Container + DrainContainer> OkErr<'scope, T, C> for Stream<'scope, T, C> {
46    fn ok_err<C1, D1, C2, D2, L>(
47        self,
48        mut logic: L,
49    ) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)
50    where
51        C1: Container + SizableContainer + PushInto<D1>,
52        C2: Container + SizableContainer + PushInto<D2>,
53        L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
54    {
55        let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
56
57        let mut input = builder.new_input(self, Pipeline);
58        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
59        let (output1, stream1) = builder.new_output();
60        let (output2, stream2) = builder.new_output();
61
62        let mut output1 = OutputBuilder::from(output1);
63        let mut output2 = OutputBuilder::from(output2);
64
65        builder.build(move |_| {
66            move |_frontiers| {
67                let mut output1_handle = output1.activate();
68                let mut output2_handle = output2.activate();
69
70                input.for_each_time(|time, data| {
71                    let mut out1 = output1_handle.session(&time);
72                    let mut out2 = output2_handle.session(&time);
73                    for datum in data.flat_map(|d| d.drain()) {
74                        match logic(datum) {
75                            Ok(datum) => out1.give(datum),
76                            Err(datum) => out2.give(datum),
77                        }
78                    }
79                });
80            }
81        });
82
83        (stream1, stream2)
84    }
85}