futures_timer/native/
global.rs

1use std::future::Future;
2use std::io;
3use std::mem::{self, ManuallyDrop};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7use std::task::{Context, RawWaker, RawWakerVTable, Waker};
8use std::thread;
9use std::thread::Thread;
10use std::time::Instant;
11
12use super::{Timer, TimerHandle};
13
14pub struct HelperThread {
15    thread: Option<thread::JoinHandle<()>>,
16    timer: TimerHandle,
17    done: Arc<AtomicBool>,
18}
19
20impl HelperThread {
21    pub fn new() -> io::Result<HelperThread> {
22        let timer = Timer::new();
23        let timer_handle = timer.handle();
24        let done = Arc::new(AtomicBool::new(false));
25        let done2 = done.clone();
26        let thread = thread::Builder::new()
27            .name("futures-timer".to_owned())
28            .spawn(move || run(timer, done2))?;
29
30        Ok(HelperThread {
31            thread: Some(thread),
32            done,
33            timer: timer_handle,
34        })
35    }
36
37    pub fn handle(&self) -> TimerHandle {
38        self.timer.clone()
39    }
40
41    pub fn forget(mut self) {
42        self.thread.take();
43    }
44}
45
46impl Drop for HelperThread {
47    fn drop(&mut self) {
48        let thread = match self.thread.take() {
49            Some(thread) => thread,
50            None => return,
51        };
52        self.done.store(true, Ordering::SeqCst);
53        thread.thread().unpark();
54        drop(thread.join());
55    }
56}
57
58fn run(mut timer: Timer, done: Arc<AtomicBool>) {
59    let waker = current_thread_waker();
60    let mut cx = Context::from_waker(&waker);
61
62    while !done.load(Ordering::SeqCst) {
63        let _ = Pin::new(&mut timer).poll(&mut cx);
64
65        timer.advance();
66        match timer.next_event() {
67            // Ok, block for the specified time
68            Some(when) => {
69                let now = Instant::now();
70                if now < when {
71                    thread::park_timeout(when - now)
72                } else {
73                    // .. continue...
74                }
75            }
76
77            // Just wait for one of our futures to wake up
78            None => thread::park(),
79        }
80    }
81}
82
83static VTABLE: RawWakerVTable = RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
84
85fn raw_clone(ptr: *const ()) -> RawWaker {
86    let me = ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) });
87    mem::forget(me.clone());
88    RawWaker::new(ptr, &VTABLE)
89}
90
91fn raw_wake(ptr: *const ()) {
92    unsafe { Arc::from_raw(ptr as *const Thread) }.unpark()
93}
94
95fn raw_wake_by_ref(ptr: *const ()) {
96    ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) }).unpark()
97}
98
99fn raw_drop(ptr: *const ()) {
100    unsafe { Arc::from_raw(ptr as *const Thread) };
101}
102
103fn current_thread_waker() -> Waker {
104    let thread = Arc::new(thread::current());
105    unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(thread) as *const (), &VTABLE)) }
106}