rayon_core/sleep/
counters.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3pub(super) struct AtomicCounters {
4    /// Packs together a number of counters. The counters are ordered as
5    /// follows, from least to most significant bits (here, we assuming
6    /// that [`THREADS_BITS`] is equal to 10):
7    ///
8    /// * Bits 0..10: Stores the number of **sleeping threads**
9    /// * Bits 10..20: Stores the number of **inactive threads**
10    /// * Bits 20..: Stores the **job event counter** (JEC)
11    ///
12    /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
13    /// that the total number of bits (and hence the number of bits used for the
14    /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
15    value: AtomicUsize,
16}
17
18#[derive(Copy, Clone)]
19pub(super) struct Counters {
20    word: usize,
21}
22
23/// A value read from the **Jobs Event Counter**.
24/// See the [`README.md`](README.md) for more
25/// coverage of how the jobs event counter works.
26#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
27pub(super) struct JobsEventCounter(usize);
28
29impl JobsEventCounter {
30    pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
31
32    #[inline]
33    pub(super) fn as_usize(self) -> usize {
34        self.0
35    }
36
37    /// The JEC "is sleepy" if the last thread to increment it was in the
38    /// process of becoming sleepy. This is indicated by its value being *even*.
39    /// When new jobs are posted, they check if the JEC is sleepy, and if so
40    /// they incremented it.
41    #[inline]
42    pub(super) fn is_sleepy(self) -> bool {
43        (self.as_usize() & 1) == 0
44    }
45
46    /// The JEC "is active" if the last thread to increment it was posting new
47    /// work. This is indicated by its value being *odd*. When threads get
48    /// sleepy, they will check if the JEC is active, and increment it.
49    #[inline]
50    pub(super) fn is_active(self) -> bool {
51        !self.is_sleepy()
52    }
53}
54
/// Number of bits used for each of the two thread counters.
const THREADS_BITS: usize = 10;

/// Bits to shift to select the sleeping threads
/// (used with `select_thread`).
// Spelled `0 * THREADS_BITS` on purpose so the three shifts read as
// field slots 0, 1 and 2 of the packed word.
#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;

/// Bits to shift to select the inactive threads
/// (used with `select_thread`).
// Spelled `1 * THREADS_BITS` on purpose; see `SLEEPING_SHIFT`.
#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;

/// Bits to shift to select the JEC
/// (used with `select_jec`).
const JEC_SHIFT: usize = 2 * THREADS_BITS;

/// Max value for the thread counters; also the mask that isolates one
/// thread counter once it has been shifted down to bit 0.
const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;

/// Constant that can be added to add one sleeping thread.
const ONE_SLEEPING: usize = 1 << SLEEPING_SHIFT;

/// Constant that can be added to add one inactive thread.
/// An inactive thread is either idle, sleepy, or sleeping.
const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;

