parking/
lib.rs

1//! Thread parking and unparking.
2//!
3//! A parker is in either notified or unnotified state. Method [`park()`][`Parker::park()`] blocks
4//! the current thread until the parker becomes notified and then puts it back into unnotified
5//! state. Method [`unpark()`][`Unparker::unpark()`] puts it into notified state.
6//!
7//! # Examples
8//!
9//! ```
10//! use std::thread;
11//! use std::time::Duration;
12//! use parking::Parker;
13//!
14//! let p = Parker::new();
15//! let u = p.unparker();
16//!
17//! // Notify the parker.
18//! u.unpark();
19//!
20//! // Wakes up immediately because the parker is notified.
21//! p.park();
22//!
23//! thread::spawn(move || {
24//!     thread::sleep(Duration::from_millis(500));
25//!     u.unpark();
26//! });
27//!
28//! // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
29//! p.park();
30//! ```
31
32#![forbid(unsafe_code)]
33#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
34#![doc(
35    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40
41#[cfg(not(all(loom, feature = "loom")))]
42use std::sync;
43
44#[cfg(all(loom, feature = "loom"))]
45use loom::sync;
46
47use std::cell::Cell;
48use std::fmt;
49use std::marker::PhantomData;
50use std::sync::Arc;
51use std::task::{Wake, Waker};
52use std::time::Duration;
53
54#[cfg(not(all(loom, feature = "loom")))]
55use std::time::Instant;
56
57use sync::atomic::AtomicUsize;
58use sync::atomic::Ordering::SeqCst;
59use sync::{Condvar, Mutex};
60
61/// Creates a parker and an associated unparker.
62///
63/// # Examples
64///
65/// ```
66/// let (p, u) = parking::pair();
67/// ```
68pub fn pair() -> (Parker, Unparker) {
69    let p = Parker::new();
70    let u = p.unparker();
71    (p, u)
72}
73
/// Waits for a notification.
///
/// Created with [`Parker::new()`] or [`pair()`]. It is notified either through
/// [`Parker::unpark()`] or through any [`Unparker`] handed out by [`Parker::unparker()`].
pub struct Parker {
    // Handle to the shared state; `unparker()` hands out clones of it.
    unparker: Unparker,
    // `Cell<()>` is not `Sync`, so this zero-sized marker makes `Parker` `!Sync` as well.
    _marker: PhantomData<Cell<()>>,
}
79
80impl Parker {
81    /// Creates a new parker.
82    ///
83    /// # Examples
84    ///
85    /// ```
86    /// use parking::Parker;
87    ///
88    /// let p = Parker::new();
89    /// ```
90    ///
91    pub fn new() -> Parker {
92        Parker {
93            unparker: Unparker {
94                inner: Arc::new(Inner {
95                    state: AtomicUsize::new(EMPTY),
96                    lock: Mutex::new(()),
97                    cvar: Condvar::new(),
98                }),
99            },
100            _marker: PhantomData,
101        }
102    }
103
104    /// Blocks until notified and then goes back into unnotified state.
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// use parking::Parker;
110    ///
111    /// let p = Parker::new();
112    /// let u = p.unparker();
113    ///
114    /// // Notify the parker.
115    /// u.unpark();
116    ///
117    /// // Wakes up immediately because the parker is notified.
118    /// p.park();
119    /// ```
120    pub fn park(&self) {
121        self.unparker.inner.park(None);
122    }
123
124    /// Blocks until notified and then goes back into unnotified state, or times out after
125    /// `duration`.
126    ///
127    /// Returns `true` if notified before the timeout.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use std::time::Duration;
133    /// use parking::Parker;
134    ///
135    /// let p = Parker::new();
136    ///
137    /// // Wait for a notification, or time out after 500 ms.
138    /// p.park_timeout(Duration::from_millis(500));
139    /// ```
140    #[cfg(not(loom))]
141    pub fn park_timeout(&self, duration: Duration) -> bool {
142        self.unparker.inner.park(Some(duration))
143    }
144
145    /// Blocks until notified and then goes back into unnotified state, or times out at `instant`.
146    ///
147    /// Returns `true` if notified before the deadline.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use std::time::{Duration, Instant};
153    /// use parking::Parker;
154    ///
155    /// let p = Parker::new();
156    ///
157    /// // Wait for a notification, or time out after 500 ms.
158    /// p.park_deadline(Instant::now() + Duration::from_millis(500));
159    /// ```
160    #[cfg(not(loom))]
161    pub fn park_deadline(&self, instant: Instant) -> bool {
162        self.unparker
163            .inner
164            .park(Some(instant.saturating_duration_since(Instant::now())))
165    }
166
167    /// Notifies the parker.
168    ///
169    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
170    /// was already notified.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    /// use std::thread;
176    /// use std::time::Duration;
177    /// use parking::Parker;
178    ///
179    /// let p = Parker::new();
180    ///
181    /// assert_eq!(p.unpark(), true);
182    /// assert_eq!(p.unpark(), false);
183    ///
184    /// // Wakes up immediately.
185    /// p.park();
186    /// ```
187    pub fn unpark(&self) -> bool {
188        self.unparker.unpark()
189    }
190
191    /// Returns a handle for unparking.
192    ///
193    /// The returned [`Unparker`] can be cloned and shared among threads.
194    ///
195    /// # Examples
196    ///
197    /// ```
198    /// use parking::Parker;
199    ///
200    /// let p = Parker::new();
201    /// let u = p.unparker();
202    ///
203    /// // Notify the parker.
204    /// u.unpark();
205    ///
206    /// // Wakes up immediately because the parker is notified.
207    /// p.park();
208    /// ```
209    pub fn unparker(&self) -> Unparker {
210        self.unparker.clone()
211    }
212}
213
214impl Default for Parker {
215    fn default() -> Parker {
216        Parker::new()
217    }
218}
219
220impl fmt::Debug for Parker {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        f.pad("Parker { .. }")
223    }
224}
225
/// Notifies a parker.
///
/// Obtained from [`Parker::unparker()`] or [`pair()`]. Cloning an unparker yields another
/// handle to the same parker.
pub struct Unparker {
    // State shared with the associated `Parker` and with every clone of this handle.
    inner: Arc<Inner>,
}
230
231impl Unparker {
232    /// Notifies the associated parker.
233    ///
234    /// Returns `true` if this call is the first to notify the parker, or `false` if the parker
235    /// was already notified.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use std::thread;
241    /// use std::time::Duration;
242    /// use parking::Parker;
243    ///
244    /// let p = Parker::new();
245    /// let u = p.unparker();
246    ///
247    /// thread::spawn(move || {
248    ///     thread::sleep(Duration::from_millis(500));
249    ///     u.unpark();
250    /// });
251    ///
252    /// // Wakes up when `u.unpark()` notifies and then goes back into unnotified state.
253    /// p.park();
254    /// ```
255    pub fn unpark(&self) -> bool {
256        self.inner.unpark()
257    }
258
259    /// Indicates whether this unparker will unpark the associated parker.
260    ///
261    /// This can be used to avoid unnecessary work before calling `unpark()`.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// use parking::Parker;
267    ///
268    /// let p = Parker::new();
269    /// let u = p.unparker();
270    ///
271    /// assert!(u.will_unpark(&p));
272    /// ```
273    pub fn will_unpark(&self, parker: &Parker) -> bool {
274        Arc::ptr_eq(&self.inner, &parker.unparker.inner)
275    }
276
277    /// Indicates whether two unparkers will unpark the same parker.
278    ///
279    /// # Examples
280    ///
281    /// ```
282    /// use parking::Parker;
283    ///
284    /// let p = Parker::new();
285    /// let u1 = p.unparker();
286    /// let u2 = p.unparker();
287    ///
288    /// assert!(u1.same_parker(&u2));
289    /// ```
290    pub fn same_parker(&self, other: &Unparker) -> bool {
291        Arc::ptr_eq(&self.inner, &other.inner)
292    }
293}
294
295impl fmt::Debug for Unparker {
296    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297        f.pad("Unparker { .. }")
298    }
299}
300
301impl Clone for Unparker {
302    fn clone(&self) -> Unparker {
303        Unparker {
304            inner: self.inner.clone(),
305        }
306    }
307}
308
309impl From<Unparker> for Waker {
310    fn from(up: Unparker) -> Self {
311        Waker::from(up.inner)
312    }
313}
314
/// No notification is pending and the parker is not blocked.
const EMPTY: usize = 0;
/// The parker has flagged itself as waiting; it holds `lock` or is blocked on `cvar`.
const PARKED: usize = 1;
/// A notification is pending and has not been consumed yet.
const NOTIFIED: usize = 2;

