rayon/iter/collect/mod.rs

use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
use std::mem::MaybeUninit;
use std::slice;

mod consumer;
use self::consumer::CollectConsumer;
use self::consumer::CollectResult;
use super::unzip::unzip_indexed;

mod test;

/// Collects the results of the exact iterator into the specified vector.
///
/// This is called by `IndexedParallelIterator::collect_into_vec`.
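///
/// A minimal usage sketch of the public entry point that funnels into this
/// function (illustrative only; not exercised as a doctest):
///
/// ```ignore
/// use rayon::prelude::*;
///
/// let mut squares = Vec::new();
/// (0..5).into_par_iter().map(|i| i * i).collect_into_vec(&mut squares);
/// assert_eq!(squares, [0, 1, 4, 9, 16]);
/// ```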
pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>)
where
    I: IndexedParallelIterator<Item = T>,
    T: Send,
{
    v.truncate(0); // clear any old data
    let len = pi.len();
    Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer));
}

/// Collects the results of the iterator into the specified vector.
///
/// Technically, this only works for `IndexedParallelIterator`, but we're faking a
/// bit of specialization here until Rust can do that natively.  Callers are
/// using `opt_len` to find the length before calling this, and only exact
/// iterators will return anything but `None` there.
///
/// Since the type system doesn't understand that contract, we have to allow
/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
/// `UnindexedConsumer`.  That implementation panics with `unreachable!` in case
/// there's a bug where we actually do try to use this in unindexed mode.
fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
where
    I: ParallelIterator<Item = T>,
    T: Send,
{
    Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer));
}

/// Unzips the results of the exact iterator into the specified vectors.
///
/// This is called by `IndexedParallelIterator::unzip_into_vecs`.
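///
/// A minimal usage sketch of the public entry point (illustrative only; not
/// exercised as a doctest):
///
/// ```ignore
/// use rayon::prelude::*;
///
/// let (mut doubles, mut triples) = (Vec::new(), Vec::new());
/// (0..4)
///     .into_par_iter()
///     .map(|i| (i * 2, i * 3))
///     .unzip_into_vecs(&mut doubles, &mut triples);
/// assert_eq!(doubles, [0, 2, 4, 6]);
/// assert_eq!(triples, [0, 3, 6, 9]);
/// ```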
pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
where
    I: IndexedParallelIterator<Item = (A, B)>,
    A: Send,
    B: Send,
{
    // clear any old data
    left.truncate(0);
    right.truncate(0);

    let len = pi.len();
    Collect::new(right, len).with_consumer(|right_consumer| {
        let mut right_result = None;
        Collect::new(left, len).with_consumer(|left_consumer| {
            let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer);
            right_result = Some(right_r);
            left_r
        });
        right_result.unwrap()
    });
}

/// Manage the collection vector.
struct Collect<'c, T: Send> {
    vec: &'c mut Vec<T>,
    len: usize,
}

impl<'c, T: Send + 'c> Collect<'c, T> {
    fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
        Collect { vec, len }
    }

    /// Create a consumer on the slice of memory we are collecting into.
    ///
    /// The consumer needs to be used inside the scope function, and the
    /// complete collect result passed back.
    ///
    /// This method will verify the collect result, and panic if the slice
    /// was not fully written into. Otherwise, in the successful case,
    /// the vector is complete with the collected result.
    fn with_consumer<F>(mut self, scope_fn: F)
    where
        F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
    {
        let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
        let result = scope_fn(CollectConsumer::new(slice));

        // The CollectResult represents a contiguous part of the
        // slice that has been written to.
        // On unwind here, the CollectResult will be dropped.
        // If some producers on the way did not produce enough elements,
        // partial CollectResults may have been dropped without
        // being reduced to the final result, and we will see
        // that as the length coming up short.
        //
        // Here, we assert that `slice` is fully initialized. The assert
        // below verifies that a complete CollectResult was produced; if
        // the length is correct, it necessarily covers the target slice.
        // Since we know that the consumer cannot have escaped from
        // `drive` (by parametricity, essentially), we know that any
        // stores that will happen have happened. Unless some code is
        // buggy, that means we should have seen `len` total writes.
        let actual_writes = result.len();
        assert!(
            actual_writes == self.len,
            "expected {} total writes, but got {}",
            self.len,
            actual_writes
        );

        // Release the result's mutable borrow and "proxy ownership"
        // of the elements, before the vector takes it over.
        result.release_ownership();

        let new_len = self.vec.len() + self.len;

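        // SAFETY: the assert above guarantees that all `self.len` new
        // elements were actually written, and `release_ownership` has
        // already handed them over, so extending the length only exposes
        // initialized data.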
        unsafe {
            self.vec.set_len(new_len);
        }
    }

    /// Reserve space for `len` more elements in the vector,
    /// and return a slice to the uninitialized tail of the vector
    fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
        // Reserve the new space.
        vec.reserve(len);

        // TODO: use `Vec::spare_capacity_mut` instead
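        //
        // A rough sketch of that replacement (assuming a toolchain where
        // `Vec::spare_capacity_mut` is stable), which would avoid the raw
        // pointer cast below:
        //
        //     return &mut vec.spare_capacity_mut()[..len];
        //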
        // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
        // as `T`, and we already made sure to have the additional space.
        let start = vec.len();
        let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
        unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
    }
}

/// Extends a vector with items from a parallel iterator.
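///
/// A minimal usage sketch (illustrative only; not exercised as a doctest):
///
/// ```ignore
/// use rayon::prelude::*;
///
/// let mut evens = vec![0];
/// evens.par_extend((1..5).into_par_iter().map(|i| i * 2));
/// assert_eq!(evens, [0, 2, 4, 6, 8]);
/// ```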
impl<T> ParallelExtend<T> for Vec<T>
where
    T: Send,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = T>,
    {
        // See the vec_collect benchmarks in rayon-demo for different strategies.
        let par_iter = par_iter.into_par_iter();
        match par_iter.opt_len() {
            Some(len) => {
                // When Rust gets specialization, we can get here for indexed iterators
                // without relying on `opt_len`.  Until then, `special_extend()` fakes
                // an unindexed mode on the promise that `opt_len()` is accurate.
                special_extend(par_iter, len, self);
            }
            None => {
                // This works like `extend`, but `Vec::append` is more efficient.
                let list = super::extend::collect(par_iter);
                self.reserve(super::extend::len(&list));
                for mut vec in list {
                    self.append(&mut vec);
                }
            }
        }
    }
}