1//! Support for creating futures that represent timeouts.
2//!
3//! This module contains the `Delay` type which is a future that will resolve
4//! at a particular point in the future.
56use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::SeqCst;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
1415use super::arc_list::Node;
16use super::AtomicWaker;
17use super::{ScheduledTimer, TimerHandle};
1819/// A future representing the notification that an elapsed duration has
20/// occurred.
21///
22/// This is created through the `Delay::new` method indicating when the future should fire.
23/// Note that these futures are not intended for high resolution timers, but rather they will
24/// likely fire some granularity after the exact instant that they're otherwise indicated to fire
25/// at.
26pub struct Delay {
27 state: Option<Arc<Node<ScheduledTimer>>>,
28}
2930impl Delay {
31/// Creates a new future which will fire at `dur` time into the future.
32 ///
33 /// The returned object will be bound to the default timer for this thread.
34 /// The default timer will be spun up in a helper thread on first use.
35#[inline]
36pub fn new(dur: Duration) -> Delay {
37 Delay::new_handle(Instant::now() + dur, Default::default())
38 }
3940/// Creates a new future which will fire at the time specified by `at`.
41 ///
42 /// The returned instance of `Delay` will be bound to the timer specified by
43 /// the `handle` argument.
44pub(crate) fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
45let inner = match handle.inner.upgrade() {
46Some(i) => i,
47None => return Delay { state: None },
48 };
49let state = Arc::new(Node::new(ScheduledTimer {
50 at: Mutex::new(Some(at)),
51 state: AtomicUsize::new(0),
52 waker: AtomicWaker::new(),
53 inner: handle.inner,
54 slot: Mutex::new(None),
55 }));
5657// If we fail to actually push our node then we've become an inert
58 // timer, meaning that we'll want to immediately return an error from
59 // `poll`.
60if inner.list.push(&state).is_err() {
61return Delay { state: None };
62 }
6364 inner.waker.wake();
65 Delay { state: Some(state) }
66 }
6768/// Resets this timeout to an new timeout which will fire at the time
69 /// specified by `at`.
70#[inline]
71pub fn reset(&mut self, dur: Duration) {
72if self._reset(dur).is_err() {
73self.state = None
74}
75 }
7677fn _reset(&mut self, dur: Duration) -> Result<(), ()> {
78let state = match self.state {
79Some(ref state) => state,
80None => return Err(()),
81 };
82if let Some(timeouts) = state.inner.upgrade() {
83let mut bits = state.state.load(SeqCst);
84loop {
85// If we've been invalidated, cancel this reset
86if bits & 0b10 != 0 {
87return Err(());
88 }
89let new = bits.wrapping_add(0b100) & !0b11;
90match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
91Ok(_) => break,
92Err(s) => bits = s,
93 }
94 }
95*state.at.lock().unwrap() = Some(Instant::now() + dur);
96// If we fail to push our node then we've become an inert timer, so
97 // we'll want to clear our `state` field accordingly
98timeouts.list.push(state)?;
99 timeouts.waker.wake();
100 }
101102Ok(())
103 }
104}
105106impl Future for Delay {
107type Output = ();
108109fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110let state = match self.state {
111Some(ref state) => state,
112None => panic!("timer has gone away"),
113 };
114115if state.state.load(SeqCst) & 1 != 0 {
116return Poll::Ready(());
117 }
118119 state.waker.register(&cx.waker());
120121// Now that we've registered, do the full check of our own internal
122 // state. If we've fired the first bit is set, and if we've been
123 // invalidated the second bit is set.
124match state.state.load(SeqCst) {
125 n if n & 0b01 != 0 => Poll::Ready(()),
126 n if n & 0b10 != 0 => panic!("timer has gone away"),
127_ => Poll::Pending,
128 }
129 }
130}
131132impl Drop for Delay {
133fn drop(&mut self) {
134let state = match self.state {
135Some(ref s) => s,
136None => return,
137 };
138if let Some(timeouts) = state.inner.upgrade() {
139*state.at.lock().unwrap() = None;
140if timeouts.list.push(state).is_ok() {
141 timeouts.waker.wake();
142 }
143 }
144 }
145}
146147impl fmt::Debug for Delay {
148fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
149 f.debug_struct("Delay").finish()
150 }
151}