rayon/iter/
par_bridge.rs

1use crossbeam_deque::{Steal, Stealer, Worker};
2
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::{Mutex, TryLockError};
5use std::thread::yield_now;
6
7use crate::current_num_threads;
8use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
9use crate::iter::ParallelIterator;
10
11/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
12///
13/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
14/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
15/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
16/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
17/// parallelize by other means, like channels or file or network I/O.
18///
19/// The resulting iterator is not guaranteed to keep the order of the original iterator.
20///
21/// # Examples
22///
23/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
24/// use any of the `ParallelIterator` methods:
25///
26/// ```
27/// use rayon::iter::ParallelBridge;
28/// use rayon::prelude::ParallelIterator;
29/// use std::sync::mpsc::channel;
30///
31/// let rx = {
32///     let (tx, rx) = channel();
33///
34///     tx.send("one!");
35///     tx.send("two!");
36///     tx.send("three!");
37///
38///     rx
39/// };
40///
41/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
42/// output.sort_unstable();
43///
44/// assert_eq!(&*output, &["one!", "three!", "two!"]);
45/// ```
46pub trait ParallelBridge: Sized {
47    /// Creates a bridge from this type to a `ParallelIterator`.
48    fn par_bridge(self) -> IterBridge<Self>;
49}
50
51impl<T: Iterator + Send> ParallelBridge for T
52where
53    T::Item: Send,
54{
55    fn par_bridge(self) -> IterBridge<Self> {
56        IterBridge { iter: self }
57    }
58}
59
/// A parallel iterator backed by a wrapped sequential iterator.
///
/// Values of this type are returned by the `par_bridge` method of [`ParallelBridge`]; refer to
/// that trait's documentation for details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Clone, Debug)]
pub struct IterBridge<Iter> {
    // The sequential iterator that supplies the items.
    iter: Iter,
}
70
71impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
72where
73    Iter::Item: Send,
74{
75    type Item = Iter::Item;
76
77    fn drive_unindexed<C>(self, consumer: C) -> C::Result
78    where
79        C: UnindexedConsumer<Self::Item>,
80    {
81        let split_count = AtomicUsize::new(current_num_threads());
82        let worker = Worker::new_fifo();
83        let stealer = worker.stealer();
84        let done = AtomicBool::new(false);
85        let iter = Mutex::new((self.iter, worker));
86
87        bridge_unindexed(
88            IterParallelProducer {
89                split_count: &split_count,
90                done: &done,
91                iter: &iter,
92                items: stealer,
93            },
94            consumer,
95        )
96    }
97}
98
99struct IterParallelProducer<'a, Iter: Iterator> {
100    split_count: &'a AtomicUsize,
101    done: &'a AtomicBool,
102    iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
103    items: Stealer<Iter::Item>,
104}
105
106// manual clone because T doesn't need to be Clone, but the derive assumes it should be
107impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
108    fn clone(&self) -> Self {
109        IterParallelProducer {
110            split_count: self.split_count,
111            done: self.done,
112            iter: self.iter,
113            items: self.items.clone(),
114        }
115    }
116}
117
118impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
119where
120    Iter::Item: Send,
121{
122    type Item = Iter::Item;
123
124    fn split(self) -> (Self, Option<Self>) {
125        let mut count = self.split_count.load(Ordering::SeqCst);
126
127        loop {
128            // Check if the iterator is exhausted *and* we've consumed every item from it.
129            let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
130
131            match count.checked_sub(1) {
132                Some(new_count) if !done => {
133                    match self.split_count.compare_exchange_weak(
134                        count,
135                        new_count,
136                        Ordering::SeqCst,
137                        Ordering::SeqCst,
138                    ) {
139                        Ok(_) => return (self.clone(), Some(self)),
140                        Err(last_count) => count = last_count,
141                    }
142                }
143                _ => {
144                    return (self, None);
145                }
146            }
147        }
148    }
149
150    fn fold_with<F>(self, mut folder: F) -> F
151    where
152        F: Folder<Self::Item>,
153    {
154        loop {
155            match self.items.steal() {
156                Steal::Success(it) => {
157                    folder = folder.consume(it);
158                    if folder.full() {
159                        return folder;
160                    }
161                }
162                Steal::Empty => {
163                    // Don't storm the mutex if we're already done.
164                    if self.done.load(Ordering::SeqCst) {
165                        // Someone might have pushed more between our `steal()` and `done.load()`
166                        if self.items.is_empty() {
167                            // The iterator is out of items, no use in continuing
168                            return folder;
169                        }
170                    } else {
171                        // our cache is out of items, time to load more from the iterator
172                        match self.iter.try_lock() {
173                            Ok(mut guard) => {
174                                // Check `done` again in case we raced with the previous lock
175                                // holder on its way out.
176                                if self.done.load(Ordering::SeqCst) {
177                                    if self.items.is_empty() {
178                                        return folder;
179                                    }
180                                    continue;
181                                }
182
183                                let count = current_num_threads();
184                                let count = (count * count) * 2;
185
186                                let (ref mut iter, ref worker) = *guard;
187
188                                // while worker.len() < count {
189                                // FIXME the new deque doesn't let us count items.  We can just
190                                // push a number of items, but that doesn't consider active
191                                // stealers elsewhere.
192                                for _ in 0..count {
193                                    if let Some(it) = iter.next() {
194                                        worker.push(it);
195                                    } else {
196                                        self.done.store(true, Ordering::SeqCst);
197                                        break;
198                                    }
199                                }
200                            }
201                            Err(TryLockError::WouldBlock) => {
202                                // someone else has the mutex, just sit tight until it's ready
203                                yield_now(); //TODO: use a thread-pool-aware yield? (#548)
204                            }
205                            Err(TryLockError::Poisoned(_)) => {
206                                // any panics from other threads will have been caught by the pool,
207                                // and will be re-thrown when joined - just exit
208                                return folder;
209                            }
210                        }
211                    }
212                }
213                Steal::Retry => (),
214            }
215        }
216    }
217}