futures_timer/native/
timer.rs

1use std::fmt;
2use std::mem;
3use std::pin::Pin;
4use std::sync::atomic::AtomicUsize;
5use std::sync::atomic::Ordering::SeqCst;
6use std::sync::{Arc, Mutex, Weak};
7use std::task::{Context, Poll};
8use std::time::Instant;
9
10use std::future::Future;
11
12use super::AtomicWaker;
13use super::{global, ArcList, Heap, HeapTimer, Node, Slot};
14
15/// A "timer heap" used to power separately owned instances of `Delay`.
16///
17/// This timer is implemented as a priority queued-based heap. Each `Timer`
18/// contains a few primary methods which which to drive it:
19///
20/// * `next_wake` indicates how long the ambient system needs to sleep until it
21///   invokes further processing on a `Timer`
22/// * `advance_to` is what actually fires timers on the `Timer`, and should be
23///   called essentially every iteration of the event loop, or when the time
24///   specified by `next_wake` has elapsed.
25/// * The `Future` implementation for `Timer` is used to process incoming timer
26///   updates and requests. This is used to schedule new timeouts, update
27///   existing ones, or delete existing timeouts. The `Future` implementation
28///   will never resolve, but it'll schedule notifications of when to wake up
29///   and process more messages.
30///
31/// Note that if you're using this crate you probably don't need to use a
32/// `Timer` as there is a global one already available for you run on a helper
33/// thread. If this isn't desirable, though, then the
34/// `TimerHandle::set_fallback` method can be used instead!
35pub struct Timer {
36    inner: Arc<Inner>,
37    timer_heap: Heap<HeapTimer>,
38}
39
40/// A handle to a `Timer` which is used to create instances of a `Delay`.
41#[derive(Clone)]
42pub struct TimerHandle {
43    pub(crate) inner: Weak<Inner>,
44}
45
46pub(crate) struct Inner {
47    /// List of updates the `Timer` needs to process
48    pub(crate) list: ArcList<ScheduledTimer>,
49
50    /// The blocked `Timer` task to receive notifications to the `list` above.
51    pub(crate) waker: AtomicWaker,
52}
53
54/// Shared state between the `Timer` and a `Delay`.
55pub(crate) struct ScheduledTimer {
56    pub(crate) waker: AtomicWaker,
57
58    // The lowest bit here is whether the timer has fired or not, the second
59    // lowest bit is whether the timer has been invalidated, and all the other
60    // bits are the "generation" of the timer which is reset during the `reset`
61    // function. Only timers for a matching generation are fired.
62    pub(crate) state: AtomicUsize,
63
64    pub(crate) inner: Weak<Inner>,
65    pub(crate) at: Mutex<Option<Instant>>,
66
67    // TODO: this is only accessed by the timer thread, should have a more
68    // lightweight protection than a `Mutex`
69    pub(crate) slot: Mutex<Option<Slot>>,
70}
71
72impl Timer {
73    /// Creates a new timer heap ready to create new timers.
74    pub fn new() -> Timer {
75        Timer {
76            inner: Arc::new(Inner {
77                list: ArcList::new(),
78                waker: AtomicWaker::new(),
79            }),
80            timer_heap: Heap::new(),
81        }
82    }
83
84    /// Returns a handle to this timer heap, used to create new timeouts.
85    pub fn handle(&self) -> TimerHandle {
86        TimerHandle {
87            inner: Arc::downgrade(&self.inner),
88        }
89    }
90
91    /// Returns the time at which this timer next needs to be invoked with
92    /// `advance_to`.
93    ///
94    /// Event loops or threads typically want to sleep until the specified
95    /// instant.
96    pub fn next_event(&self) -> Option<Instant> {
97        self.timer_heap.peek().map(|t| t.at)
98    }
99
100    /// Proces any timers which are supposed to fire at or before the current
101    /// instant.
102    ///
103    /// This method is equivalent to `self.advance_to(Instant::now())`.
104    pub fn advance(&mut self) {
105        self.advance_to(Instant::now())
106    }
107
108    /// Proces any timers which are supposed to fire before `now` specified.
109    ///
110    /// This method should be called on `Timer` periodically to advance the
111    /// internal state and process any pending timers which need to fire.
112    pub fn advance_to(&mut self, now: Instant) {
113        loop {
114            match self.timer_heap.peek() {
115                Some(head) if head.at <= now => {}
116                Some(_) => break,
117                None => break,
118            };
119
120            // Flag the timer as fired and then notify its task, if any, that's
121            // blocked.
122            let heap_timer = self.timer_heap.pop().unwrap();
123            *heap_timer.node.slot.lock().unwrap() = None;
124            let bits = heap_timer.gen << 2;
125            match heap_timer
126                .node
127                .state
128                .compare_exchange(bits, bits | 0b01, SeqCst, SeqCst)
129            {
130                Ok(_) => heap_timer.node.waker.wake(),
131                Err(_b) => {}
132            }
133        }
134    }
135
136    /// Either updates the timer at slot `idx` to fire at `at`, or adds a new
137    /// timer at `idx` and sets it to fire at `at`.
138    fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
139        // TODO: avoid remove + push and instead just do one sift of the heap?
140        // In theory we could update it in place and then do the percolation
141        // as necessary
142        let gen = node.state.load(SeqCst) >> 2;
143        let mut slot = node.slot.lock().unwrap();
144        if let Some(heap_slot) = slot.take() {
145            self.timer_heap.remove(heap_slot);
146        }
147        *slot = Some(self.timer_heap.push(HeapTimer {
148            at,
149            gen,
150            node: node.clone(),
151        }));
152    }
153
154    fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
155        // If this `idx` is still around and it's still got a registered timer,
156        // then we jettison it form the timer heap.
157        let mut slot = node.slot.lock().unwrap();
158        let heap_slot = match slot.take() {
159            Some(slot) => slot,
160            None => return,
161        };
162        self.timer_heap.remove(heap_slot);
163    }
164
165    fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
166        node.state.fetch_or(0b10, SeqCst);
167        node.waker.wake();
168    }
169}
170
171impl Future for Timer {
172    type Output = ();
173
174    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
175        Pin::new(&mut self.inner).waker.register(cx.waker());
176        let mut list = self.inner.list.take();
177        while let Some(node) = list.pop() {
178            let at = *node.at.lock().unwrap();
179            match at {
180                Some(at) => self.update_or_add(at, node),
181                None => self.remove(node),
182            }
183        }
184        Poll::Pending
185    }
186}
187
188impl Drop for Timer {
189    fn drop(&mut self) {
190        // Seal off our list to prevent any more updates from getting pushed on.
191        // Any timer which sees an error from the push will immediately become
192        // inert.
193        let mut list = self.inner.list.take_and_seal();
194
195        // Now that we'll never receive another timer, drain the list of all
196        // updates and also drain our heap of all active timers, invalidating
197        // everything.
198        while let Some(t) = list.pop() {
199            self.invalidate(t);
200        }
201        while let Some(t) = self.timer_heap.pop() {
202            self.invalidate(t.node);
203        }
204    }
205}
206
207impl fmt::Debug for Timer {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
209        f.debug_struct("Timer").field("heap", &"...").finish()
210    }
211}
212
213impl Default for Timer {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
220
221/// Error returned from `TimerHandle::set_fallback`.
222#[derive(Clone, Debug)]
223struct SetDefaultError(());
224
225impl TimerHandle {
226    /// Configures this timer handle to be the one returned by
227    /// `TimerHandle::default`.
228    ///
229    /// By default a global thread is initialized on the first call to
230    /// `TimerHandle::default`. This first call can happen transitively through
231    /// `Delay::new`. If, however, that hasn't happened yet then the global
232    /// default timer handle can be configured through this method.
233    ///
234    /// This method can be used to prevent the global helper thread from
235    /// spawning. If this method is successful then the global helper thread
236    /// will never get spun up.
237    ///
238    /// On success this timer handle will have installed itself globally to be
239    /// used as the return value for `TimerHandle::default` unless otherwise
240    /// specified.
241    ///
242    /// # Errors
243    ///
244    /// If another thread has already called `set_as_global_fallback` or this
245    /// thread otherwise loses a race to call this method then it will fail
246    /// returning an error. Once a call to `set_as_global_fallback` is
247    /// successful then no future calls may succeed.
248    fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
249        unsafe {
250            let val = self.into_usize();
251            match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
252                Ok(_) => Ok(()),
253                Err(_) => {
254                    drop(TimerHandle::from_usize(val));
255                    Err(SetDefaultError(()))
256                }
257            }
258        }
259    }
260
261    fn into_usize(self) -> usize {
262        unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
263    }
264
265    unsafe fn from_usize(val: usize) -> TimerHandle {
266        let inner = mem::transmute::<usize, Weak<Inner>>(val);
267        TimerHandle { inner }
268    }
269}
270
271impl Default for TimerHandle {
272    fn default() -> TimerHandle {
273        let mut fallback = HANDLE_FALLBACK.load(SeqCst);
274
275        // If the fallback hasn't been previously initialized then let's spin
276        // up a helper thread and try to initialize with that. If we can't
277        // actually create a helper thread then we'll just return a "defunkt"
278        // handle which will return errors when timer objects are attempted to
279        // be associated.
280        if fallback == 0 {
281            let helper = match global::HelperThread::new() {
282                Ok(helper) => helper,
283                Err(_) => return TimerHandle { inner: Weak::new() },
284            };
285
286            // If we successfully set ourselves as the actual fallback then we
287            // want to `forget` the helper thread to ensure that it persists
288            // globally. If we fail to set ourselves as the fallback that means
289            // that someone was racing with this call to
290            // `TimerHandle::default`.  They ended up winning so we'll destroy
291            // our helper thread (which shuts down the thread) and reload the
292            // fallback.
293            if helper.handle().set_as_global_fallback().is_ok() {
294                let ret = helper.handle();
295                helper.forget();
296                return ret;
297            }
298            fallback = HANDLE_FALLBACK.load(SeqCst);
299        }
300
301        // At this point our fallback handle global was configured so we use
302        // its value to reify a handle, clone it, and then forget our reified
303        // handle as we don't actually have an owning reference to it.
304        assert!(fallback != 0);
305        unsafe {
306            let handle = TimerHandle::from_usize(fallback);
307            let ret = handle.clone();
308            let _ = handle.into_usize();
309            ret
310        }
311    }
312}
313
314impl fmt::Debug for TimerHandle {
315    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
316        f.debug_struct("TimerHandle")
317            .field("inner", &"...")
318            .finish()
319    }
320}