futures_timer/native/atomic_waker.rs
1use core::cell::UnsafeCell;
2use core::fmt;
3use core::sync::atomic::AtomicUsize;
4use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5use core::task::Waker;
6
7/// A synchronization primitive for task wakeup.
8///
9/// Sometimes the task interested in a given event will change over time.
10/// An `AtomicWaker` can coordinate concurrent notifications with the consumer
11/// potentially "updating" the underlying task to wake up. This is useful in
12/// scenarios where a computation completes in another thread and wants to
13/// notify the consumer, but the consumer is in the process of being migrated to
14/// a new logical task.
15///
16/// Consumers should call `register` before checking the result of a computation
17/// and producers should call `wake` after producing the computation (this
18/// differs from the usual `thread::park` pattern). It is also permitted for
19/// `wake` to be called **before** `register`. This results in a no-op.
20///
21/// A single `AtomicWaker` may be reused for any number of calls to `register` or
22/// `wake`.
23///
24/// `AtomicWaker` does not provide any memory ordering guarantees, as such the
25/// user should use caution and use other synchronization primitives to guard
26/// the result of the underlying computation.
27pub struct AtomicWaker {
28 state: AtomicUsize,
29 waker: UnsafeCell<Option<Waker>>,
30}
31
32/// Idle state
33const WAITING: usize = 0;
34
35/// A new waker value is being registered with the `AtomicWaker` cell.
36const REGISTERING: usize = 0b01;
37
38/// The waker currently registered with the `AtomicWaker` cell is being woken.
39const WAKING: usize = 0b10;
40
41impl AtomicWaker {
42 /// Create an `AtomicWaker`.
43 pub fn new() -> AtomicWaker {
44 // Make sure that task is Sync
45 trait AssertSync: Sync {}
46 impl AssertSync for Waker {}
47
48 AtomicWaker {
49 state: AtomicUsize::new(WAITING),
50 waker: UnsafeCell::new(None),
51 }
52 }
53
54 /// Registers the waker to be notified on calls to `wake`.
55 ///
56 /// The new task will take place of any previous tasks that were registered
57 /// by previous calls to `register`. Any calls to `wake` that happen after
58 /// a call to `register` (as defined by the memory ordering rules), will
59 /// notify the `register` caller's task and deregister the waker from future
60 /// notifications. Because of this, callers should ensure `register` gets
61 /// invoked with a new `Waker` **each** time they require a wakeup.
62 ///
63 /// It is safe to call `register` with multiple other threads concurrently
64 /// calling `wake`. This will result in the `register` caller's current
65 /// task being notified once.
66 ///
67 /// This function is safe to call concurrently, but this is generally a bad
68 /// idea. Concurrent calls to `register` will attempt to register different
69 /// tasks to be notified. One of the callers will win and have its task set,
70 /// but there is no guarantee as to which caller will succeed.
71 ///
72 /// # Examples
73 ///
74 /// Here is how `register` is used when implementing a flag.
75 ///
76 /// ```
77 /// use std::future::Future;
78 /// use std::task::{Context, Poll};
79 /// use std::sync::atomic::AtomicBool;
80 /// use std::sync::atomic::Ordering::SeqCst;
81 /// use std::pin::Pin;
82 ///
83 /// use futures::task::AtomicWaker;
84 ///
85 /// struct Flag {
86 /// waker: AtomicWaker,
87 /// set: AtomicBool,
88 /// }
89 ///
90 /// impl Future for Flag {
91 /// type Output = ();
92 ///
93 /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
94 /// // Register **before** checking `set` to avoid a race condition
95 /// // that would result in lost notifications.
96 /// self.waker.register(cx.waker());
97 ///
98 /// if self.set.load(SeqCst) {
99 /// Poll::Ready(())
100 /// } else {
101 /// Poll::Pending
102 /// }
103 /// }
104 /// }
105 /// ```
106 pub fn register(&self, waker: &Waker) {
107 match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
108 WAITING => {
109 unsafe {
110 // Locked acquired, update the waker cell
111 *self.waker.get() = Some(waker.clone());
112
113 // Release the lock. If the state transitioned to include
114 // the `WAKING` bit, this means that a wake has been
115 // called concurrently, so we have to remove the waker and
116 // wake it.`
117 //
118 // Start by assuming that the state is `REGISTERING` as this
119 // is what we jut set it to.
120 let res = self
121 .state
122 .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);
123
124 match res {
125 Ok(_) => {}
126 Err(actual) => {
127 // This branch can only be reached if a
128 // concurrent thread called `wake`. In this
129 // case, `actual` **must** be `REGISTERING |
130 // `WAKING`.
131 debug_assert_eq!(actual, REGISTERING | WAKING);
132
133 // Take the waker to wake once the atomic operation has
134 // completed.
135 let waker = (*self.waker.get()).take().unwrap();
136
137 // Just swap, because no one could change state while state == `REGISTERING` | `WAKING`.
138 self.state.swap(WAITING, AcqRel);
139
140 // The atomic swap was complete, now
141 // wake the task and return.
142 waker.wake();
143 }
144 }
145 }
146 }
147 WAKING => {
148 // Currently in the process of waking the task, i.e.,
149 // `wake` is currently being called on the old task handle.
150 // So, we call wake on the new waker
151 waker.wake_by_ref();
152 }
153 state => {
154 // In this case, a concurrent thread is holding the
155 // "registering" lock. This probably indicates a bug in the
156 // caller's code as racing to call `register` doesn't make much
157 // sense.
158 //
159 // We just want to maintain memory safety. It is ok to drop the
160 // call to `register`.
161 debug_assert!(state == REGISTERING || state == REGISTERING | WAKING);
162 }
163 }
164 }
165
166 /// Calls `wake` on the last `Waker` passed to `register`.
167 ///
168 /// If `register` has not been called yet, then this does nothing.
169 pub fn wake(&self) {
170 if let Some(waker) = self.take() {
171 waker.wake();
172 }
173 }
174
175 /// Returns the last `Waker` passed to `register`, so that the user can wake it.
176 ///
177 ///
178 /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user
179 /// to take the waker and then wake it separately, rather than performing both steps in one
180 /// atomic action.
181 ///
182 /// If a waker has not been registered, this returns `None`.
183 pub fn take(&self) -> Option<Waker> {
184 // AcqRel ordering is used in order to acquire the value of the `task`
185 // cell as well as to establish a `release` ordering with whatever
186 // memory the `AtomicWaker` is associated with.
187 match self.state.fetch_or(WAKING, AcqRel) {
188 WAITING => {
189 // The waking lock has been acquired.
190 let waker = unsafe { (*self.waker.get()).take() };
191
192 // Release the lock
193 self.state.fetch_and(!WAKING, Release);
194
195 waker
196 }
197 state => {
198 // There is a concurrent thread currently updating the
199 // associated task.
200 //
201 // Nothing more to do as the `WAKING` bit has been set. It
202 // doesn't matter if there are concurrent registering threads or
203 // not.
204 //
205 debug_assert!(
206 state == REGISTERING || state == REGISTERING | WAKING || state == WAKING
207 );
208 None
209 }
210 }
211 }
212}
213
214impl Default for AtomicWaker {
215 fn default() -> Self {
216 AtomicWaker::new()
217 }
218}
219
220impl fmt::Debug for AtomicWaker {
221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222 write!(f, "AtomicWaker")
223 }
224}
225
226unsafe impl Send for AtomicWaker {}
227unsafe impl Sync for AtomicWaker {}