timely/dataflow/operators/vec/result.rs
1//! Extension methods for `StreamVec` containing `Result`s.
2
3use crate::dataflow::operators::vec::Map;
4use crate::dataflow::{Scope, StreamVec};
5
6/// Extension trait for `StreamVec`.
7pub trait ResultStream<S: Scope, T: 'static, E: 'static> {
8 /// Returns a new instance of `self` containing only `ok` records.
9 ///
10 /// # Examples
11 /// ```
12 /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
13 ///
14 /// timely::example(|scope| {
15 /// vec![Ok(0), Err(())].to_stream(scope)
16 /// .ok()
17 /// .inspect(|x| println!("seen: {:?}", x));
18 /// });
19 /// ```
20 fn ok(self) -> StreamVec<S, T>;
21
22 /// Returns a new instance of `self` containing only `err` records.
23 ///
24 /// # Examples
25 /// ```
26 /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
27 ///
28 /// timely::example(|scope| {
29 /// vec![Ok(0), Err(())].to_stream(scope)
30 /// .err()
31 /// .inspect(|x| println!("seen: {:?}", x));
32 /// });
33 /// ```
34 fn err(self) -> StreamVec<S, E>;
35
36 /// Returns a new instance of `self` applying `logic` on all `Ok` records.
37 ///
38 /// # Examples
39 /// ```
40 /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
41 ///
42 /// timely::example(|scope| {
43 /// vec![Ok(0), Err(())].to_stream(scope)
44 /// .map_ok(|x| x + 1)
45 /// .inspect(|x| println!("seen: {:?}", x));
46 /// });
47 /// ```
48 fn map_ok<T2: 'static, L: FnMut(T) -> T2 + 'static>(self, logic: L) -> StreamVec<S, Result<T2, E>>;
49
50 /// Returns a new instance of `self` applying `logic` on all `Err` records.
51 ///
52 /// # Examples
53 /// ```
54 /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
55 ///
56 /// timely::example(|scope| {
57 /// vec![Ok(0), Err(())].to_stream(scope)
58 /// .map_err(|_| 1)
59 /// .inspect(|x| println!("seen: {:?}", x));
60 /// });
61 /// ```
62 fn map_err<E2: 'static, L: FnMut(E) -> E2 + 'static>(self, logic: L) -> StreamVec<S, Result<T, E2>>;
63
64 /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err`
65 /// records.
66 ///
67 /// # Examples
68 /// ```
69 /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
70 ///
71 /// timely::example(|scope| {
72 /// vec![Ok(0), Err(())].to_stream(scope)
73 /// .and_then(|x| Ok(1 + 1))
74 /// .inspect(|x| println!("seen: {:?}", x));
75 /// });
76 /// ```
77 fn and_then<T2: 'static, L: FnMut(T) -> Result<T2, E> + 'static>(
78 self,
79 logic: L,
80 ) -> StreamVec<S, Result<T2, E>>;
81
82 /// Returns a new instance of `self` applying `logic` on all `Ok` records.
83 ///
84 /// # Examples
85 /// ```
86 /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
87 ///
88 /// timely::example(|scope| {
89 /// vec![Ok(1), Err(())].to_stream(scope)
90 /// .unwrap_or_else(|_| 0)
91 /// .inspect(|x| println!("seen: {:?}", x));
92 /// });
93 /// ```
94 fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, logic: L) -> StreamVec<S, T>;
95}
96
97impl<S: Scope, T: 'static, E: 'static> ResultStream<S, T, E> for StreamVec<S, Result<T, E>> {
98 fn ok(self) -> StreamVec<S, T> {
99 self.flat_map(Result::ok)
100 }
101
102 fn err(self) -> StreamVec<S, E> {
103 self.flat_map(Result::err)
104 }
105
106 fn map_ok<T2: 'static, L: FnMut(T) -> T2 + 'static>(self, mut logic: L) -> StreamVec<S, Result<T2, E>> {
107 self.map(move |r| r.map(&mut logic))
108 }
109
110 fn map_err<E2: 'static, L: FnMut(E) -> E2 + 'static>(self, mut logic: L) -> StreamVec<S, Result<T, E2>> {
111 self.map(move |r| r.map_err(&mut logic))
112 }
113
114 fn and_then<T2: 'static, L: FnMut(T) -> Result<T2, E> + 'static>(self, mut logic: L) -> StreamVec<S, Result<T2, E>> {
115 self.map(move |r| r.and_then(&mut logic))
116 }
117
118 fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, mut logic: L) -> StreamVec<S, T> {
119 self.map(move |r| r.unwrap_or_else(&mut logic))
120 }
121}
122
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{vec::{ToStream, ResultStream}, Capture, capture::Extract};

    // Every test feeds the same two-record input (`Ok(0)` and `Err(())`) through
    // one operator, captures the output, and checks the records of the first
    // extracted batch.

    #[test]
    fn test_ok() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.ok().capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0]);
    }

    #[test]
    fn test_err() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.err().capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![()]);
    }

    #[test]
    fn test_map_ok() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.map_ok(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(10), Err(())]);
    }

    #[test]
    fn test_map_err() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.map_err(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(0), Err(10)]);
    }

    #[test]
    fn test_and_then() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.and_then(|_| Ok(1)).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(1), Err(())]);
    }

    #[test]
    fn test_unwrap_or_else() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.unwrap_or_else(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0, 10]);
    }
}