rayon/iter/collect/mod.rs
1use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
2use std::mem::MaybeUninit;
3use std::slice;
4
5mod consumer;
6use self::consumer::CollectConsumer;
7use self::consumer::CollectResult;
8use super::unzip::unzip_indexed;
9
10mod test;
11
12/// Collects the results of the exact iterator into the specified vector.
13///
14/// This is called by `IndexedParallelIterator::collect_into_vec`.
15pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>)
16where
17 I: IndexedParallelIterator<Item = T>,
18 T: Send,
19{
20 v.truncate(0); // clear any old data
21 let len = pi.len();
22 Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer));
23}
24
25/// Collects the results of the iterator into the specified vector.
26///
27/// Technically, this only works for `IndexedParallelIterator`, but we're faking a
28/// bit of specialization here until Rust can do that natively. Callers are
29/// using `opt_len` to find the length before calling this, and only exact
30/// iterators will return anything but `None` there.
31///
32/// Since the type system doesn't understand that contract, we have to allow
33/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
34/// `UnindexedConsumer`. That implementation panics `unreachable!` in case
35/// there's a bug where we actually do try to use this unindexed.
36fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
37where
38 I: ParallelIterator<Item = T>,
39 T: Send,
40{
41 Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer));
42}
43
44/// Unzips the results of the exact iterator into the specified vectors.
45///
46/// This is called by `IndexedParallelIterator::unzip_into_vecs`.
47pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
48where
49 I: IndexedParallelIterator<Item = (A, B)>,
50 A: Send,
51 B: Send,
52{
53 // clear any old data
54 left.truncate(0);
55 right.truncate(0);
56
57 let len = pi.len();
58 Collect::new(right, len).with_consumer(|right_consumer| {
59 let mut right_result = None;
60 Collect::new(left, len).with_consumer(|left_consumer| {
61 let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer);
62 right_result = Some(right_r);
63 left_r
64 });
65 right_result.unwrap()
66 });
67}
68
/// Manages the vector that a parallel collect appends into.
struct Collect<'c, T>
where
    T: Send,
{
    /// Number of new elements the collect must append after the
    /// vector's current contents.
    len: usize,
    /// Destination vector, borrowed for the duration of the collect.
    vec: &'c mut Vec<T>,
}
74
75impl<'c, T: Send + 'c> Collect<'c, T> {
76 fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
77 Collect { vec, len }
78 }
79
80 /// Create a consumer on the slice of memory we are collecting into.
81 ///
82 /// The consumer needs to be used inside the scope function, and the
83 /// complete collect result passed back.
84 ///
85 /// This method will verify the collect result, and panic if the slice
86 /// was not fully written into. Otherwise, in the successful case,
87 /// the vector is complete with the collected result.
88 fn with_consumer<F>(mut self, scope_fn: F)
89 where
90 F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
91 {
92 let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
93 let result = scope_fn(CollectConsumer::new(slice));
94
95 // The CollectResult represents a contiguous part of the
96 // slice, that has been written to.
97 // On unwind here, the CollectResult will be dropped.
98 // If some producers on the way did not produce enough elements,
99 // partial CollectResults may have been dropped without
100 // being reduced to the final result, and we will see
101 // that as the length coming up short.
102 //
103 // Here, we assert that `slice` is fully initialized. This is
104 // checked by the following assert, which verifies if a
105 // complete CollectResult was produced; if the length is
106 // correct, it is necessarily covering the target slice.
107 // Since we know that the consumer cannot have escaped from
108 // `drive` (by parametricity, essentially), we know that any
109 // stores that will happen, have happened. Unless some code is buggy,
110 // that means we should have seen `len` total writes.
111 let actual_writes = result.len();
112 assert!(
113 actual_writes == self.len,
114 "expected {} total writes, but got {}",
115 self.len,
116 actual_writes
117 );
118
119 // Release the result's mutable borrow and "proxy ownership"
120 // of the elements, before the vector takes it over.
121 result.release_ownership();
122
123 let new_len = self.vec.len() + self.len;
124
125 unsafe {
126 self.vec.set_len(new_len);
127 }
128 }
129
130 /// Reserve space for `len` more elements in the vector,
131 /// and return a slice to the uninitialized tail of the vector
132 fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
133 // Reserve the new space.
134 vec.reserve(len);
135
136 // TODO: use `Vec::spare_capacity_mut` instead
137 // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
138 // as `T`, and we already made sure to have the additional space.
139 let start = vec.len();
140 let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
141 unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
142 }
143}
144
145/// Extends a vector with items from a parallel iterator.
146impl<T> ParallelExtend<T> for Vec<T>
147where
148 T: Send,
149{
150 fn par_extend<I>(&mut self, par_iter: I)
151 where
152 I: IntoParallelIterator<Item = T>,
153 {
154 // See the vec_collect benchmarks in rayon-demo for different strategies.
155 let par_iter = par_iter.into_par_iter();
156 match par_iter.opt_len() {
157 Some(len) => {
158 // When Rust gets specialization, we can get here for indexed iterators
159 // without relying on `opt_len`. Until then, `special_extend()` fakes
160 // an unindexed mode on the promise that `opt_len()` is accurate.
161 special_extend(par_iter, len, self);
162 }
163 None => {
164 // This works like `extend`, but `Vec::append` is more efficient.
165 let list = super::extend::collect(par_iter);
166 self.reserve(super::extend::len(&list));
167 for mut vec in list {
168 self.append(&mut vec);
169 }
170 }
171 }
172 }
173}