rayon_core/thread_pool/
mod.rs

//! Contains support for user-managed thread pools, represented by the
//! [`ThreadPool`] type (see that struct for details).
3//!
4//! [`ThreadPool`]: struct.ThreadPool.html
5
6use crate::join;
7use crate::registry::{Registry, ThreadSpawn, WorkerThread};
8use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
9use crate::spawn;
10#[allow(deprecated)]
11use crate::Configuration;
12use crate::{scope, Scope};
13use crate::{scope_fifo, ScopeFifo};
14use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
15use std::error::Error;
16use std::fmt;
17use std::sync::Arc;
18
19mod test;
20
/// Represents a user-created [thread-pool].
///
/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
/// execute functions explicitly within this [`ThreadPool`] using
/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
/// (like `join()`) will execute implicitly within the current thread-pool.
///
///
/// ## Creating a ThreadPool
///
/// ```rust
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
/// ```
///
/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
/// threads. In addition, any other rayon operations called inside of `install()` will also
/// execute in the context of the `ThreadPool`.
///
/// Dropping the `ThreadPool` signals the threads it manages to terminate: they will
/// finish executing any remaining work that you have spawned and then terminate
/// automatically.
///
///
/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
pub struct ThreadPool {
    // Shared handle to the registry backing this pool. Every method on
    // `ThreadPool` goes through it, and `Drop` calls `terminate()` on it.
    registry: Arc<Registry>,
}
55
56impl ThreadPool {
57    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
58    #[allow(deprecated)]
59    /// Deprecated in favor of `ThreadPoolBuilder::build`.
60    pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>> {
61        Self::build(configuration.into_builder()).map_err(Box::from)
62    }
63
64    pub(super) fn build<S>(
65        builder: ThreadPoolBuilder<S>,
66    ) -> Result<ThreadPool, ThreadPoolBuildError>
67    where
68        S: ThreadSpawn,
69    {
70        let registry = Registry::new(builder)?;
71        Ok(ThreadPool { registry })
72    }
73
74    /// Executes `op` within the threadpool. Any attempts to use
75    /// `join`, `scope`, or parallel iterators will then operate
76    /// within that threadpool.
77    ///
78    /// # Warning: thread-local data
79    ///
80    /// Because `op` is executing within the Rayon thread-pool,
81    /// thread-local data from the current thread will not be
82    /// accessible.
83    ///
84    /// # Panics
85    ///
86    /// If `op` should panic, that panic will be propagated.
87    ///
88    /// ## Using `install()`
89    ///
90    /// ```rust
91    ///    # use rayon_core as rayon;
92    ///    fn main() {
93    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
94    ///         let n = pool.install(|| fib(20));
95    ///         println!("{}", n);
96    ///    }
97    ///
98    ///    fn fib(n: usize) -> usize {
99    ///         if n == 0 || n == 1 {
100    ///             return n;
101    ///         }
102    ///         let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
103    ///         return a + b;
104    ///     }
105    /// ```
106    pub fn install<OP, R>(&self, op: OP) -> R
107    where
108        OP: FnOnce() -> R + Send,
109        R: Send,
110    {
111        self.registry.in_worker(|_, _| op())
112    }
113
114    /// Returns the (current) number of threads in the thread pool.
115    ///
116    /// # Future compatibility note
117    ///
118    /// Note that unless this thread-pool was created with a
119    /// [`ThreadPoolBuilder`] that specifies the number of threads,
120    /// then this number may vary over time in future versions (see [the
121    /// `num_threads()` method for details][snt]).
122    ///
123    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
124    /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
125    #[inline]
126    pub fn current_num_threads(&self) -> usize {
127        self.registry.num_threads()
128    }
129
130    /// If called from a Rayon worker thread in this thread-pool,
131    /// returns the index of that thread; if not called from a Rayon
132    /// thread, or called from a Rayon thread that belongs to a
133    /// different thread-pool, returns `None`.
134    ///
135    /// The index for a given thread will not change over the thread's
136    /// lifetime. However, multiple threads may share the same index if
137    /// they are in distinct thread-pools.
138    ///
139    /// # Future compatibility note
140    ///
141    /// Currently, every thread-pool (including the global
142    /// thread-pool) has a fixed number of threads, but this may
143    /// change in future Rayon versions (see [the `num_threads()` method
144    /// for details][snt]). In that case, the index for a
145    /// thread would not change during its lifetime, but thread
146    /// indices may wind up being reused if threads are terminated and
147    /// restarted.
148    ///
149    /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
150    #[inline]
151    pub fn current_thread_index(&self) -> Option<usize> {
152        let curr = self.registry.current_thread()?;
153        Some(curr.index())
154    }
155
156    /// Returns true if the current worker thread currently has "local
157    /// tasks" pending. This can be useful as part of a heuristic for
158    /// deciding whether to spawn a new task or execute code on the
159    /// current thread, particularly in breadth-first
160    /// schedulers. However, keep in mind that this is an inherently
161    /// racy check, as other worker threads may be actively "stealing"
162    /// tasks from our local deque.
163    ///
164    /// **Background:** Rayon's uses a [work-stealing] scheduler. The
165    /// key idea is that each thread has its own [deque] of
166    /// tasks. Whenever a new task is spawned -- whether through
167    /// `join()`, `Scope::spawn()`, or some other means -- that new
168    /// task is pushed onto the thread's *local* deque. Worker threads
169    /// have a preference for executing their own tasks; if however
170    /// they run out of tasks, they will go try to "steal" tasks from
171    /// other threads. This function therefore has an inherent race
172    /// with other active worker threads, which may be removing items
173    /// from the local deque.
174    ///
175    /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
176    /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
177    #[inline]
178    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
179        let curr = self.registry.current_thread()?;
180        Some(!curr.local_deque_is_empty())
181    }
182
183    /// Execute `oper_a` and `oper_b` in the thread-pool and return
184    /// the results. Equivalent to `self.install(|| join(oper_a,
185    /// oper_b))`.
186    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
187    where
188        A: FnOnce() -> RA + Send,
189        B: FnOnce() -> RB + Send,
190        RA: Send,
191        RB: Send,
192    {
193        self.install(|| join(oper_a, oper_b))
194    }
195
196    /// Creates a scope that executes within this thread-pool.
197    /// Equivalent to `self.install(|| scope(...))`.
198    ///
199    /// See also: [the `scope()` function][scope].
200    ///
201    /// [scope]: fn.scope.html
202    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
203    where
204        OP: FnOnce(&Scope<'scope>) -> R + Send,
205        R: Send,
206    {
207        self.install(|| scope(op))
208    }
209
210    /// Creates a scope that executes within this thread-pool.
211    /// Spawns from the same thread are prioritized in relative FIFO order.
212    /// Equivalent to `self.install(|| scope_fifo(...))`.
213    ///
214    /// See also: [the `scope_fifo()` function][scope_fifo].
215    ///
216    /// [scope_fifo]: fn.scope_fifo.html
217    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
218    where
219        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
220        R: Send,
221    {
222        self.install(|| scope_fifo(op))
223    }
224
225    /// Creates a scope that spawns work into this thread-pool.
226    ///
227    /// See also: [the `in_place_scope()` function][in_place_scope].
228    ///
229    /// [in_place_scope]: fn.in_place_scope.html
230    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
231    where
232        OP: FnOnce(&Scope<'scope>) -> R,
233    {
234        do_in_place_scope(Some(&self.registry), op)
235    }
236
237    /// Creates a scope that spawns work into this thread-pool in FIFO order.
238    ///
239    /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
240    ///
241    /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
242    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
243    where
244        OP: FnOnce(&ScopeFifo<'scope>) -> R,
245    {
246        do_in_place_scope_fifo(Some(&self.registry), op)
247    }
248
249    /// Spawns an asynchronous task in this thread-pool. This task will
250    /// run in the implicit, global scope, which means that it may outlast
251    /// the current stack frame -- therefore, it cannot capture any references
252    /// onto the stack (you will likely need a `move` closure).
253    ///
254    /// See also: [the `spawn()` function defined on scopes][spawn].
255    ///
256    /// [spawn]: struct.Scope.html#method.spawn
257    pub fn spawn<OP>(&self, op: OP)
258    where
259        OP: FnOnce() + Send + 'static,
260    {
261        // We assert that `self.registry` has not terminated.
262        unsafe { spawn::spawn_in(op, &self.registry) }
263    }
264
265    /// Spawns an asynchronous task in this thread-pool. This task will
266    /// run in the implicit, global scope, which means that it may outlast
267    /// the current stack frame -- therefore, it cannot capture any references
268    /// onto the stack (you will likely need a `move` closure).
269    ///
270    /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
271    ///
272    /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
273    pub fn spawn_fifo<OP>(&self, op: OP)
274    where
275        OP: FnOnce() + Send + 'static,
276    {
277        // We assert that `self.registry` has not terminated.
278        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
279    }
280}
281
282impl Drop for ThreadPool {
283    fn drop(&mut self) {
284        self.registry.terminate();
285    }
286}
287
288impl fmt::Debug for ThreadPool {
289    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
290        fmt.debug_struct("ThreadPool")
291            .field("num_threads", &self.current_num_threads())
292            .field("id", &self.registry.id())
293            .finish()
294    }
295}
296
297/// If called from a Rayon worker thread, returns the index of that
298/// thread within its current pool; if not called from a Rayon thread,
299/// returns `None`.
300///
301/// The index for a given thread will not change over the thread's
302/// lifetime. However, multiple threads may share the same index if
303/// they are in distinct thread-pools.
304///
305/// See also: [the `ThreadPool::current_thread_index()` method].
306///
307/// [m]: struct.ThreadPool.html#method.current_thread_index
308///
309/// # Future compatibility note
310///
311/// Currently, every thread-pool (including the global
312/// thread-pool) has a fixed number of threads, but this may
313/// change in future Rayon versions (see [the `num_threads()` method
314/// for details][snt]). In that case, the index for a
315/// thread would not change during its lifetime, but thread
316/// indices may wind up being reused if threads are terminated and
317/// restarted.
318///
319/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
320#[inline]
321pub fn current_thread_index() -> Option<usize> {
322    unsafe {
323        let curr = WorkerThread::current().as_ref()?;
324        Some(curr.index())
325    }
326}
327
328/// If called from a Rayon worker thread, indicates whether that
329/// thread's local deque still has pending tasks. Otherwise, returns
330/// `None`. For more information, see [the
331/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
332///
333/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
334#[inline]
335pub fn current_thread_has_pending_tasks() -> Option<bool> {
336    unsafe {
337        let curr = WorkerThread::current().as_ref()?;
338        Some(!curr.local_deque_is_empty())
339    }
340}