// futures_timer/native/global.rs
1use std::future::Future;
2use std::io;
3use std::mem::{self, ManuallyDrop};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7use std::task::{Context, RawWaker, RawWakerVTable, Waker};
8use std::thread;
9use std::thread::Thread;
10use std::time::Instant;
11
12use super::{Timer, TimerHandle};
13
14pub struct HelperThread {
15 thread: Option<thread::JoinHandle<()>>,
16 timer: TimerHandle,
17 done: Arc<AtomicBool>,
18}
19
20impl HelperThread {
21 pub fn new() -> io::Result<HelperThread> {
22 let timer = Timer::new();
23 let timer_handle = timer.handle();
24 let done = Arc::new(AtomicBool::new(false));
25 let done2 = done.clone();
26 let thread = thread::Builder::new()
27 .name("futures-timer".to_owned())
28 .spawn(move || run(timer, done2))?;
29
30 Ok(HelperThread {
31 thread: Some(thread),
32 done,
33 timer: timer_handle,
34 })
35 }
36
37 pub fn handle(&self) -> TimerHandle {
38 self.timer.clone()
39 }
40
41 pub fn forget(mut self) {
42 self.thread.take();
43 }
44}
45
46impl Drop for HelperThread {
47 fn drop(&mut self) {
48 let thread = match self.thread.take() {
49 Some(thread) => thread,
50 None => return,
51 };
52 self.done.store(true, Ordering::SeqCst);
53 thread.thread().unpark();
54 drop(thread.join());
55 }
56}
57
58fn run(mut timer: Timer, done: Arc<AtomicBool>) {
59 let waker = current_thread_waker();
60 let mut cx = Context::from_waker(&waker);
61
62 while !done.load(Ordering::SeqCst) {
63 let _ = Pin::new(&mut timer).poll(&mut cx);
64
65 timer.advance();
66 match timer.next_event() {
67 Some(when) => {
69 let now = Instant::now();
70 if now < when {
71 thread::park_timeout(when - now)
72 } else {
73 }
75 }
76
77 None => thread::park(),
79 }
80 }
81}
82
83static VTABLE: RawWakerVTable = RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);
84
85fn raw_clone(ptr: *const ()) -> RawWaker {
86 let me = ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) });
87 mem::forget(me.clone());
88 RawWaker::new(ptr, &VTABLE)
89}
90
/// Wake entry of the waker vtable: unparks the thread behind `ptr` and
/// gives up the strong reference that `ptr` was holding.
fn raw_wake(ptr: *const ()) {
    // SAFETY: `ptr` is expected to come from `Arc::into_raw` on an
    // `Arc<Thread>`; waking by value consumes the waker, so taking ownership
    // of that reference here is what releases it.
    let thread = unsafe { Arc::from_raw(ptr as *const Thread) };
    thread.unpark();
    // `thread` is dropped here, decrementing the strong count.
}
94
/// Wake-by-reference entry of the waker vtable: unparks the thread behind
/// `ptr` while leaving the reference count untouched.
fn raw_wake_by_ref(ptr: *const ()) {
    // SAFETY: `ptr` is expected to come from `Arc::into_raw` on an
    // `Arc<Thread>`. The reconstructed `Arc` is wrapped in `ManuallyDrop` so
    // it is never dropped and the waker keeps its reference.
    let borrowed = ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Thread) });
    borrowed.unpark();
}
98
/// Drop entry of the waker vtable: releases the strong reference held by
/// `ptr` without waking the thread.
fn raw_drop(ptr: *const ()) {
    // SAFETY: `ptr` is expected to come from `Arc::into_raw` on an
    // `Arc<Thread>`; rebuilding the `Arc` and dropping it releases exactly
    // the one reference this waker owned.
    drop(unsafe { Arc::from_raw(ptr as *const Thread) });
}
102
103fn current_thread_waker() -> Waker {
104 let thread = Arc::new(thread::current());
105 unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(thread) as *const (), &VTABLE)) }
106}