timely/dataflow/operators/result.rs
1//! Extension methods for `Stream` containing `Result`s.
2
3use crate::Data;
4use crate::dataflow::operators::Map;
5use crate::dataflow::{Scope, Stream};
6
7/// Extension trait for `Stream`.
8pub trait ResultStream<S: Scope, T: Data, E: Data> {
9    /// Returns a new instance of `self` containing only `ok` records.
10    ///
11    /// # Examples
12    /// ```
13    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
14    ///
15    /// timely::example(|scope| {
16    ///     vec![Ok(0), Err(())].to_stream(scope)
17    ///            .ok()
18    ///            .inspect(|x| println!("seen: {:?}", x));
19    /// });
20    /// ```
21    fn ok(&self) -> Stream<S, T>;
22
23    /// Returns a new instance of `self` containing only `err` records.
24    ///
25    /// # Examples
26    /// ```
27    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
28    ///
29    /// timely::example(|scope| {
30    ///     vec![Ok(0), Err(())].to_stream(scope)
31    ///            .err()
32    ///            .inspect(|x| println!("seen: {:?}", x));
33    /// });
34    /// ```
35    fn err(&self) -> Stream<S, E>;
36
37    /// Returns a new instance of `self` applying `logic` on all `Ok` records.
38    ///
39    /// # Examples
40    /// ```
41    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
42    ///
43    /// timely::example(|scope| {
44    ///     vec![Ok(0), Err(())].to_stream(scope)
45    ///            .map_ok(|x| x + 1)
46    ///            .inspect(|x| println!("seen: {:?}", x));
47    /// });
48    /// ```
49    fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(&self, logic: L) -> Stream<S, Result<T2, E>>;
50
51    /// Returns a new instance of `self` applying `logic` on all `Err` records.
52    ///
53    /// # Examples
54    /// ```
55    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
56    ///
57    /// timely::example(|scope| {
58    ///     vec![Ok(0), Err(())].to_stream(scope)
59    ///            .map_err(|_| 1)
60    ///            .inspect(|x| println!("seen: {:?}", x));
61    /// });
62    /// ```
63    fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(&self, logic: L) -> Stream<S, Result<T, E2>>;
64
65    /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err`
66    /// records.
67    ///
68    /// # Examples
69    /// ```
70    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
71    ///
72    /// timely::example(|scope| {
73    ///     vec![Ok(0), Err(())].to_stream(scope)
74    ///            .and_then(|x| Ok(1 + 1))
75    ///            .inspect(|x| println!("seen: {:?}", x));
76    /// });
77    /// ```
78    fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(
79        &self,
80        logic: L,
81    ) -> Stream<S, Result<T2, E>>;
82
83    /// Returns a new instance of `self` applying `logic` on all `Ok` records.
84    ///
85    /// # Examples
86    /// ```
87    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
88    ///
89    /// timely::example(|scope| {
90    ///     vec![Ok(1), Err(())].to_stream(scope)
91    ///            .unwrap_or_else(|_| 0)
92    ///            .inspect(|x| println!("seen: {:?}", x));
93    /// });
94    /// ```
95    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(&self, logic: L) -> Stream<S, T>;
96}
97
98impl<S: Scope, T: Data, E: Data> ResultStream<S, T, E> for Stream<S, Result<T, E>> {
99    fn ok(&self) -> Stream<S, T> {
100        self.flat_map(Result::ok)
101    }
102
103    fn err(&self) -> Stream<S, E> {
104        self.flat_map(Result::err)
105    }
106
107    fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(&self, mut logic: L) -> Stream<S, Result<T2, E>> {
108        self.map(move |r| r.map(&mut logic))
109    }
110
111    fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(&self, mut logic: L) -> Stream<S, Result<T, E2>> {
112        self.map(move |r| r.map_err(&mut logic))
113    }
114
115    fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(&self, mut logic: L) -> Stream<S, Result<T2, E>> {
116        self.map(move |r| r.and_then(&mut logic))
117    }
118
119    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(&self, mut logic: L) -> Stream<S, T> {
120        self.map(move |r| r.unwrap_or_else(&mut logic))
121    }
122}
123
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{ToStream, ResultStream, Capture, capture::Extract};

    /// `ok` keeps the payload of `Ok` records and drops `Err` records.
    #[test]
    fn test_ok() {
        let captured = crate::example(|scope| {
            vec![Ok(0), Err(())].to_stream(scope)
                .ok()
                .capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0]);
    }

    /// `err` keeps the payload of `Err` records and drops `Ok` records.
    #[test]
    fn test_err() {
        let captured = crate::example(|scope| {
            vec![Ok(0), Err(())].to_stream(scope)
                .err()
                .capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![()]);
    }

    /// `map_ok` transforms `Ok` payloads and leaves `Err` records untouched.
    #[test]
    fn test_map_ok() {
        let captured = crate::example(|scope| {
            vec![Ok(0), Err(())].to_stream(scope)
                .map_ok(|_| 10)
                .capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(10), Err(())]);
    }

    /// `map_err` transforms `Err` payloads and leaves `Ok` records untouched.
    #[test]
    fn test_map_err() {
        let captured = crate::example(|scope| {
            vec![Ok(0), Err(())].to_stream(scope)
                .map_err(|_| 10)
                .capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(0), Err(10)]);
    }

    /// `and_then` replaces `Ok` records with the closure's result and passes `Err` through.
    #[test]
    fn test_and_then() {
        let captured = crate::example(|scope| {
            vec![Ok(0), Err(())].to_stream(scope)
                .and_then(|_| Ok(1))
                .capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(1), Err(())]);
    }

    /// `unwrap_or_else` unwraps `Ok` records and substitutes the closure's value for `Err`.
    #[test]
    fn test_unwrap_or_else() {
        let captured = crate::example(|scope| {
            vec![Ok(0), Err(())].to_stream(scope)
                .unwrap_or_else(|_| 10)
                .capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0, 10]);
    }
}