Skip to main content

timely/dataflow/operators/core/
ok_err.rs

//! Operators that separate one stream into two streams based on some condition.
2
3use crate::Container;
4use crate::container::{DrainContainer, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::operators::generic::OutputBuilder;
8use crate::dataflow::{Scope, Stream};
9
10/// Extension trait for `Stream`.
11pub trait OkErr<S: Scope, C: DrainContainer> {
12    /// Takes one input stream and splits it into two output streams.
13    /// For each record, the supplied closure is called with the data.
14    /// If it returns `Ok(x)`, then `x` will be sent
15    /// to the first returned stream; otherwise, if it returns `Err(e)`,
16    /// then `e` will be sent to the second.
17    ///
18    /// # Examples
19    /// ```
20    /// use timely::dataflow::operators::{ToStream, Inspect};
21    /// use timely::dataflow::operators::core::OkErr;
22    ///
23    /// timely::example(|scope| {
24    ///     let (odd, even) = (0..10)
25    ///         .to_stream(scope)
26    ///         .container::<Vec<_>>()
27    ///         .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) });
28    ///
29    ///     even.container::<Vec<_>>().inspect(|x| println!("even: {:?}", x));
30    ///     odd.container::<Vec<_>>().inspect(|x| println!("odd: {:?}", x));
31    /// });
32    /// ```
33    fn ok_err<C1, D1, C2, D2, L>(
34        self,
35        logic: L,
36    ) -> (Stream<S, C1>, Stream<S, C2>)
37    where
38        C1: Container + SizableContainer + PushInto<D1>,
39        C2: Container + SizableContainer + PushInto<D2>,
40        L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
41    ;
42}
43
44impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C> {
45    fn ok_err<C1, D1, C2, D2, L>(
46        self,
47        mut logic: L,
48    ) -> (Stream<S, C1>, Stream<S, C2>)
49    where
50        C1: Container + SizableContainer + PushInto<D1>,
51        C2: Container + SizableContainer + PushInto<D2>,
52        L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
53    {
54        let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
55        builder.set_notify(false);
56
57        let mut input = builder.new_input(self, Pipeline);
58        let (output1, stream1) = builder.new_output();
59        let (output2, stream2) = builder.new_output();
60
61        let mut output1 = OutputBuilder::from(output1);
62        let mut output2 = OutputBuilder::from(output2);
63
64        builder.build(move |_| {
65            move |_frontiers| {
66                let mut output1_handle = output1.activate();
67                let mut output2_handle = output2.activate();
68
69                input.for_each_time(|time, data| {
70                    let mut out1 = output1_handle.session(&time);
71                    let mut out2 = output2_handle.session(&time);
72                    for datum in data.flat_map(|d| d.drain()) {
73                        match logic(datum) {
74                            Ok(datum) => out1.give(datum),
75                            Err(datum) => out2.give(datum),
76                        }
77                    }
78                });
79            }
80        });
81
82        (stream1, stream2)
83    }
84}