futures_timer/native/
atomic_waker.rs

1use core::cell::UnsafeCell;
2use core::fmt;
3use core::sync::atomic::AtomicUsize;
4use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5use core::task::Waker;
6
/// A cell that stores the [`Waker`] of whichever task currently wants to be
/// woken, and lets other threads wake it.
///
/// The task waiting for an event is not always the same one: a consumer may be
/// moved to a new logical task while a producer on another thread is about to
/// signal completion. `AtomicWaker` coordinates the two sides so that the
/// consumer can replace the stored waker while producers concurrently request
/// a wakeup.
///
/// Usage protocol:
///
/// * Consumers call `register` **before** inspecting the result of the
///   computation (note that this is the reverse of the usual `thread::park`
///   pattern).
/// * Producers call `wake` **after** publishing the result.
/// * Calling `wake` before any `register` is allowed and does nothing.
///
/// One `AtomicWaker` can be used for an arbitrary number of `register` and
/// `wake` calls.
///
/// `AtomicWaker` gives no memory ordering guarantees for the computation's
/// result itself; protect that data with a separate synchronization primitive.
pub struct AtomicWaker {
    /// Lock word: `WAITING` when idle, otherwise a combination of the
    /// `REGISTERING` and `WAKING` bits.
    state: AtomicUsize,
    /// The most recently registered waker, if any. Only accessed while the
    /// corresponding bit in `state` is held.
    waker: UnsafeCell<Option<Waker>>,
}
31
/// No bit set: nobody is registering a waker and nobody is waking one.
const WAITING: usize = 0b00;

/// Bit set while a `register` call is storing a new waker in the cell.
const REGISTERING: usize = 0b01;

