Skip to main content

timely/dataflow/operators/vec/
result.rs

1//! Extension methods for `StreamVec` containing `Result`s.
2
3use crate::dataflow::operators::vec::Map;
4use crate::dataflow::{Scope, StreamVec};
5
6/// Extension trait for `StreamVec`.
7pub trait ResultStream<S: Scope, T: 'static, E: 'static> {
8    /// Returns a new instance of `self` containing only `ok` records.
9    ///
10    /// # Examples
11    /// ```
12    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
13    ///
14    /// timely::example(|scope| {
15    ///     vec![Ok(0), Err(())].to_stream(scope)
16    ///            .ok()
17    ///            .inspect(|x| println!("seen: {:?}", x));
18    /// });
19    /// ```
20    fn ok(self) -> StreamVec<S, T>;
21
22    /// Returns a new instance of `self` containing only `err` records.
23    ///
24    /// # Examples
25    /// ```
26    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
27    ///
28    /// timely::example(|scope| {
29    ///     vec![Ok(0), Err(())].to_stream(scope)
30    ///            .err()
31    ///            .inspect(|x| println!("seen: {:?}", x));
32    /// });
33    /// ```
34    fn err(self) -> StreamVec<S, E>;
35
36    /// Returns a new instance of `self` applying `logic` on all `Ok` records.
37    ///
38    /// # Examples
39    /// ```
40    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
41    ///
42    /// timely::example(|scope| {
43    ///     vec![Ok(0), Err(())].to_stream(scope)
44    ///            .map_ok(|x| x + 1)
45    ///            .inspect(|x| println!("seen: {:?}", x));
46    /// });
47    /// ```
48    fn map_ok<T2: 'static, L: FnMut(T) -> T2 + 'static>(self, logic: L) -> StreamVec<S, Result<T2, E>>;
49
50    /// Returns a new instance of `self` applying `logic` on all `Err` records.
51    ///
52    /// # Examples
53    /// ```
54    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
55    ///
56    /// timely::example(|scope| {
57    ///     vec![Ok(0), Err(())].to_stream(scope)
58    ///            .map_err(|_| 1)
59    ///            .inspect(|x| println!("seen: {:?}", x));
60    /// });
61    /// ```
62    fn map_err<E2: 'static, L: FnMut(E) -> E2 + 'static>(self, logic: L) -> StreamVec<S, Result<T, E2>>;
63
64    /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err`
65    /// records.
66    ///
67    /// # Examples
68    /// ```
69    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
70    ///
71    /// timely::example(|scope| {
72    ///     vec![Ok(0), Err(())].to_stream(scope)
73    ///            .and_then(|x| Ok(1 + 1))
74    ///            .inspect(|x| println!("seen: {:?}", x));
75    /// });
76    /// ```
77    fn and_then<T2: 'static, L: FnMut(T) -> Result<T2, E> + 'static>(
78        self,
79        logic: L,
80    ) -> StreamVec<S, Result<T2, E>>;
81
82    /// Returns a new instance of `self` applying `logic` on all `Ok` records.
83    ///
84    /// # Examples
85    /// ```
86    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
87    ///
88    /// timely::example(|scope| {
89    ///     vec![Ok(1), Err(())].to_stream(scope)
90    ///            .unwrap_or_else(|_| 0)
91    ///            .inspect(|x| println!("seen: {:?}", x));
92    /// });
93    /// ```
94    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, logic: L) -> StreamVec<S, T>;
95}
96
97impl<S: Scope, T: 'static, E: 'static> ResultStream<S, T, E> for StreamVec<S, Result<T, E>> {
98    fn ok(self) -> StreamVec<S, T> {
99        self.flat_map(Result::ok)
100    }
101
102    fn err(self) -> StreamVec<S, E> {
103        self.flat_map(Result::err)
104    }
105
106    fn map_ok<T2: 'static, L: FnMut(T) -> T2 + 'static>(self, mut logic: L) -> StreamVec<S, Result<T2, E>> {
107        self.map(move |r| r.map(&mut logic))
108    }
109
110    fn map_err<E2: 'static, L: FnMut(E) -> E2 + 'static>(self, mut logic: L) -> StreamVec<S, Result<T, E2>> {
111        self.map(move |r| r.map_err(&mut logic))
112    }
113
114    fn and_then<T2: 'static, L: FnMut(T) -> Result<T2, E> + 'static>(self, mut logic: L) -> StreamVec<S, Result<T2, E>> {
115        self.map(move |r| r.and_then(&mut logic))
116    }
117
118    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, mut logic: L) -> StreamVec<S, T> {
119        self.map(move |r| r.unwrap_or_else(&mut logic))
120    }
121}
122
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{vec::{ToStream, ResultStream}, Capture, capture::Extract};

    /// `ok` keeps only the payload of the `Ok` record.
    #[test]
    fn test_ok() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.ok().capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0]);
    }

    /// `err` keeps only the payload of the `Err` record.
    #[test]
    fn test_err() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.err().capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![()]);
    }

    /// `map_ok` rewrites the `Ok` payload and leaves the `Err` record alone.
    #[test]
    fn test_map_ok() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.map_ok(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(10), Err(())]);
    }

    /// `map_err` rewrites the `Err` payload and leaves the `Ok` record alone.
    #[test]
    fn test_map_err() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.map_err(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(0), Err(10)]);
    }

    /// `and_then` replaces the `Ok` record with the closure's result.
    #[test]
    fn test_and_then() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.and_then(|_| Ok(1)).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(1), Err(())]);
    }

    /// `unwrap_or_else` unwraps `Ok` and substitutes the closure's value for `Err`.
    #[test]
    fn test_unwrap_or_else() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.unwrap_or_else(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0, 10]);
    }
}