futures_timer/native/timer.rs
1use std::fmt;
2use std::mem;
3use std::pin::Pin;
4use std::sync::atomic::AtomicUsize;
5use std::sync::atomic::Ordering::SeqCst;
6use std::sync::{Arc, Mutex, Weak};
7use std::task::{Context, Poll};
8use std::time::Instant;
9
10use std::future::Future;
11
12use super::AtomicWaker;
13use super::{global, ArcList, Heap, HeapTimer, Node, Slot};
14
15/// A "timer heap" used to power separately owned instances of `Delay`.
16///
17/// This timer is implemented as a priority queued-based heap. Each `Timer`
18/// contains a few primary methods which which to drive it:
19///
20/// * `next_wake` indicates how long the ambient system needs to sleep until it
21/// invokes further processing on a `Timer`
22/// * `advance_to` is what actually fires timers on the `Timer`, and should be
23/// called essentially every iteration of the event loop, or when the time
24/// specified by `next_wake` has elapsed.
25/// * The `Future` implementation for `Timer` is used to process incoming timer
26/// updates and requests. This is used to schedule new timeouts, update
27/// existing ones, or delete existing timeouts. The `Future` implementation
28/// will never resolve, but it'll schedule notifications of when to wake up
29/// and process more messages.
30///
31/// Note that if you're using this crate you probably don't need to use a
32/// `Timer` as there is a global one already available for you run on a helper
33/// thread. If this isn't desirable, though, then the
34/// `TimerHandle::set_fallback` method can be used instead!
35pub struct Timer {
36 inner: Arc<Inner>,
37 timer_heap: Heap<HeapTimer>,
38}
39
40/// A handle to a `Timer` which is used to create instances of a `Delay`.
41#[derive(Clone)]
42pub struct TimerHandle {
43 pub(crate) inner: Weak<Inner>,
44}
45
46pub(crate) struct Inner {
47 /// List of updates the `Timer` needs to process
48 pub(crate) list: ArcList<ScheduledTimer>,
49
50 /// The blocked `Timer` task to receive notifications to the `list` above.
51 pub(crate) waker: AtomicWaker,
52}
53
54/// Shared state between the `Timer` and a `Delay`.
55pub(crate) struct ScheduledTimer {
56 pub(crate) waker: AtomicWaker,
57
58 // The lowest bit here is whether the timer has fired or not, the second
59 // lowest bit is whether the timer has been invalidated, and all the other
60 // bits are the "generation" of the timer which is reset during the `reset`
61 // function. Only timers for a matching generation are fired.
62 pub(crate) state: AtomicUsize,
63
64 pub(crate) inner: Weak<Inner>,
65 pub(crate) at: Mutex<Option<Instant>>,
66
67 // TODO: this is only accessed by the timer thread, should have a more
68 // lightweight protection than a `Mutex`
69 pub(crate) slot: Mutex<Option<Slot>>,
70}
71
72impl Timer {
73 /// Creates a new timer heap ready to create new timers.
74 pub fn new() -> Timer {
75 Timer {
76 inner: Arc::new(Inner {
77 list: ArcList::new(),
78 waker: AtomicWaker::new(),
79 }),
80 timer_heap: Heap::new(),
81 }
82 }
83
84 /// Returns a handle to this timer heap, used to create new timeouts.
85 pub fn handle(&self) -> TimerHandle {
86 TimerHandle {
87 inner: Arc::downgrade(&self.inner),
88 }
89 }
90
91 /// Returns the time at which this timer next needs to be invoked with
92 /// `advance_to`.
93 ///
94 /// Event loops or threads typically want to sleep until the specified
95 /// instant.
96 pub fn next_event(&self) -> Option<Instant> {
97 self.timer_heap.peek().map(|t| t.at)
98 }
99
100 /// Proces any timers which are supposed to fire at or before the current
101 /// instant.
102 ///
103 /// This method is equivalent to `self.advance_to(Instant::now())`.
104 pub fn advance(&mut self) {
105 self.advance_to(Instant::now())
106 }
107
108 /// Proces any timers which are supposed to fire before `now` specified.
109 ///
110 /// This method should be called on `Timer` periodically to advance the
111 /// internal state and process any pending timers which need to fire.
112 pub fn advance_to(&mut self, now: Instant) {
113 loop {
114 match self.timer_heap.peek() {
115 Some(head) if head.at <= now => {}
116 Some(_) => break,
117 None => break,
118 };
119
120 // Flag the timer as fired and then notify its task, if any, that's
121 // blocked.
122 let heap_timer = self.timer_heap.pop().unwrap();
123 *heap_timer.node.slot.lock().unwrap() = None;
124 let bits = heap_timer.gen << 2;
125 match heap_timer
126 .node
127 .state
128 .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
129 {
130 Ok(_) => heap_timer.node.waker.wake(),
131 Err(_b) => {}
132 }
133 }
134 }
135
136 /// Either updates the timer at slot `idx` to fire at `at`, or adds a new
137 /// timer at `idx` and sets it to fire at `at`.
138 fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
139 // TODO: avoid remove + push and instead just do one sift of the heap?
140 // In theory we could update it in place and then do the percolation
141 // as necessary
142 let gen = node.state.load(SeqCst) >> 2;
143 let mut slot = node.slot.lock().unwrap();
144 if let Some(heap_slot) = slot.take() {
145 self.timer_heap.remove(heap_slot);
146 }
147 *slot = Some(self.timer_heap.push(HeapTimer {
148 at,
149 gen,
150 node: node.clone(),
151 }));
152 }
153
154 fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
155 // If this `idx` is still around and it's still got a registered timer,
156 // then we jettison it form the timer heap.
157 let mut slot = node.slot.lock().unwrap();
158 let heap_slot = match slot.take() {
159 Some(slot) => slot,
160 None => return,
161 };
162 self.timer_heap.remove(heap_slot);
163 }
164
165 fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
166 node.state.fetch_or(0b10, SeqCst);
167 node.waker.wake();
168 }
169}
170
171impl Future for Timer {
172 type Output = ();
173
174 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175 Pin::new(&mut self.inner).waker.register(cx.waker());
176 let mut list = self.inner.list.take();
177 while let Some(node) = list.pop() {
178 let at = *node.at.lock().unwrap();
179 match at {
180 Some(at) => self.update_or_add(at, node),
181 None => self.remove(node),
182 }
183 }
184 Poll::Pending
185 }
186}
187
188impl Drop for Timer {
189 fn drop(&mut self) {
190 // Seal off our list to prevent any more updates from getting pushed on.
191 // Any timer which sees an error from the push will immediately become
192 // inert.
193 let mut list = self.inner.list.take_and_seal();
194
195 // Now that we'll never receive another timer, drain the list of all
196 // updates and also drain our heap of all active timers, invalidating
197 // everything.
198 while let Some(t) = list.pop() {
199 self.invalidate(t);
200 }
201 while let Some(t) = self.timer_heap.pop() {
202 self.invalidate(t.node);
203 }
204 }
205}
206
207impl fmt::Debug for Timer {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
209 f.debug_struct("Timer").field("heap", &"...").finish()
210 }
211}
212
213impl Default for Timer {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
220
221/// Error returned from `TimerHandle::set_fallback`.
222#[derive(Clone, Debug)]
223struct SetDefaultError(());
224
225impl TimerHandle {
226 /// Configures this timer handle to be the one returned by
227 /// `TimerHandle::default`.
228 ///
229 /// By default a global thread is initialized on the first call to
230 /// `TimerHandle::default`. This first call can happen transitively through
231 /// `Delay::new`. If, however, that hasn't happened yet then the global
232 /// default timer handle can be configured through this method.
233 ///
234 /// This method can be used to prevent the global helper thread from
235 /// spawning. If this method is successful then the global helper thread
236 /// will never get spun up.
237 ///
238 /// On success this timer handle will have installed itself globally to be
239 /// used as the return value for `TimerHandle::default` unless otherwise
240 /// specified.
241 ///
242 /// # Errors
243 ///
244 /// If another thread has already called `set_as_global_fallback` or this
245 /// thread otherwise loses a race to call this method then it will fail
246 /// returning an error. Once a call to `set_as_global_fallback` is
247 /// successful then no future calls may succeed.
248 fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
249 unsafe {
250 let val = self.into_usize();
251 match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
252 Ok(_) => Ok(()),
253 Err(_) => {
254 drop(TimerHandle::from_usize(val));
255 Err(SetDefaultError(()))
256 }
257 }
258 }
259 }
260
261 fn into_usize(self) -> usize {
262 unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
263 }
264
265 unsafe fn from_usize(val: usize) -> TimerHandle {
266 let inner = mem::transmute::<usize, Weak<Inner>>(val);
267 TimerHandle { inner }
268 }
269}
270
271impl Default for TimerHandle {
272 fn default() -> TimerHandle {
273 let mut fallback = HANDLE_FALLBACK.load(SeqCst);
274
275 // If the fallback hasn't been previously initialized then let's spin
276 // up a helper thread and try to initialize with that. If we can't
277 // actually create a helper thread then we'll just return a "defunkt"
278 // handle which will return errors when timer objects are attempted to
279 // be associated.
280 if fallback == 0 {
281 let helper = match global::HelperThread::new() {
282 Ok(helper) => helper,
283 Err(_) => return TimerHandle { inner: Weak::new() },
284 };
285
286 // If we successfully set ourselves as the actual fallback then we
287 // want to `forget` the helper thread to ensure that it persists
288 // globally. If we fail to set ourselves as the fallback that means
289 // that someone was racing with this call to
290 // `TimerHandle::default`. They ended up winning so we'll destroy
291 // our helper thread (which shuts down the thread) and reload the
292 // fallback.
293 if helper.handle().set_as_global_fallback().is_ok() {
294 let ret = helper.handle();
295 helper.forget();
296 return ret;
297 }
298 fallback = HANDLE_FALLBACK.load(SeqCst);
299 }
300
301 // At this point our fallback handle global was configured so we use
302 // its value to reify a handle, clone it, and then forget our reified
303 // handle as we don't actually have an owning reference to it.
304 assert!(fallback != 0);
305 unsafe {
306 let handle = TimerHandle::from_usize(fallback);
307 let ret = handle.clone();
308 let _ = handle.into_usize();
309 ret
310 }
311 }
312}
313
314impl fmt::Debug for TimerHandle {
315 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
316 f.debug_struct("TimerHandle")
317 .field("inner", &"...")
318 .finish()
319 }
320}