timely/dataflow/operators/result.rs
1//! Extension methods for `Stream` containing `Result`s.
2
3use crate::Data;
4use crate::dataflow::operators::Map;
5use crate::dataflow::{Scope, Stream};
6
7/// Extension trait for `Stream`.
8pub trait ResultStream<S: Scope, T: Data, E: Data> {
9 /// Returns a new instance of `self` containing only `ok` records.
10 ///
11 /// # Examples
12 /// ```
13 /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
14 ///
15 /// timely::example(|scope| {
16 /// vec![Ok(0), Err(())].to_stream(scope)
17 /// .ok()
18 /// .inspect(|x| println!("seen: {:?}", x));
19 /// });
20 /// ```
21 fn ok(&self) -> Stream<S, T>;
22
23 /// Returns a new instance of `self` containing only `err` records.
24 ///
25 /// # Examples
26 /// ```
27 /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
28 ///
29 /// timely::example(|scope| {
30 /// vec![Ok(0), Err(())].to_stream(scope)
31 /// .err()
32 /// .inspect(|x| println!("seen: {:?}", x));
33 /// });
34 /// ```
35 fn err(&self) -> Stream<S, E>;
36
37 /// Returns a new instance of `self` applying `logic` on all `Ok` records.
38 ///
39 /// # Examples
40 /// ```
41 /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
42 ///
43 /// timely::example(|scope| {
44 /// vec![Ok(0), Err(())].to_stream(scope)
45 /// .map_ok(|x| x + 1)
46 /// .inspect(|x| println!("seen: {:?}", x));
47 /// });
48 /// ```
49 fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(&self, logic: L) -> Stream<S, Result<T2, E>>;
50
51 /// Returns a new instance of `self` applying `logic` on all `Err` records.
52 ///
53 /// # Examples
54 /// ```
55 /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
56 ///
57 /// timely::example(|scope| {
58 /// vec![Ok(0), Err(())].to_stream(scope)
59 /// .map_err(|_| 1)
60 /// .inspect(|x| println!("seen: {:?}", x));
61 /// });
62 /// ```
63 fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(&self, logic: L) -> Stream<S, Result<T, E2>>;
64
65 /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err`
66 /// records.
67 ///
68 /// # Examples
69 /// ```
70 /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
71 ///
72 /// timely::example(|scope| {
73 /// vec![Ok(0), Err(())].to_stream(scope)
74 /// .and_then(|x| Ok(1 + 1))
75 /// .inspect(|x| println!("seen: {:?}", x));
76 /// });
77 /// ```
78 fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(
79 &self,
80 logic: L,
81 ) -> Stream<S, Result<T2, E>>;
82
83 /// Returns a new instance of `self` applying `logic` on all `Ok` records.
84 ///
85 /// # Examples
86 /// ```
87 /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
88 ///
89 /// timely::example(|scope| {
90 /// vec![Ok(1), Err(())].to_stream(scope)
91 /// .unwrap_or_else(|_| 0)
92 /// .inspect(|x| println!("seen: {:?}", x));
93 /// });
94 /// ```
95 fn unwrap_or_else<L: FnMut(E) -> T + 'static>(&self, logic: L) -> Stream<S, T>;
96}
97
98impl<S: Scope, T: Data, E: Data> ResultStream<S, T, E> for Stream<S, Result<T, E>> {
99 fn ok(&self) -> Stream<S, T> {
100 self.flat_map(Result::ok)
101 }
102
103 fn err(&self) -> Stream<S, E> {
104 self.flat_map(Result::err)
105 }
106
107 fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(&self, mut logic: L) -> Stream<S, Result<T2, E>> {
108 self.map(move |r| r.map(&mut logic))
109 }
110
111 fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(&self, mut logic: L) -> Stream<S, Result<T, E2>> {
112 self.map(move |r| r.map_err(&mut logic))
113 }
114
115 fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(&self, mut logic: L) -> Stream<S, Result<T2, E>> {
116 self.map(move |r| r.and_then(&mut logic))
117 }
118
119 fn unwrap_or_else<L: FnMut(E) -> T + 'static>(&self, mut logic: L) -> Stream<S, T> {
120 self.map(move |r| r.unwrap_or_else(&mut logic))
121 }
122}
123
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::capture::Extract;
    use crate::dataflow::operators::{Capture, ResultStream, ToStream};

    // Each test feeds the same two-record input (`Ok(0)` and `Err(())`) through
    // one combinator and checks the records captured at the first timestamp.

    #[test]
    fn test_ok() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.ok().capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0]);
    }

    #[test]
    fn test_err() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.err().capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![()]);
    }

    #[test]
    fn test_map_ok() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.map_ok(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(10), Err(())]);
    }

    #[test]
    fn test_map_err() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.map_err(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(0), Err(10)]);
    }

    #[test]
    fn test_and_then() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.and_then(|_| Ok(1)).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![Ok(1), Err(())]);
    }

    #[test]
    fn test_unwrap_or_else() {
        let captured = crate::example(|scope| {
            let input = vec![Ok(0), Err(())].to_stream(scope);
            input.unwrap_or_else(|_| 10).capture()
        });
        let batches = captured.extract();
        assert_eq!(batches[0].1, vec![0, 10]);
    }
}