/// Constant that can be added to add one to the JEC.
const ONE_JEC: usize = 1 << JEC_SHIFT;
82
83impl AtomicCounters {
84    #[inline]
85    pub(super) fn new() -> AtomicCounters {
86        AtomicCounters {
87            value: AtomicUsize::new(0),
88        }
89    }
90
91    /// Load and return the current value of the various counters.
92    /// This value can then be given to other method which will
93    /// attempt to update the counters via compare-and-swap.
94    #[inline]
95    pub(super) fn load(&self, ordering: Ordering) -> Counters {
96        Counters::new(self.value.load(ordering))
97    }
98
99    #[inline]
100    fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
101        self.value
102            .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
103            .is_ok()
104    }
105
106    /// Adds an inactive thread. This cannot fail.
107    ///
108    /// This should be invoked when a thread enters its idle loop looking
109    /// for work. It is decremented when work is found. Note that it is
110    /// not decremented if the thread transitions from idle to sleepy or sleeping;
111    /// so the number of inactive threads is always greater-than-or-equal
112    /// to the number of sleeping threads.
113    #[inline]
114    pub(super) fn add_inactive_thread(&self) {
115        self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
116    }
117
118    /// Increments the jobs event counter if `increment_when`, when applied to
119    /// the current value, is true. Used to toggle the JEC from even (sleepy) to
120    /// odd (active) or vice versa. Returns the final value of the counters, for
121    /// which `increment_when` is guaranteed to return false.
122    pub(super) fn increment_jobs_event_counter_if(
123        &self,
124        increment_when: impl Fn(JobsEventCounter) -> bool,
125    ) -> Counters {
126        loop {
127            let old_value = self.load(Ordering::SeqCst);
128            if increment_when(old_value.jobs_counter()) {
129                let new_value = old_value.increment_jobs_counter();
130                if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
131                    return new_value;
132                }
133            } else {
134                return old_value;
135            }
136        }
137    }
138
139    /// Subtracts an inactive thread. This cannot fail. It is invoked
140    /// when a thread finds work and hence becomes active. It returns the
141    /// number of sleeping threads to wake up (if any).
142    ///
143    /// See `add_inactive_thread`.
144    #[inline]
145    pub(super) fn sub_inactive_thread(&self) -> usize {
146        let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
147        debug_assert!(
148            old_value.inactive_threads() > 0,
149            "sub_inactive_thread: old_value {:?} has no inactive threads",
150            old_value,
151        );
152        debug_assert!(
153            old_value.sleeping_threads() <= old_value.inactive_threads(),
154            "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
155            old_value,
156            old_value.sleeping_threads(),
157            old_value.inactive_threads(),
158        );
159
160        // Current heuristic: whenever an inactive thread goes away, if
161        // there are any sleeping threads, wake 'em up.
162        let sleeping_threads = old_value.sleeping_threads();
163        std::cmp::min(sleeping_threads, 2)
164    }
165
166    /// Subtracts a sleeping thread. This cannot fail, but it is only
167    /// safe to do if you you know the number of sleeping threads is
168    /// non-zero (i.e., because you have just awoken a sleeping
169    /// thread).
170    #[inline]
171    pub(super) fn sub_sleeping_thread(&self) {
172        let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
173        debug_assert!(
174            old_value.sleeping_threads() > 0,
175            "sub_sleeping_thread: old_value {:?} had no sleeping threads",
176            old_value,
177        );
178        debug_assert!(
179            old_value.sleeping_threads() <= old_value.inactive_threads(),
180            "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
181            old_value,
182            old_value.sleeping_threads(),
183            old_value.inactive_threads(),
184        );
185    }
186
187    #[inline]
188    pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
189        debug_assert!(
190            old_value.inactive_threads() > 0,
191            "try_add_sleeping_thread: old_value {:?} has no inactive threads",
192            old_value,
193        );
194        debug_assert!(
195            old_value.sleeping_threads() < THREADS_MAX,
196            "try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
197            old_value,
198        );
199
200        let mut new_value = old_value;
201        new_value.word += ONE_SLEEPING;
202
203        self.try_exchange(old_value, new_value, Ordering::SeqCst)
204    }
205}
206
207#[inline]
208fn select_thread(word: usize, shift: usize) -> usize {
209    ((word >> shift) as usize) & THREADS_MAX
210}
211
212#[inline]
213fn select_jec(word: usize) -> usize {
214    (word >> JEC_SHIFT) as usize
215}
216
217impl Counters {
218    #[inline]
219    fn new(word: usize) -> Counters {
220        Counters { word }
221    }
222
223    #[inline]
224    fn increment_jobs_counter(self) -> Counters {
225        // We can freely add to JEC because it occupies the most significant bits.
226        // Thus it doesn't overflow into the other counters, just wraps itself.
227        Counters {
228            word: self.word.wrapping_add(ONE_JEC),
229        }
230    }
231
232    #[inline]
233    pub(super) fn jobs_counter(self) -> JobsEventCounter {
234        JobsEventCounter(select_jec(self.word))
235    }
236
237    /// The number of threads that are not actively
238    /// executing work. They may be idle, sleepy, or asleep.
239    #[inline]
240    pub(super) fn inactive_threads(self) -> usize {
241        select_thread(self.word, INACTIVE_SHIFT)
242    }
243
244    #[inline]
245    pub(super) fn awake_but_idle_threads(self) -> usize {
246        debug_assert!(
247            self.sleeping_threads() <= self.inactive_threads(),
248            "sleeping threads: {} > raw idle threads {}",
249            self.sleeping_threads(),
250            self.inactive_threads()
251        );
252        self.inactive_threads() - self.sleeping_threads()
253    }
254
255    #[inline]
256    pub(super) fn sleeping_threads(self) -> usize {
257        select_thread(self.word, SLEEPING_SHIFT)
258    }
259}
260
261impl std::fmt::Debug for Counters {
262    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        let word = format!("{:016x}", self.word);
264        fmt.debug_struct("Counters")
265            .field("word", &word)
266            .field("jobs", &self.jobs_counter().0)
267            .field("inactive", &self.inactive_threads())
268            .field("sleeping", &self.sleeping_threads())
269            .finish()
270    }
271}