rayon_core/sleep/
mod.rs

1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use crate::latch::CoreLatch;
5use crate::log::Event::*;
6use crate::log::Logger;
7use crossbeam_utils::CachePadded;
8use std::sync::atomic::Ordering;
9use std::sync::{Condvar, Mutex};
10use std::thread;
11use std::usize;
12
13mod counters;
14use self::counters::{AtomicCounters, JobsEventCounter};
15
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    logger: Logger,

    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Cache-padded so that adjacent workers' states do not share a line.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking inactive/sleeping threads and the
    /// jobs event counter (JEC); see the `counters` submodule.
    counters: AtomicCounters,
}
32
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// The index of the worker that is idle.
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
49
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes. Guarded by a mutex so
    /// that sleeping and waking coordinate via the condvar below.
    is_blocked: Mutex<bool>,

    /// Condvar the worker blocks on while `is_blocked` is true; wakers
    /// notify it after flipping `is_blocked` back to false.
    condvar: Condvar,
}
59
/// Number of search rounds an idle worker spends yielding before it
/// announces itself as "sleepy" (recording the jobs event counter).
const ROUNDS_UNTIL_SLEEPY: u32 = 32;

/// Number of search rounds before a sleepy worker actually attempts to
/// block; one round after becoming sleepy with the current constants.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
62
impl Sleep {
    /// Creates the sleep state for a registry with `n_threads` workers,
    /// allocating one (default-initialized) sleep state per worker.
    pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
        Sleep {
            logger,
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when `worker_index` starts looking for work. Registers the
    /// thread as inactive in the counters and returns the `IdleState` token
    /// that tracks its progress toward sleep (see `no_work_found`).
    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
        self.logger.log(|| ThreadIdle {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when the idle thread finds work. Consumes the `IdleState`
    /// token (the thread is no longer idle) and de-registers the thread
    /// from the inactive count.
    #[inline]
    pub(super) fn work_found(&self, idle_state: IdleState) {
        self.logger.log(|| ThreadFoundWork {
            worker: idle_state.worker_index,
            yields: idle_state.rounds,
        });

        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked on each round in which the idle thread failed to find work.
    /// For the first `ROUNDS_UNTIL_SLEEPY` rounds we just yield; on the
    /// `ROUNDS_UNTIL_SLEEPY`-th round we announce ourselves as sleepy
    /// (recording the jobs event counter in `idle_state`); after
    /// `ROUNDS_UNTIL_SLEEPING` rounds we attempt to actually go to sleep.
    ///
    /// `has_injected_jobs` is a last-chance check for externally injected
    /// jobs, consulted only on the sleep path (see `sleep`).
    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            // NOTE: with the current constants (SLEEPING == SLEEPY + 1) this
            // arm never fires; it exists so the gap can be widened safely.
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Marks the jobs event counter as "sleepy" (if it is currently in the
    /// active state) and returns the resulting JEC value. The worker stores
    /// this value so it can later detect jobs posted after it got sleepy.
    #[cold]
    fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active);
        let jobs_counter = counters.jobs_counter();
        self.logger.log(|| ThreadSleepy {
            worker: worker_index,
            jobs_counter: jobs_counter.as_usize(),
        });
        jobs_counter
    }

    /// Attempts to put the worker to sleep. May bail out early if the
    /// latch was signalled or if the jobs event counter changed since we
    /// announced ourselves sleepy; otherwise blocks on the worker's condvar
    /// until notified.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        // First, transition the latch to the "sleepy" state; if that fails,
        // the latch was already signalled and there is work to do, so don't
        // sleep at all.
        if !latch.get_sleepy() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            return;
        }

        // Acquire the mutex BEFORE touching the counters below: whoever
        // comes to wake us must wait for this lock, so they are guaranteed
        // to observe `is_blocked == true` once we set it.
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Try to move the latch from "sleepy" to "sleeping". If that fails,
        // the latch was signalled in the meantime: wake back up fully, as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                self.logger.log(|| ThreadSleepInterruptedByJob {
                    worker: worker_index,
                });

                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            // (CAS-style: retry the loop if the counters changed under us.)
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        self.logger.log(|| ThreadSleeping {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        //
        // The fence pairs with the one in `new_injected_jobs`.
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whomever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            // Loop guards against spurious condvar wakeups: only a genuine
            // waker resets `is_blocked` to false.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();

        self.logger.log(|| ThreadAwoken {
            worker: worker_index,
            latch_addr: latch.addr(),
        });
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping).  When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected.
        // (It pairs with the fence in `sleep`, just before
        // `has_injected_jobs` is consulted.)
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `source_worker_index` -- index of the thread that did the
    ///   push, or `usize::MAX` if this came from outside the thread
    ///   pool -- it is used only for logging.
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(
        &self,
        source_worker_index: usize,
        num_jobs: u32,
        queue_was_empty: bool,
    ) {
        self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available by advancing the jobs
        // event counter, so that sleepy workers can detect the new work.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        self.logger.log(|| JobThreadCounts {
            worker: source_worker_index,
            num_idle: num_awake_but_idle as u16,
            num_sleepers: num_sleepers as u16,
        });

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers by scanning every
    /// worker's sleep state in index order.
    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if (and only if) it is currently
    /// blocked; returns whether a thread was actually woken.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, it's our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            self.logger.log(|| ThreadNotify { worker: index });

            true
        } else {
            false
        }
    }
}
381
382impl IdleState {
383    fn wake_fully(&mut self) {
384        self.rounds = 0;
385        self.jobs_counter = JobsEventCounter::DUMMY;
386    }
387
388    fn wake_partly(&mut self) {
389        self.rounds = ROUNDS_UNTIL_SLEEPY;
390        self.jobs_counter = JobsEventCounter::DUMMY;
391    }
392}