1use std::sync::atomic::{AtomicUsize, Ordering};
23pub(super) struct AtomicCounters {
4/// Packs together a number of counters. The counters are ordered as
5 /// follows, from least to most significant bits (here, we assuming
6 /// that [`THREADS_BITS`] is equal to 10):
7 ///
8 /// * Bits 0..10: Stores the number of **sleeping threads**
9 /// * Bits 10..20: Stores the number of **inactive threads**
10 /// * Bits 20..: Stores the **job event counter** (JEC)
11 ///
12 /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
13 /// that the total number of bits (and hence the number of bits used for the
14 /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
15value: AtomicUsize,
16}
1718#[derive(Copy, Clone)]
19pub(super) struct Counters {
20 word: usize,
21}
2223/// A value read from the **Jobs Event Counter**.
24/// See the [`README.md`](README.md) for more
25/// coverage of how the jobs event counter works.
26#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
27pub(super) struct JobsEventCounter(usize);
2829impl JobsEventCounter {
30pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
3132#[inline]
33pub(super) fn as_usize(self) -> usize {
34self.0
35}
3637/// The JEC "is sleepy" if the last thread to increment it was in the
38 /// process of becoming sleepy. This is indicated by its value being *even*.
39 /// When new jobs are posted, they check if the JEC is sleepy, and if so
40 /// they incremented it.
41#[inline]
42pub(super) fn is_sleepy(self) -> bool {
43 (self.as_usize() & 1) == 0
44}
4546/// The JEC "is active" if the last thread to increment it was posting new
47 /// work. This is indicated by its value being *odd*. When threads get
48 /// sleepy, they will check if the JEC is active, and increment it.
49#[inline]
50pub(super) fn is_active(self) -> bool {
51 !self.is_sleepy()
52 }
53}
5455/// Number of bits used for the thread counters.
56const THREADS_BITS: usize = 10;
5758/// Bits to shift to select the sleeping threads
59/// (used with `select_bits`).
60const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
6162/// Bits to shift to select the inactive threads
63/// (used with `select_bits`).
64const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
6566/// Bits to shift to select the JEC
67/// (use JOBS_BITS).
68const JEC_SHIFT: usize = 2 * THREADS_BITS;
6970/// Max value for the thread counters.
71const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
7273/// Constant that can be added to add one sleeping thread.
74const ONE_SLEEPING: usize = 1;
7576/// Constant that can be added to add one inactive thread.
77/// An inactive thread is either idle, sleepy, or sleeping.
78const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
7980/// Constant that can be added to add one to the JEC.
81const ONE_JEC: usize = 1 << JEC_SHIFT;
8283impl AtomicCounters {
84#[inline]
85pub(super) fn new() -> AtomicCounters {
86 AtomicCounters {
87 value: AtomicUsize::new(0),
88 }
89 }
9091/// Load and return the current value of the various counters.
92 /// This value can then be given to other method which will
93 /// attempt to update the counters via compare-and-swap.
94#[inline]
95pub(super) fn load(&self, ordering: Ordering) -> Counters {
96 Counters::new(self.value.load(ordering))
97 }
9899#[inline]
100fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
101self.value
102 .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
103 .is_ok()
104 }
105106/// Adds an inactive thread. This cannot fail.
107 ///
108 /// This should be invoked when a thread enters its idle loop looking
109 /// for work. It is decremented when work is found. Note that it is
110 /// not decremented if the thread transitions from idle to sleepy or sleeping;
111 /// so the number of inactive threads is always greater-than-or-equal
112 /// to the number of sleeping threads.
113#[inline]
114pub(super) fn add_inactive_thread(&self) {
115self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
116 }
117118/// Increments the jobs event counter if `increment_when`, when applied to
119 /// the current value, is true. Used to toggle the JEC from even (sleepy) to
120 /// odd (active) or vice versa. Returns the final value of the counters, for
121 /// which `increment_when` is guaranteed to return false.
122pub(super) fn increment_jobs_event_counter_if(
123&self,
124 increment_when: impl Fn(JobsEventCounter) -> bool,
125 ) -> Counters {
126loop {
127let old_value = self.load(Ordering::SeqCst);
128if increment_when(old_value.jobs_counter()) {
129let new_value = old_value.increment_jobs_counter();
130if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
131return new_value;
132 }
133 } else {
134return old_value;
135 }
136 }
137 }
138139/// Subtracts an inactive thread. This cannot fail. It is invoked
140 /// when a thread finds work and hence becomes active. It returns the
141 /// number of sleeping threads to wake up (if any).
142 ///
143 /// See `add_inactive_thread`.
144#[inline]
145pub(super) fn sub_inactive_thread(&self) -> usize {
146let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
147debug_assert!(
148 old_value.inactive_threads() > 0,
149"sub_inactive_thread: old_value {:?} has no inactive threads",
150 old_value,
151 );
152debug_assert!(
153 old_value.sleeping_threads() <= old_value.inactive_threads(),
154"sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
155 old_value,
156 old_value.sleeping_threads(),
157 old_value.inactive_threads(),
158 );
159160// Current heuristic: whenever an inactive thread goes away, if
161 // there are any sleeping threads, wake 'em up.
162let sleeping_threads = old_value.sleeping_threads();
163 std::cmp::min(sleeping_threads, 2)
164 }
165166/// Subtracts a sleeping thread. This cannot fail, but it is only
167 /// safe to do if you you know the number of sleeping threads is
168 /// non-zero (i.e., because you have just awoken a sleeping
169 /// thread).
170#[inline]
171pub(super) fn sub_sleeping_thread(&self) {
172let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
173debug_assert!(
174 old_value.sleeping_threads() > 0,
175"sub_sleeping_thread: old_value {:?} had no sleeping threads",
176 old_value,
177 );
178debug_assert!(
179 old_value.sleeping_threads() <= old_value.inactive_threads(),
180"sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
181 old_value,
182 old_value.sleeping_threads(),
183 old_value.inactive_threads(),
184 );
185 }
186187#[inline]
188pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
189debug_assert!(
190 old_value.inactive_threads() > 0,
191"try_add_sleeping_thread: old_value {:?} has no inactive threads",
192 old_value,
193 );
194debug_assert!(
195 old_value.sleeping_threads() < THREADS_MAX,
196"try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
197 old_value,
198 );
199200let mut new_value = old_value;
201 new_value.word += ONE_SLEEPING;
202203self.try_exchange(old_value, new_value, Ordering::SeqCst)
204 }
205}
206207#[inline]
208fn select_thread(word: usize, shift: usize) -> usize {
209 ((word >> shift) as usize) & THREADS_MAX
210}
211212#[inline]
213fn select_jec(word: usize) -> usize {
214 (word >> JEC_SHIFT) as usize
215}
216217impl Counters {
218#[inline]
219fn new(word: usize) -> Counters {
220 Counters { word }
221 }
222223#[inline]
224fn increment_jobs_counter(self) -> Counters {
225// We can freely add to JEC because it occupies the most significant bits.
226 // Thus it doesn't overflow into the other counters, just wraps itself.
227Counters {
228 word: self.word.wrapping_add(ONE_JEC),
229 }
230 }
231232#[inline]
233pub(super) fn jobs_counter(self) -> JobsEventCounter {
234 JobsEventCounter(select_jec(self.word))
235 }
236237/// The number of threads that are not actively
238 /// executing work. They may be idle, sleepy, or asleep.
239#[inline]
240pub(super) fn inactive_threads(self) -> usize {
241 select_thread(self.word, INACTIVE_SHIFT)
242 }
243244#[inline]
245pub(super) fn awake_but_idle_threads(self) -> usize {
246debug_assert!(
247self.sleeping_threads() <= self.inactive_threads(),
248"sleeping threads: {} > raw idle threads {}",
249self.sleeping_threads(),
250self.inactive_threads()
251 );
252self.inactive_threads() - self.sleeping_threads()
253 }
254255#[inline]
256pub(super) fn sleeping_threads(self) -> usize {
257 select_thread(self.word, SLEEPING_SHIFT)
258 }
259}
260261impl std::fmt::Debug for Counters {
262fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263let word = format!("{:016x}", self.word);
264 fmt.debug_struct("Counters")
265 .field("word", &word)
266 .field("jobs", &self.jobs_counter().0)
267 .field("inactive", &self.inactive_threads())
268 .field("sleeping", &self.sleeping_threads())
269 .finish()
270 }
271}