/// State shared between a parker and all of its unparkers.
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Serializes going to sleep against waking up; it protects no data of its own.
    lock: Mutex<()>,
    /// Condition variable the parked thread blocks on.
    cvar: Condvar,
}

impl Inner {
    /// Blocks until a notification arrives and consumes it, or until `timeout` elapses.
    ///
    /// `None` waits indefinitely. Returns `true` if a notification was consumed and `false` if
    /// the timeout elapsed first. Spurious wakeups of the condition variable never cause an
    /// early return: a timed wait is repeated until the deadline has actually passed.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned, if `state` holds an unexpected value, or if a blocking
    /// timed wait is requested in a build with `cfg(loom)`.
    fn park(&self, timeout: Option<Duration>) -> bool {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return true;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(dur) = timeout {
            if dur == Duration::from_millis(0) {
                return false;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return true;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        match timeout {
            None => {
                loop {
                    // Block the current thread on the conditional variable.
                    m = self.cvar.wait(m).unwrap();

                    if self
                        .state
                        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                        .is_ok()
                    {
                        // got a notification
                        return true;
                    }
                }
            }
            Some(timeout) => {
                #[cfg(not(loom))]
                {
                    // A condition variable may wake up spuriously, so a single `wait_timeout`
                    // call is not enough to honor the timeout. Keep waiting until either a
                    // notification arrives or the deadline has passed. `None` means the
                    // deadline is not representable as an `Instant`; every wait then uses the
                    // full timeout, which is effectively "forever".
                    let deadline = Instant::now().checked_add(timeout);

                    loop {
                        let remaining = match deadline {
                            Some(deadline) => deadline.saturating_duration_since(Instant::now()),
                            None => timeout,
                        };

                        if remaining == Duration::from_millis(0) {
                            // Out of time: unconditionally set `state` back to `EMPTY`, either
                            // consuming a last-moment notification or un-flagging ourselves as
                            // parked.
                            return match self.state.swap(EMPTY, SeqCst) {
                                NOTIFIED => true, // got a notification
                                PARKED => false,  // no notification
                                n => panic!("inconsistent park_timeout state: {}", n),
                            };
                        }

                        m = self.cvar.wait_timeout(m, remaining).unwrap().0;

                        if self
                            .state
                            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                            .is_ok()
                        {
                            // got a notification
                            return true;
                        }
                    }
                }

                #[cfg(loom)]
                {
                    let _ = timeout;
                    panic!("park_timeout is not supported under loom");
                }
            }
        }
    }

    /// Puts the parker into the notified state and wakes it up if it is currently blocked.
    ///
    /// Returns `true` if this call is the first to notify the parker, or `false` if a
    /// notification was already pending.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned or if `state` holds an unexpected value.
    pub fn unpark(&self) -> bool {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return true,     // no one was waiting
            NOTIFIED => return false, // already unparked
            PARKED => {}              // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
        true
    }
}
427
428impl Wake for Inner {
429    #[inline]
430    fn wake(self: Arc<Self>) {
431        self.unpark();
432    }
433
434    #[inline]
435    fn wake_by_ref(self: &Arc<Self>) {
436        self.unpark();
437    }
438}