rayon/iter/
try_reduce_with.rs

1use super::plumbing::*;
2use super::ParallelIterator;
3
4use super::private::Try;
5use std::sync::atomic::{AtomicBool, Ordering};
6
7pub(super) fn try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T>
8where
9    PI: ParallelIterator<Item = T>,
10    R: Fn(T::Ok, T::Ok) -> T + Sync,
11    T: Try + Send,
12{
13    let full = AtomicBool::new(false);
14    let consumer = TryReduceWithConsumer {
15        reduce_op: &reduce_op,
16        full: &full,
17    };
18    pi.drive_unindexed(consumer)
19}
20
/// Consumer for `try_reduce_with`; it owns nothing and only borrows the state
/// created by the driving call.
struct TryReduceWithConsumer<'a, R> {
    /// Flag that folders set once an error has been produced.
    full: &'a AtomicBool,
    /// The caller's reduction operation.
    reduce_op: &'a R,
}
25
26impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {}
27
28impl<'r, R> Clone for TryReduceWithConsumer<'r, R> {
29    fn clone(&self) -> Self {
30        *self
31    }
32}
33
34impl<'r, R, T> Consumer<T> for TryReduceWithConsumer<'r, R>
35where
36    R: Fn(T::Ok, T::Ok) -> T + Sync,
37    T: Try + Send,
38{
39    type Folder = TryReduceWithFolder<'r, R, T>;
40    type Reducer = Self;
41    type Result = Option<T>;
42
43    fn split_at(self, _index: usize) -> (Self, Self, Self) {
44        (self, self, self)
45    }
46
47    fn into_folder(self) -> Self::Folder {
48        TryReduceWithFolder {
49            reduce_op: self.reduce_op,
50            opt_result: None,
51            full: self.full,
52        }
53    }
54
55    fn full(&self) -> bool {
56        self.full.load(Ordering::Relaxed)
57    }
58}
59
60impl<'r, R, T> UnindexedConsumer<T> for TryReduceWithConsumer<'r, R>
61where
62    R: Fn(T::Ok, T::Ok) -> T + Sync,
63    T: Try + Send,
64{
65    fn split_off_left(&self) -> Self {
66        *self
67    }
68
69    fn to_reducer(&self) -> Self::Reducer {
70        *self
71    }
72}
73
74impl<'r, R, T> Reducer<Option<T>> for TryReduceWithConsumer<'r, R>
75where
76    R: Fn(T::Ok, T::Ok) -> T + Sync,
77    T: Try,
78{
79    fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
80        let reduce_op = self.reduce_op;
81        match (left, right) {
82            (None, x) | (x, None) => x,
83            (Some(a), Some(b)) => match (a.into_result(), b.into_result()) {
84                (Ok(a), Ok(b)) => Some(reduce_op(a, b)),
85                (Err(e), _) | (_, Err(e)) => Some(T::from_error(e)),
86            },
87        }
88    }
89}
90
91struct TryReduceWithFolder<'r, R, T: Try> {
92    reduce_op: &'r R,
93    opt_result: Option<Result<T::Ok, T::Error>>,
94    full: &'r AtomicBool,
95}
96
97impl<'r, R, T> Folder<T> for TryReduceWithFolder<'r, R, T>
98where
99    R: Fn(T::Ok, T::Ok) -> T,
100    T: Try,
101{
102    type Result = Option<T>;
103
104    fn consume(self, item: T) -> Self {
105        let reduce_op = self.reduce_op;
106        let result = match self.opt_result {
107            None => item.into_result(),
108            Some(Ok(a)) => match item.into_result() {
109                Ok(b) => reduce_op(a, b).into_result(),
110                Err(e) => Err(e),
111            },
112            Some(Err(e)) => Err(e),
113        };
114        if result.is_err() {
115            self.full.store(true, Ordering::Relaxed)
116        }
117        TryReduceWithFolder {
118            opt_result: Some(result),
119            ..self
120        }
121    }
122
123    fn complete(self) -> Option<T> {
124        let result = self.opt_result?;
125        Some(match result {
126            Ok(ok) => T::from_ok(ok),
127            Err(error) => T::from_error(error),
128        })
129    }
130
131    fn full(&self) -> bool {
132        self.full.load(Ordering::Relaxed)
133    }
134}