rayon_core/
job.rs

1use crate::latch::Latch;
2use crate::unwind;
3use crossbeam_deque::{Injector, Steal};
4use std::any::Any;
5use std::cell::UnsafeCell;
6use std::mem;
7
8pub(super) enum JobResult<T> {
9    None,
10    Ok(T),
11    Panic(Box<dyn Any + Send>),
12}
13
14/// A `Job` is used to advertise work for other threads that they may
15/// want to steal. In accordance with time honored tradition, jobs are
16/// arranged in a deque, so that thieves can take from the top of the
17/// deque while the main worker manages the bottom of the deque. This
18/// deque is managed by the `thread_pool` module.
19pub(super) trait Job {
20    /// Unsafe: this may be called from a different thread than the one
21    /// which scheduled the job, so the implementer must ensure the
22    /// appropriate traits are met, whether `Send`, `Sync`, or both.
23    unsafe fn execute(this: *const Self);
24}
25
26/// Effectively a Job trait object. Each JobRef **must** be executed
27/// exactly once, or else data may leak.
28///
29/// Internally, we store the job's data in a `*const ()` pointer.  The
30/// true type is something like `*const StackJob<...>`, but we hide
31/// it. We also carry the "execute fn" from the `Job` trait.
32#[derive(Copy, Clone, Debug, PartialEq, Eq)]
33pub(super) struct JobRef {
34    pointer: *const (),
35    execute_fn: unsafe fn(*const ()),
36}
37
38unsafe impl Send for JobRef {}
39unsafe impl Sync for JobRef {}
40
41impl JobRef {
42    /// Unsafe: caller asserts that `data` will remain valid until the
43    /// job is executed.
44    pub(super) unsafe fn new<T>(data: *const T) -> JobRef
45    where
46        T: Job,
47    {
48        let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
49
50        // erase types:
51        JobRef {
52            pointer: data as *const (),
53            execute_fn: mem::transmute(fn_ptr),
54        }
55    }
56
57    #[inline]
58    pub(super) unsafe fn execute(&self) {
59        (self.execute_fn)(self.pointer)
60    }
61}
62
63/// A job that will be owned by a stack slot. This means that when it
64/// executes it need not free any heap data, the cleanup occurs when
65/// the stack frame is later popped.  The function parameter indicates
66/// `true` if the job was stolen -- executed on a different thread.
67pub(super) struct StackJob<L, F, R>
68where
69    L: Latch + Sync,
70    F: FnOnce(bool) -> R + Send,
71    R: Send,
72{
73    pub(super) latch: L,
74    func: UnsafeCell<Option<F>>,
75    result: UnsafeCell<JobResult<R>>,
76}
77
78impl<L, F, R> StackJob<L, F, R>
79where
80    L: Latch + Sync,
81    F: FnOnce(bool) -> R + Send,
82    R: Send,
83{
84    pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
85        StackJob {
86            latch,
87            func: UnsafeCell::new(Some(func)),
88            result: UnsafeCell::new(JobResult::None),
89        }
90    }
91
92    pub(super) unsafe fn as_job_ref(&self) -> JobRef {
93        JobRef::new(self)
94    }
95
96    pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
97        self.func.into_inner().unwrap()(stolen)
98    }
99
100    pub(super) unsafe fn into_result(self) -> R {
101        self.result.into_inner().into_return_value()
102    }
103}
104
105impl<L, F, R> Job for StackJob<L, F, R>
106where
107    L: Latch + Sync,
108    F: FnOnce(bool) -> R + Send,
109    R: Send,
110{
111    unsafe fn execute(this: *const Self) {
112        fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
113            move || func(true)
114        }
115
116        let this = &*this;
117        let abort = unwind::AbortIfPanic;
118        let func = (*this.func.get()).take().unwrap();
119        (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
120            Ok(x) => JobResult::Ok(x),
121            Err(x) => JobResult::Panic(x),
122        };
123        this.latch.set();
124        mem::forget(abort);
125    }
126}
127
128/// Represents a job stored in the heap. Used to implement
129/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
130/// invokes a closure, which then triggers the appropriate logic to
131/// signal that the job executed.
132///
133/// (Probably `StackJob` should be refactored in a similar fashion.)
134pub(super) struct HeapJob<BODY>
135where
136    BODY: FnOnce() + Send,
137{
138    job: UnsafeCell<Option<BODY>>,
139}
140
141impl<BODY> HeapJob<BODY>
142where
143    BODY: FnOnce() + Send,
144{
145    pub(super) fn new(func: BODY) -> Self {
146        HeapJob {
147            job: UnsafeCell::new(Some(func)),
148        }
149    }
150
151    /// Creates a `JobRef` from this job -- note that this hides all
152    /// lifetimes, so it is up to you to ensure that this JobRef
153    /// doesn't outlive any data that it closes over.
154    pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
155        let this: *const Self = mem::transmute(self);
156        JobRef::new(this)
157    }
158}
159
160impl<BODY> Job for HeapJob<BODY>
161where
162    BODY: FnOnce() + Send,
163{
164    unsafe fn execute(this: *const Self) {
165        let this: Box<Self> = mem::transmute(this);
166        let job = (*this.job.get()).take().unwrap();
167        job();
168    }
169}
170
171impl<T> JobResult<T> {
172    /// Convert the `JobResult` for a job that has finished (and hence
173    /// its JobResult is populated) into its return value.
174    ///
175    /// NB. This will panic if the job panicked.
176    pub(super) fn into_return_value(self) -> T {
177        match self {
178            JobResult::None => unreachable!(),
179            JobResult::Ok(x) => x,
180            JobResult::Panic(x) => unwind::resume_unwinding(x),
181        }
182    }
183}
184
185/// Indirect queue to provide FIFO job priority.
186pub(super) struct JobFifo {
187    inner: Injector<JobRef>,
188}
189
190impl JobFifo {
191    pub(super) fn new() -> Self {
192        JobFifo {
193            inner: Injector::new(),
194        }
195    }
196
197    pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
198        // A little indirection ensures that spawns are always prioritized in FIFO order.  The
199        // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
200        // (FIFO), but either way they will end up popping from the front of this queue.
201        self.inner.push(job_ref);
202        JobRef::new(self)
203    }
204}
205
206impl Job for JobFifo {
207    unsafe fn execute(this: *const Self) {
208        // We "execute" a queue by executing its first job, FIFO.
209        loop {
210            match (*this).inner.steal() {
211                Steal::Success(job_ref) => break job_ref.execute(),
212                Steal::Empty => panic!("FIFO is empty"),
213                Steal::Retry => {}
214            }
215        }
216    }
217}