/// Bit set while the waker stored in the cell is being taken out to be woken.
const WAKING: usize = REGISTERING << 1;
40
41impl AtomicWaker {
42    /// Create an `AtomicWaker`.
43    pub fn new() -> AtomicWaker {
44        // Make sure that task is Sync
45        trait AssertSync: Sync {}
46        impl AssertSync for Waker {}
47
48        AtomicWaker {
49            state: AtomicUsize::new(WAITING),
50            waker: UnsafeCell::new(None),
51        }
52    }
53
54    /// Registers the waker to be notified on calls to `wake`.
55    ///
56    /// The new task will take place of any previous tasks that were registered
57    /// by previous calls to `register`. Any calls to `wake` that happen after
58    /// a call to `register` (as defined by the memory ordering rules), will
59    /// notify the `register` caller's task and deregister the waker from future
60    /// notifications. Because of this, callers should ensure `register` gets
61    /// invoked with a new `Waker` **each** time they require a wakeup.
62    ///
63    /// It is safe to call `register` with multiple other threads concurrently
64    /// calling `wake`. This will result in the `register` caller's current
65    /// task being notified once.
66    ///
67    /// This function is safe to call concurrently, but this is generally a bad
68    /// idea. Concurrent calls to `register` will attempt to register different
69    /// tasks to be notified. One of the callers will win and have its task set,
70    /// but there is no guarantee as to which caller will succeed.
71    ///
72    /// # Examples
73    ///
74    /// Here is how `register` is used when implementing a flag.
75    ///
76    /// ```
77    /// use std::future::Future;
78    /// use std::task::{Context, Poll};
79    /// use std::sync::atomic::AtomicBool;
80    /// use std::sync::atomic::Ordering::SeqCst;
81    /// use std::pin::Pin;
82    ///
83    /// use futures::task::AtomicWaker;
84    ///
85    /// struct Flag {
86    ///     waker: AtomicWaker,
87    ///     set: AtomicBool,
88    /// }
89    ///
90    /// impl Future for Flag {
91    ///     type Output = ();
92    ///
93    ///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
94    ///         // Register **before** checking `set` to avoid a race condition
95    ///         // that would result in lost notifications.
96    ///         self.waker.register(cx.waker());
97    ///
98    ///         if self.set.load(SeqCst) {
99    ///             Poll::Ready(())
100    ///         } else {
101    ///             Poll::Pending
102    ///         }
103    ///     }
104    /// }
105    /// ```
106    pub fn register(&self, waker: &Waker) {
107        match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
108            WAITING => {
109                unsafe {
110                    // Locked acquired, update the waker cell
111                    *self.waker.get() = Some(waker.clone());
112
113                    // Release the lock. If the state transitioned to include
114                    // the `WAKING` bit, this means that a wake has been
115                    // called concurrently, so we have to remove the waker and
116                    // wake it.`
117                    //
118                    // Start by assuming that the state is `REGISTERING` as this
119                    // is what we jut set it to.
120                    let res = self
121                        .state
122                        .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);
123
124                    match res {
125                        Ok(_) => {}
126                        Err(actual) => {
127                            // This branch can only be reached if a
128                            // concurrent thread called `wake`. In this
129                            // case, `actual` **must** be `REGISTERING |
130                            // `WAKING`.
131                            debug_assert_eq!(actual, REGISTERING | WAKING);
132
133                            // Take the waker to wake once the atomic operation has
134                            // completed.
135                            let waker = (*self.waker.get()).take().unwrap();
136
137                            // Just swap, because no one could change state while state == `REGISTERING` | `WAKING`.
138                            self.state.swap(WAITING, AcqRel);
139
140                            // The atomic swap was complete, now
141                            // wake the task and return.
142                            waker.wake();
143                        }
144                    }
145                }
146            }
147            WAKING => {
148                // Currently in the process of waking the task, i.e.,
149                // `wake` is currently being called on the old task handle.
150                // So, we call wake on the new waker
151                waker.wake_by_ref();
152            }
153            state => {
154                // In this case, a concurrent thread is holding the
155                // "registering" lock. This probably indicates a bug in the
156                // caller's code as racing to call `register` doesn't make much
157                // sense.
158                //
159                // We just want to maintain memory safety. It is ok to drop the
160                // call to `register`.
161                debug_assert!(state == REGISTERING || state == REGISTERING | WAKING);
162            }
163        }
164    }
165
166    /// Calls `wake` on the last `Waker` passed to `register`.
167    ///
168    /// If `register` has not been called yet, then this does nothing.
169    pub fn wake(&self) {
170        if let Some(waker) = self.take() {
171            waker.wake();
172        }
173    }
174
175    /// Returns the last `Waker` passed to `register`, so that the user can wake it.
176    ///
177    ///
178    /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user
179    /// to take the waker and then wake it separately, rather than performing both steps in one
180    /// atomic action.
181    ///
182    /// If a waker has not been registered, this returns `None`.
183    pub fn take(&self) -> Option<Waker> {
184        // AcqRel ordering is used in order to acquire the value of the `task`
185        // cell as well as to establish a `release` ordering with whatever
186        // memory the `AtomicWaker` is associated with.
187        match self.state.fetch_or(WAKING, AcqRel) {
188            WAITING => {
189                // The waking lock has been acquired.
190                let waker = unsafe { (*self.waker.get()).take() };
191
192                // Release the lock
193                self.state.fetch_and(!WAKING, Release);
194
195                waker
196            }
197            state => {
198                // There is a concurrent thread currently updating the
199                // associated task.
200                //
201                // Nothing more to do as the `WAKING` bit has been set. It
202                // doesn't matter if there are concurrent registering threads or
203                // not.
204                //
205                debug_assert!(
206                    state == REGISTERING || state == REGISTERING | WAKING || state == WAKING
207                );
208                None
209            }
210        }
211    }
212}
213
214impl Default for AtomicWaker {
215    fn default() -> Self {
216        AtomicWaker::new()
217    }
218}
219
220impl fmt::Debug for AtomicWaker {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        write!(f, "AtomicWaker")
223    }
224}
225
226unsafe impl Send for AtomicWaker {}
227unsafe impl Sync for AtomicWaker {}