// rayon_core/scope/mod.rs

//! Methods for custom fork-join scopes, created by the [`scope()`]
//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
//!
//! [`scope()`]: fn.scope.html
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../fn.join.html

use crate::job::{HeapJob, JobFifo};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;

#[cfg(test)]
mod test;

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// See [`scope()`] for more information.
///
/// [`scope()`]: fn.scope.html
pub struct Scope<'scope> {
    base: ScopeBase<'scope>,
}

/// Represents a fork-join scope which can be used to spawn any number of tasks.
/// Those spawned from the same thread are prioritized in relative FIFO order.
/// See [`scope_fifo()`] for more information.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
pub struct ScopeFifo<'scope> {
    base: ScopeBase<'scope>,
    fifos: Vec<JobFifo>,
}

enum ScopeLatch {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CountLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: CountLockLatch },
}

struct ScopeBase<'scope> {
    /// thread registry where `scope()` was executed or where `in_place_scope()`
    /// should spawn jobs.
    registry: Arc<Registry>,

    /// if some job panicked, the error is stored here; it will be
    /// propagated to the one who created the scope
    panic: AtomicPtr<Box<dyn Any + Send + 'static>>,

    /// latch to track job counts
    job_completed_latch: ScopeLatch,

    /// You can think of a scope as containing a list of closures to execute,
    /// all of which outlive `'scope`.  They're not actually required to be
    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
    /// the closures are only *moved* across threads to be executed.
    marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// `scope()` is a more flexible building block compared to `join()`,
/// since a loop can be used to spawn any number of tasks without
/// recursing. However, that flexibility comes at a performance price:
/// tasks spawned using `scope()` must be allocated onto the heap,
/// whereas `join()` can make exclusive use of the stack. **Prefer
/// `join()` (or, even better, parallel iterators) where possible.**
///
/// # Example
///
/// The Rayon `join()` function launches two closures and waits for them
/// to stop. One could implement `join()` using a scope like so, although
/// it would be less efficient than the real implementation:
///
/// ```rust
/// # use rayon_core as rayon;
/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
///     where A: FnOnce() -> RA + Send,
///           B: FnOnce() -> RB + Send,
///           RA: Send,
///           RB: Send,
/// {
///     let mut result_a: Option<RA> = None;
///     let mut result_b: Option<RB> = None;
///     rayon::scope(|s| {
///         s.spawn(|_| result_a = Some(oper_a()));
///         s.spawn(|_| result_b = Some(oper_b()));
///     });
///     (result_a.unwrap(), result_b.unwrap())
/// }
/// ```
///
/// # A note on threading
///
/// The closure given to `scope()` executes in the Rayon thread-pool,
/// as do those given to `spawn()`. This means that you can't access
/// thread-local variables (well, you can, but they may have
/// unexpected values).
///
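/// For example, here is a minimal sketch of the pitfall (the value actually
/// observed is nondeterministic, since it depends on which pool thread runs
/// the closure):
///
/// ```rust
/// # use rayon_core as rayon;
/// use std::cell::Cell;
///
/// thread_local! { static COUNTER: Cell<u32> = Cell::new(0); }
///
/// COUNTER.with(|c| c.set(1)); // set on the calling thread
/// rayon::scope(|_| {
///     // This runs on some pool thread, so it reads *that* thread's copy,
///     // which may still be 0.
///     let _seen = COUNTER.with(|c| c.get());
/// });
/// ```
///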
/// # Task execution
///
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
///     - Scope tasks may be spawned
/// - Scope body returns
/// - Scope tasks execute, possibly spawning more tasks
/// - Once all tasks are done, scope ends and `scope()` returns
///
/// To see how and when tasks are joined, consider this example:
///
/// ```rust
/// # use rayon_core as rayon;
/// // point start
/// rayon::scope(|s| {
///     s.spawn(|s| { // task s.1
///         s.spawn(|s| { // task s.1.1
///             rayon::scope(|t| {
///                 t.spawn(|_| ()); // task t.1
///                 t.spawn(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (scope `s` created)
/// +-----------------------------------------------+ (task s.2)
/// +-------+ (task s.1)                            |
/// |       |                                       |
/// |       +---+ (task s.1.1)                      |
/// |       |   |                                   |
/// |       |   | (scope `t` created)               |
/// |       |   +----------------+ (task t.2)       |
/// |       |   +---+ (task t.1) |                  |
/// | (mid) |   |   |            |                  |
/// :       |   + <-+------------+ (scope `t` ends) |
/// :       |   |                                   |
/// |<------+---+-----------------------------------+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// The point here is that everything spawned into scope `s` will
/// terminate (at latest) at the same point -- right before the
/// original call to `rayon::scope` returns. This includes new
/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
/// scope is created (such as `t`), the things spawned into that scope
/// will be joined before that scope returns, which in turn occurs
/// before the creating task (task `s.1.1` in this case) finishes.
///
/// There is no guaranteed order of execution for spawns in a scope,
/// given that other threads may steal tasks at any time. However, they
/// are generally prioritized in a LIFO order on the thread from which
/// they were spawned. So in this example, absent any stealing, we can
/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
/// threads always steal from the other end of the deque, like FIFO
/// order.  The idea is that "recent" tasks are most likely to be fresh
/// in the local CPU's cache, while other threads can steal older
/// "stale" tasks.  For an alternate approach, consider
/// [`scope_fifo()`] instead.
///
/// [`scope_fifo()`]: fn.scope_fifo.html
///
/// # Accessing stack data
///
/// In general, spawned tasks may access stack data in place that
/// outlives the scope itself. Other data must be fully owned by the
/// spawned task.
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // We can access `ok` because it outlives the scope `s`.
///         println!("ok: {:?}", ok);
///
///         // If we just try to use `bad` here, the closure will borrow `bad`
///         // (because we are just printing it out, and that only requires a
///         // borrow), which will result in a compilation error. Read on
///         // for options.
///         // println!("bad: {:?}", bad);
///     });
/// });
/// ```
///
/// As the comments in the example above suggest, to reference `bad` we must
/// take ownership of it. One way to do this is to detach the closure
/// from the surrounding stack frame, using the `move` keyword. This
/// will cause it to take ownership of *all* the variables it touches,
/// in this case including both `ok` *and* `bad`:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok);
///         println!("bad: {:?}", bad);
///     });
///
///     // That closure is fine, but now we can't use `ok` anywhere else,
///     // since it is owned by the previous task:
///     // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// While this works, it could be a problem if we want to use `ok` elsewhere.
/// There are two choices. We can keep the closure as a `move` closure, but
/// instead of referencing the variable `ok`, we create a shadowed variable that
/// is a borrow of `ok` and capture *that*:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
///     s.spawn(move |_| {
///         println!("ok: {:?}", ok); // captures the shadowed version
///         println!("bad: {:?}", bad);
///     });
///
///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
///     // can be shared freely. Note that we need a `move` closure here though,
///     // because otherwise we'd be trying to borrow the shadowed `ok`,
///     // and that doesn't outlive `scope`.
///     s.spawn(move |_| println!("ok: {:?}", ok));
/// });
/// ```
///
/// Another option is not to use the `move` keyword but instead to take ownership
/// of individual variables:
///
/// ```rust
/// # use rayon_core as rayon;
/// let ok: Vec<i32> = vec![1, 2, 3];
/// rayon::scope(|s| {
///     let bad: Vec<i32> = vec![4, 5, 6];
///     s.spawn(|_| {
///         // Transfer ownership of `bad` into a local variable (also named `bad`).
///         // This will force the closure to take ownership of `bad` from the environment.
///         let bad = bad;
///         println!("ok: {:?}", ok); // `ok` is only borrowed.
///         println!("bad: {:?}", bad); // refers to our local variable, above.
///     });
///
///     s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
/// });
/// ```
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
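///
/// For instance, here is a minimal sketch of observing the propagated panic
/// from the caller's side with `catch_unwind` (the default panic hook will
/// still print the payload to stderr):
///
/// ```rust
/// # use rayon_core as rayon;
/// let result = std::panic::catch_unwind(|| {
///     rayon::scope(|s| {
///         s.spawn(|_| panic!("boom")); // panics on some pool thread
///     });
/// });
/// // The panic from the spawned job is propagated out of `scope()`.
/// assert!(result.is_err());
/// ```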
pub fn scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = Scope::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// # Task execution
///
/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
/// difference in the order of execution. Consider a similar example:
///
/// [`scope()`]: fn.scope.html
///
/// ```rust
/// # use rayon_core as rayon;
/// // point start
/// rayon::scope_fifo(|s| {
///     s.spawn_fifo(|s| { // task s.1
///         s.spawn_fifo(|s| { // task s.1.1
///             rayon::scope_fifo(|t| {
///                 t.spawn_fifo(|_| ()); // task t.1
///                 t.spawn_fifo(|_| ()); // task t.2
///             });
///         });
///     });
///     s.spawn_fifo(|s| { // task s.2
///     });
///     // point mid
/// });
/// // point end
/// ```
///
/// The various tasks that are run will execute roughly like so:
///
/// ```notrust
/// | (start)
/// |
/// | (FIFO scope `s` created)
/// +--------------------+ (task s.1)
/// +-------+ (task s.2) |
/// |       |            +---+ (task s.1.1)
/// |       |            |   |
/// |       |            |   | (FIFO scope `t` created)
/// |       |            |   +----------------+ (task t.1)
/// |       |            |   +---+ (task t.2) |
/// | (mid) |            |   |   |            |
/// :       |            |   + <-+------------+ (scope `t` ends)
/// :       |            |   |
/// |<------+------------+---+ (scope `s` ends)
/// |
/// | (end)
/// ```
///
/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
/// the thread from which they were spawned, as opposed to `scope()`'s
/// LIFO.  So in this example, we can expect `s.1` to execute before
/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
/// FIFO order, as usual. Overall, this has roughly the same order as
/// the now-deprecated [`breadth_first`] option, except the effect is
/// isolated to a particular scope. If spawns are intermingled from any
/// combination of `scope()` and `scope_fifo()`, or from different
/// threads, their order is only specified with respect to spawns in the
/// same scope and thread.
///
/// For more details on this design, see Rayon [RFC #1].
///
/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
///
/// # Panics
///
/// If a panic occurs, either in the closure given to `scope_fifo()` or
/// in any of the spawned jobs, that panic will be propagated and the
/// call to `scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
/// will execute, even if the spawning task should later panic.
/// `scope_fifo()` returns once all spawned jobs have completed, and any
/// panics are propagated at that point.
pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
    R: Send,
{
    in_worker(|owner_thread, _| {
        let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
        scope.base.complete(Some(owner_thread), || op(&scope))
    })
}

/// Creates a "fork-join" scope `s` and invokes the closure with a
/// reference to `s`. This closure can then spawn asynchronous tasks
/// into `s`. Those tasks may run asynchronously with respect to the
/// closure; they may themselves spawn additional tasks into `s`. When
/// the closure returns, it will block until all tasks that have been
/// spawned into `s` complete.
///
/// This is just like `scope()` except the closure runs on the same thread
/// that calls `in_place_scope()`. Only work that it spawns runs in the
/// thread pool.
///
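/// # Example
///
/// A minimal sketch: the scope closure runs on the calling thread (so it may
/// borrow and mutate locals directly), while anything it spawns runs in the
/// thread pool:
///
/// ```rust
/// # use rayon_core as rayon;
/// let (tx, rx) = std::sync::mpsc::channel();
/// let mut ran_inline = false;
/// rayon::in_place_scope(|s| {
///     s.spawn(move |_| tx.send(42).unwrap()); // executes on a pool thread
///     ran_inline = true; // executes right here, on the calling thread
/// });
/// assert!(ran_inline);
/// assert_eq!(rx.recv().unwrap(), 42);
/// ```
///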
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    do_in_place_scope(None, op)
}

pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&Scope<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = Scope::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
/// closure with a reference to `s`. This closure can then spawn
/// asynchronous tasks into `s`. Those tasks may run asynchronously with
/// respect to the closure; they may themselves spawn additional tasks
/// into `s`. When the closure returns, it will block until all tasks
/// that have been spawned into `s` complete.
///
/// This is just like `scope_fifo()` except the closure runs on the same thread
/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
/// thread pool.
///
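/// # Example
///
/// A minimal sketch, analogous to [`in_place_scope()`]: the closure itself
/// runs on the calling thread, while the FIFO-spawned job runs in the
/// thread pool:
///
/// ```rust
/// # use rayon_core as rayon;
/// let (tx, rx) = std::sync::mpsc::channel();
/// rayon::in_place_scope_fifo(|s| {
///     s.spawn_fifo(move |_| tx.send("done").unwrap()); // pool thread
/// });
/// assert_eq!(rx.recv().unwrap(), "done");
/// ```
///
/// [`in_place_scope()`]: fn.in_place_scope.html
///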
/// # Panics
///
/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
/// any of the spawned jobs, that panic will be propagated and the
/// call to `in_place_scope_fifo()` will panic. If multiple panics occur, it is
/// non-deterministic which of their panic values will propagate.
/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
/// returns once all spawned jobs have completed, and any panics are
/// propagated at that point.
pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    do_in_place_scope_fifo(None, op)
}

pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
where
    OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
    let thread = unsafe { WorkerThread::current().as_ref() };
    let scope = ScopeFifo::<'scope>::new(thread, registry);
    scope.base.complete(thread, || op(&scope))
}

impl<'scope> Scope<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        Scope { base }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes.  The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # Returns
    ///
    /// Nothing. The spawned closures cannot pass back values to the
    /// caller directly, though they can write to local variables on
    /// the stack (if those variables outlive the scope) or
    /// communicate through shared channels.
    ///
    /// (The intention is to eventually integrate with Rust futures to
    /// support spawns of functions that compute a value.)
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let mut value_a = None;
    /// let mut value_b = None;
    /// let mut value_c = None;
    /// rayon::scope(|s| {
    ///     s.spawn(|s1| {
    ///           // ^ this is the same scope as `s`; this handle `s1`
    ///           //   is intended for use by the spawned task,
    ///           //   since scope handles cannot cross thread boundaries.
    ///
    ///         value_a = Some(22);
    ///
    ///         // the scope `s` will not end until all these tasks are done
    ///         s1.spawn(|_| {
    ///             value_b = Some(44);
    ///         });
    ///     });
    ///
    ///     s.spawn(|_| {
    ///         value_c = Some(66);
    ///     });
    /// });
    /// assert_eq!(value_a, Some(22));
    /// assert_eq!(value_b, Some(44));
    /// assert_eq!(value_c, Some(66));
    /// ```
    ///
    /// # See also
    ///
    /// The [`scope` function] has more extensive documentation about
    /// task spawning.
    ///
    /// [`scope` function]: fn.scope.html
    pub fn spawn<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
    {
        self.base.increment();
        unsafe {
            let job_ref = Box::new(HeapJob::new(move || {
                self.base.execute_job(move || body(self))
            }))
            .as_job_ref();

            // Since `Scope` implements `Sync`, we can't be sure that we're still in a
            // thread of this pool, so we can't just push to the local worker thread.
            // Also, this might be an in-place scope.
            self.base.registry.inject_or_push(job_ref);
        }
    }
}

impl<'scope> ScopeFifo<'scope> {
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let base = ScopeBase::new(owner, registry);
        let num_threads = base.registry.num_threads();
        let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
        ScopeFifo { base, fifos }
    }

    /// Spawns a job into the fork-join scope `self`. This job will
    /// execute sometime before the fork-join scope completes.  The
    /// job is specified as a closure, and this closure receives its
    /// own reference to the scope `self` as argument. This can be
    /// used to inject new jobs into `self`.
    ///
    /// # See also
    ///
    /// This method is akin to [`Scope::spawn()`], but with a FIFO
    /// priority.  The [`scope_fifo` function] has more details about
    /// this distinction.
    ///
    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
    /// [`scope_fifo` function]: fn.scope_fifo.html
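    ///
    /// # Examples
    ///
    /// A minimal sketch: siblings spawned with `spawn_fifo` from the same
    /// thread are prioritized in the order they were spawned, though with
    /// stealing only the completion of all of them is guaranteed here:
    ///
    /// ```rust
    /// # use rayon_core as rayon;
    /// let (tx, rx) = std::sync::mpsc::channel();
    /// rayon::scope_fifo(move |s| {
    ///     for i in 0..3 {
    ///         let tx = tx.clone();
    ///         s.spawn_fifo(move |_| tx.send(i).unwrap());
    ///     }
    /// });
    /// // All senders have been dropped by now, so the iterator terminates.
    /// assert_eq!(rx.iter().count(), 3);
    /// ```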
    pub fn spawn_fifo<BODY>(&self, body: BODY)
    where
        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
    {
        self.base.increment();
        unsafe {
            let job_ref = Box::new(HeapJob::new(move || {
                self.base.execute_job(move || body(self))
            }))
            .as_job_ref();

            // If we're in the pool, use our scope's private fifo for this thread to execute
            // in a locally-FIFO order.  Otherwise, just use the pool's global injector.
            match self.base.registry.current_thread() {
                Some(worker) => {
                    let fifo = &self.fifos[worker.index()];
                    worker.push(fifo.push(job_ref));
                }
                None => self.base.registry.inject(&[job_ref]),
            }
        }
    }
}

impl<'scope> ScopeBase<'scope> {
    /// Creates the base of a new scope for the given registry
    fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
        let registry = registry.unwrap_or_else(|| match owner {
            Some(owner) => owner.registry(),
            None => global_registry(),
        });

        ScopeBase {
            registry: Arc::clone(registry),
            panic: AtomicPtr::new(ptr::null_mut()),
            job_completed_latch: ScopeLatch::new(owner),
            marker: PhantomData,
        }
    }

    fn increment(&self) {
        self.job_completed_latch.increment();
    }

    /// Executes `func` as the body of the scope, waits until all
    /// spawned jobs have completed, and propagates any panic.
    fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
    where
        FUNC: FnOnce() -> R,
    {
        let result = self.execute_job_closure(func);
        self.job_completed_latch.wait(owner);
        self.maybe_propagate_panic();
        result.unwrap() // only None if `op` panicked, and that would have been propagated
    }

    /// Executes `func` as a job, either aborting or executing as
    /// appropriate.
    fn execute_job<FUNC>(&self, func: FUNC)
    where
        FUNC: FnOnce(),
    {
        let _: Option<()> = self.execute_job_closure(func);
    }

    /// Executes `func` as a job in scope. Adjusts the "job completed"
    /// counters and also catches any panic and stores it into
    /// `scope`.
    fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
    where
        FUNC: FnOnce() -> R,
    {
        match unwind::halt_unwinding(func) {
            Ok(r) => {
                self.job_completed_latch.set();
                Some(r)
            }
            Err(err) => {
                self.job_panicked(err);
                self.job_completed_latch.set();
                None
            }
        }
    }

    fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
        // capture the first error we see, free the rest
        let nil = ptr::null_mut();
        let mut err = Box::new(err); // box up the fat ptr
        if self
            .panic
            .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            mem::forget(err); // ownership now transferred into self.panic
        }
        // otherwise, some other job's panic was stored first; `err` is
        // simply dropped here, freeing this one.
    }

    fn maybe_propagate_panic(&self) {
        // propagate panic, if any occurred; at this point, all
        // outstanding jobs have completed, so we can use a relaxed
        // ordering:
        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
        if !panic.is_null() {
            let value = unsafe { Box::from_raw(panic) };
            unwind::resume_unwinding(*value);
        }
    }
}

impl ScopeLatch {
    fn new(owner: Option<&WorkerThread>) -> Self {
        match owner {
            Some(owner) => ScopeLatch::Stealing {
                latch: CountLatch::new(),
                registry: Arc::clone(owner.registry()),
                worker_index: owner.index(),
            },
            None => ScopeLatch::Blocking {
                latch: CountLockLatch::new(),
            },
        }
    }

    fn increment(&self) {
        match self {
            ScopeLatch::Stealing { latch, .. } => latch.increment(),
            ScopeLatch::Blocking { latch } => latch.increment(),
        }
    }

    fn set(&self) {
        match self {
            ScopeLatch::Stealing {
                latch,
                registry,
                worker_index,
            } => latch.set_and_tickle_one(registry, *worker_index),
            ScopeLatch::Blocking { latch } => latch.set(),
        }
    }

    fn wait(&self, owner: Option<&WorkerThread>) {
        match self {
            ScopeLatch::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            ScopeLatch::Blocking { latch } => latch.wait(),
        }
    }
}

impl<'scope> fmt::Debug for Scope<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Scope")
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl<'scope> fmt::Debug for ScopeFifo<'scope> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("ScopeFifo")
            .field("num_fifos", &self.fifos.len())
            .field("pool_id", &self.base.registry.id())
            .field("panic", &self.base.panic)
            .field("job_completed_latch", &self.base.job_completed_latch)
            .finish()
    }
}

impl fmt::Debug for ScopeLatch {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ScopeLatch::Stealing { latch, .. } => fmt
                .debug_tuple("ScopeLatch::Stealing")
                .field(latch)
                .finish(),
            ScopeLatch::Blocking { latch } => fmt
                .debug_tuple("ScopeLatch::Blocking")
                .field(latch)
                .finish(),
        }
    }
}