timely/dataflow/operators/core/
ok_err.rs

1//! Operators that separate one stream into two streams based on some condition
2
3use crate::container::{Container, SizableContainer, PushInto};
4use crate::Data;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8
9/// Extension trait for `Stream`.
10pub trait OkErr<S: Scope, C: Container> {
11    /// Takes one input stream and splits it into two output streams.
12    /// For each record, the supplied closure is called with the data.
13    /// If it returns `Ok(x)`, then `x` will be sent
14    /// to the first returned stream; otherwise, if it returns `Err(e)`,
15    /// then `e` will be sent to the second.
16    ///
17    /// # Examples
18    /// ```
19    /// use timely::dataflow::operators::ToStream;
20    /// use timely::dataflow::operators::core::{OkErr, Inspect};
21    ///
22    /// timely::example(|scope| {
23    ///     let (odd, even) = (0..10)
24    ///         .to_stream(scope)
25    ///         .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) });
26    ///
27    ///     even.container::<Vec<_>>().inspect(|x| println!("even: {:?}", x));
28    ///     odd.container::<Vec<_>>().inspect(|x| println!("odd: {:?}", x));
29    /// });
30    /// ```
31    fn ok_err<C1, D1, C2, D2, L>(
32        &self,
33        logic: L,
34    ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
35    where
36        C1: SizableContainer + PushInto<D1> + Data,
37        C2: SizableContainer + PushInto<D2> + Data,
38        L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
39    ;
40}
41
42impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C> {
43    fn ok_err<C1, D1, C2, D2, L>(
44        &self,
45        mut logic: L,
46    ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
47    where
48        C1: SizableContainer + PushInto<D1> + Data,
49        C2: SizableContainer + PushInto<D2> + Data,
50        L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
51    {
52        let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
53
54        let mut input = builder.new_input(self, Pipeline);
55        let (mut output1, stream1) = builder.new_output();
56        let (mut output2, stream2) = builder.new_output();
57
58        builder.build(move |_| {
59            move |_frontiers| {
60                let mut output1_handle = output1.activate();
61                let mut output2_handle = output2.activate();
62
63                input.for_each(|time, data| {
64                    let mut out1 = output1_handle.session(&time);
65                    let mut out2 = output2_handle.session(&time);
66                    for datum in data.drain() {
67                        match logic(datum) {
68                            Ok(datum) => out1.give(datum),
69                            Err(datum) => out2.give(datum),
70                        }
71                    }
72                });
73            }
74        });
75
76        (stream1, stream2)
77    }
78}