rayon/iter/collect/
consumer.rs

1use super::super::plumbing::*;
2use std::marker::PhantomData;
3use std::mem::MaybeUninit;
4use std::ptr;
5use std::slice;
6
7pub(super) struct CollectConsumer<'c, T: Send> {
8    /// A slice covering the target memory, not yet initialized!
9    target: &'c mut [MaybeUninit<T>],
10}
11
12impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
13    /// The target memory is considered uninitialized, and will be
14    /// overwritten without reading or dropping existing values.
15    pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self {
16        CollectConsumer { target }
17    }
18}
19
20/// CollectResult represents an initialized part of the target slice.
21///
22/// This is a proxy owner of the elements in the slice; when it drops,
23/// the elements will be dropped, unless its ownership is released before then.
24#[must_use]
25pub(super) struct CollectResult<'c, T> {
26    /// A slice covering the target memory, initialized up to our separate `len`.
27    target: &'c mut [MaybeUninit<T>],
28    /// The current initialized length in `target`
29    len: usize,
30    /// Lifetime invariance guarantees that the data flows from consumer to result,
31    /// especially for the `scope_fn` callback in `Collect::with_consumer`.
32    invariant_lifetime: PhantomData<&'c mut &'c mut [T]>,
33}
34
35unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {}
36
37impl<'c, T> CollectResult<'c, T> {
38    /// The current length of the collect result
39    pub(super) fn len(&self) -> usize {
40        self.len
41    }
42
43    /// Release ownership of the slice of elements, and return the length
44    pub(super) fn release_ownership(mut self) -> usize {
45        let ret = self.len;
46        self.len = 0;
47        ret
48    }
49}
50
51impl<'c, T> Drop for CollectResult<'c, T> {
52    fn drop(&mut self) {
53        // Drop the first `self.len` elements, which have been recorded
54        // to be initialized by the folder.
55        unsafe {
56            // TODO: use `MaybeUninit::slice_as_mut_ptr`
57            let start = self.target.as_mut_ptr() as *mut T;
58            ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len));
59        }
60    }
61}
62
63impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
64    type Folder = CollectResult<'c, T>;
65    type Reducer = CollectReducer;
66    type Result = CollectResult<'c, T>;
67
68    fn split_at(self, index: usize) -> (Self, Self, CollectReducer) {
69        let CollectConsumer { target } = self;
70
71        // Produce new consumers. Normal slicing ensures that the
72        // memory range given to each consumer is disjoint.
73        let (left, right) = target.split_at_mut(index);
74        (
75            CollectConsumer::new(left),
76            CollectConsumer::new(right),
77            CollectReducer,
78        )
79    }
80
81    fn into_folder(self) -> Self::Folder {
82        // Create a result/folder that consumes values and writes them
83        // into target. The initial result has length 0.
84        CollectResult {
85            target: self.target,
86            len: 0,
87            invariant_lifetime: PhantomData,
88        }
89    }
90
91    fn full(&self) -> bool {
92        false
93    }
94}
95
96impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> {
97    type Result = Self;
98
99    fn consume(mut self, item: T) -> Self {
100        let dest = self
101            .target
102            .get_mut(self.len)
103            .expect("too many values pushed to consumer");
104
105        // Write item and increase the initialized length
106        unsafe {
107            dest.as_mut_ptr().write(item);
108            self.len += 1;
109        }
110
111        self
112    }
113
114    fn complete(self) -> Self::Result {
115        // NB: We don't explicitly check that the local writes were complete,
116        // but Collect will assert the total result length in the end.
117        self
118    }
119
120    fn full(&self) -> bool {
121        false
122    }
123}
124
125/// Pretend to be unindexed for `special_collect_into_vec`,
126/// but we should never actually get used that way...
127impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> {
128    fn split_off_left(&self) -> Self {
129        unreachable!("CollectConsumer must be indexed!")
130    }
131    fn to_reducer(&self) -> Self::Reducer {
132        CollectReducer
133    }
134}
135
136/// CollectReducer combines adjacent chunks; the result must always
137/// be contiguous so that it is one combined slice.
138pub(super) struct CollectReducer;
139
140impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer {
141    fn reduce(
142        self,
143        mut left: CollectResult<'c, T>,
144        right: CollectResult<'c, T>,
145    ) -> CollectResult<'c, T> {
146        // Merge if the CollectResults are adjacent and in left to right order
147        // else: drop the right piece now and total length will end up short in the end,
148        // when the correctness of the collected result is asserted.
149        let left_end = left.target[left.len..].as_ptr();
150        if left_end == right.target.as_ptr() {
151            let len = left.len + right.release_ownership();
152            unsafe {
153                left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len);
154            }
155            left.len = len;
156        }
157        left
158    }
159}