rayon_core/
latch.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Condvar, Mutex};
3use std::usize;
4
5use crate::registry::{Registry, WorkerThread};
6
7/// We define various kinds of latches, which are all a primitive signaling
8/// mechanism. A latch starts as false. Eventually someone calls `set()` and
9/// it becomes true. You can test if it has been set by calling `probe()`.
10///
11/// Some kinds of latches, but not all, support a `wait()` operation
12/// that will wait until the latch is set, blocking efficiently. That
13/// is not part of the trait since it is not possibly to do with all
14/// latches.
15///
16/// The intention is that `set()` is called once, but `probe()` may be
17/// called any number of times. Once `probe()` returns true, the memory
18/// effects that occurred before `set()` become visible.
19///
20/// It'd probably be better to refactor the API into two paired types,
21/// but that's a bit of work, and this is not a public API.
22///
23/// ## Memory ordering
24///
25/// Latches need to guarantee two things:
26///
27/// - Once `probe()` returns true, all memory effects from the `set()`
28///   are visible (in other words, the set should synchronize-with
29///   the probe).
30/// - Once `set()` occurs, the next `probe()` *will* observe it.  This
31///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
32///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
33pub(super) trait Latch {
34    /// Set the latch, signalling others.
35    ///
36    /// # WARNING
37    ///
38    /// Setting a latch triggers other threads to wake up and (in some
39    /// cases) complete. This may, in turn, cause memory to be
40    /// allocated and so forth.  One must be very careful about this,
41    /// and it's typically better to read all the fields you will need
42    /// to access *before* a latch is set!
43    fn set(&self);
44}
45
46pub(super) trait AsCoreLatch {
47    fn as_core_latch(&self) -> &CoreLatch;
48}
49
50/// Latch is not set, owning thread is awake
51const UNSET: usize = 0;
52
53/// Latch is not set, owning thread is going to sleep on this latch
54/// (but has not yet fallen asleep).
55const SLEEPY: usize = 1;
56
57/// Latch is not set, owning thread is asleep on this latch and
58/// must be awoken.
59const SLEEPING: usize = 2;
60
61/// Latch is set.
62const SET: usize = 3;
63
64/// Spin latches are the simplest, most efficient kind, but they do
65/// not support a `wait()` operation. They just have a boolean flag
66/// that becomes true when `set()` is called.
67#[derive(Debug)]
68pub(super) struct CoreLatch {
69    state: AtomicUsize,
70}
71
72impl CoreLatch {
73    #[inline]
74    fn new() -> Self {
75        Self {
76            state: AtomicUsize::new(0),
77        }
78    }
79
80    /// Returns the address of this core latch as an integer. Used
81    /// for logging.
82    #[inline]
83    pub(super) fn addr(&self) -> usize {
84        self as *const CoreLatch as usize
85    }
86
87    /// Invoked by owning thread as it prepares to sleep. Returns true
88    /// if the owning thread may proceed to fall asleep, false if the
89    /// latch was set in the meantime.
90    #[inline]
91    pub(super) fn get_sleepy(&self) -> bool {
92        self.state
93            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
94            .is_ok()
95    }
96
97    /// Invoked by owning thread as it falls asleep sleep. Returns
98    /// true if the owning thread should block, or false if the latch
99    /// was set in the meantime.
100    #[inline]
101    pub(super) fn fall_asleep(&self) -> bool {
102        self.state
103            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
104            .is_ok()
105    }
106
107    /// Invoked by owning thread as it falls asleep sleep. Returns
108    /// true if the owning thread should block, or false if the latch
109    /// was set in the meantime.
110    #[inline]
111    pub(super) fn wake_up(&self) {
112        if !self.probe() {
113            let _ =
114                self.state
115                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
116        }
117    }
118
119    /// Set the latch. If this returns true, the owning thread was sleeping
120    /// and must be awoken.
121    ///
122    /// This is private because, typically, setting a latch involves
123    /// doing some wakeups; those are encapsulated in the surrounding
124    /// latch code.
125    #[inline]
126    fn set(&self) -> bool {
127        let old_state = self.state.swap(SET, Ordering::AcqRel);
128        old_state == SLEEPING
129    }
130
131    /// Test if this latch has been set.
132    #[inline]
133    pub(super) fn probe(&self) -> bool {
134        self.state.load(Ordering::Acquire) == SET
135    }
136}
137
138/// Spin latches are the simplest, most efficient kind, but they do
139/// not support a `wait()` operation. They just have a boolean flag
140/// that becomes true when `set()` is called.
141pub(super) struct SpinLatch<'r> {
142    core_latch: CoreLatch,
143    registry: &'r Arc<Registry>,
144    target_worker_index: usize,
145    cross: bool,
146}
147
148impl<'r> SpinLatch<'r> {
149    /// Creates a new spin latch that is owned by `thread`. This means
150    /// that `thread` is the only thread that should be blocking on
151    /// this latch -- it also means that when the latch is set, we
152    /// will wake `thread` if it is sleeping.
153    #[inline]
154    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
155        SpinLatch {
156            core_latch: CoreLatch::new(),
157            registry: thread.registry(),
158            target_worker_index: thread.index(),
159            cross: false,
160        }
161    }
162
163    /// Creates a new spin latch for cross-threadpool blocking.  Notably, we
164    /// need to make sure the registry is kept alive after setting, so we can
165    /// safely call the notification.
166    #[inline]
167    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
168        SpinLatch {
169            cross: true,
170            ..SpinLatch::new(thread)
171        }
172    }
173
174    #[inline]
175    pub(super) fn probe(&self) -> bool {
176        self.core_latch.probe()
177    }
178}
179
180impl<'r> AsCoreLatch for SpinLatch<'r> {
181    #[inline]
182    fn as_core_latch(&self) -> &CoreLatch {
183        &self.core_latch
184    }
185}
186
187impl<'r> Latch for SpinLatch<'r> {
188    #[inline]
189    fn set(&self) {
190        let cross_registry;
191
192        let registry = if self.cross {
193            // Ensure the registry stays alive while we notify it.
194            // Otherwise, it would be possible that we set the spin
195            // latch and the other thread sees it and exits, causing
196            // the registry to be deallocated, all before we get a
197            // chance to invoke `registry.notify_worker_latch_is_set`.
198            cross_registry = Arc::clone(self.registry);
199            &cross_registry
200        } else {
201            // If this is not a "cross-registry" spin-latch, then the
202            // thread which is performing `set` is itself ensuring
203            // that the registry stays alive.
204            self.registry
205        };
206        let target_worker_index = self.target_worker_index;
207
208        // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
209        if self.core_latch.set() {
210            // Subtle: at this point, we can no longer read from
211            // `self`, because the thread owning this spin latch may
212            // have awoken and deallocated the latch. Therefore, we
213            // only use fields whose values we already read.
214            registry.notify_worker_latch_is_set(target_worker_index);
215        }
216    }
217}
218
219/// A Latch starts as false and eventually becomes true. You can block
220/// until it becomes true.
221#[derive(Debug)]
222pub(super) struct LockLatch {
223    m: Mutex<bool>,
224    v: Condvar,
225}
226
227impl LockLatch {
228    #[inline]
229    pub(super) fn new() -> LockLatch {
230        LockLatch {
231            m: Mutex::new(false),
232            v: Condvar::new(),
233        }
234    }
235
236    /// Block until latch is set, then resets this lock latch so it can be reused again.
237    pub(super) fn wait_and_reset(&self) {
238        let mut guard = self.m.lock().unwrap();
239        while !*guard {
240            guard = self.v.wait(guard).unwrap();
241        }
242        *guard = false;
243    }
244
245    /// Block until latch is set.
246    pub(super) fn wait(&self) {
247        let mut guard = self.m.lock().unwrap();
248        while !*guard {
249            guard = self.v.wait(guard).unwrap();
250        }
251    }
252}
253
254impl Latch for LockLatch {
255    #[inline]
256    fn set(&self) {
257        let mut guard = self.m.lock().unwrap();
258        *guard = true;
259        self.v.notify_all();
260    }
261}
262
263/// Counting latches are used to implement scopes. They track a
264/// counter. Unlike other latches, calling `set()` does not
265/// necessarily make the latch be considered `set()`; instead, it just
266/// decrements the counter. The latch is only "set" (in the sense that
267/// `probe()` returns true) once the counter reaches zero.
268///
269/// Note: like a `SpinLatch`, count laches are always associated with
270/// some registry that is probing them, which must be tickled when
271/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
272/// reference to that registry. This is because in some cases the
273/// registry owns the count-latch, and that would create a cycle. So a
274/// `CountLatch` must be given a reference to its owning registry when
275/// it is set. For this reason, it does not implement the `Latch`
276/// trait (but it doesn't have to, as it is not used in those generic
277/// contexts).
278#[derive(Debug)]
279pub(super) struct CountLatch {
280    core_latch: CoreLatch,
281    counter: AtomicUsize,
282}
283
284impl CountLatch {
285    #[inline]
286    pub(super) fn new() -> CountLatch {
287        CountLatch {
288            core_latch: CoreLatch::new(),
289            counter: AtomicUsize::new(1),
290        }
291    }
292
293    #[inline]
294    pub(super) fn increment(&self) {
295        debug_assert!(!self.core_latch.probe());
296        self.counter.fetch_add(1, Ordering::Relaxed);
297    }
298
299    /// Decrements the latch counter by one. If this is the final
300    /// count, then the latch is **set**, and calls to `probe()` will
301    /// return true. Returns whether the latch was set.
302    #[inline]
303    pub(super) fn set(&self) -> bool {
304        if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
305            self.core_latch.set();
306            true
307        } else {
308            false
309        }
310    }
311
312    /// Decrements the latch counter by one and possibly set it.  If
313    /// the latch is set, then the specific worker thread is tickled,
314    /// which should be the one that owns this latch.
315    #[inline]
316    pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
317        if self.set() {
318            registry.notify_worker_latch_is_set(target_worker_index);
319        }
320    }
321}
322
323impl AsCoreLatch for CountLatch {
324    #[inline]
325    fn as_core_latch(&self) -> &CoreLatch {
326        &self.core_latch
327    }
328}
329
330#[derive(Debug)]
331pub(super) struct CountLockLatch {
332    lock_latch: LockLatch,
333    counter: AtomicUsize,
334}
335
336impl CountLockLatch {
337    #[inline]
338    pub(super) fn new() -> CountLockLatch {
339        CountLockLatch {
340            lock_latch: LockLatch::new(),
341            counter: AtomicUsize::new(1),
342        }
343    }
344
345    #[inline]
346    pub(super) fn increment(&self) {
347        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
348        debug_assert!(old_counter != 0);
349    }
350
351    pub(super) fn wait(&self) {
352        self.lock_latch.wait();
353    }
354}
355
356impl Latch for CountLockLatch {
357    #[inline]
358    fn set(&self) {
359        if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
360            self.lock_latch.set();
361        }
362    }
363}
364
365impl<'a, L> Latch for &'a L
366where
367    L: Latch,
368{
369    #[inline]
370    fn set(&self) {
371        L::set(self);
372    }
373}