reqwest/blocking/
wait.rs
1use std::future::Future;
2use std::sync::Arc;
3use std::task::{Context, Poll, Wake, Waker};
4use std::thread::{self, Thread};
5use std::time::Duration;
6
7use tokio::time::Instant;
8
9pub(crate) fn timeout<F, I, E>(fut: F, timeout: Option<Duration>) -> Result<I, Waited<E>>
10where
11 F: Future<Output = Result<I, E>>,
12{
13 enter();
14
15 let deadline = timeout.map(|d| {
16 log::trace!("wait at most {d:?}");
17 Instant::now() + d
18 });
19
20 let thread = ThreadWaker(thread::current());
21 let waker = Waker::from(Arc::new(thread));
24 let mut cx = Context::from_waker(&waker);
25
26 futures_util::pin_mut!(fut);
27
28 loop {
29 match fut.as_mut().poll(&mut cx) {
30 Poll::Ready(Ok(val)) => return Ok(val),
31 Poll::Ready(Err(err)) => return Err(Waited::Inner(err)),
32 Poll::Pending => (), }
34
35 if let Some(deadline) = deadline {
36 let now = Instant::now();
37 if now >= deadline {
38 log::trace!("wait timeout exceeded");
39 return Err(Waited::TimedOut(crate::error::TimedOut));
40 }
41
42 log::trace!(
43 "({:?}) park timeout {:?}",
44 thread::current().id(),
45 deadline - now
46 );
47 thread::park_timeout(deadline - now);
48 } else {
49 log::trace!("({:?}) park without timeout", thread::current().id());
50 thread::park();
51 }
52 }
53}
54
55#[derive(Debug)]
56pub(crate) enum Waited<E> {
57 TimedOut(crate::error::TimedOut),
58 Inner(E),
59}
60
61struct ThreadWaker(Thread);
62
63impl Wake for ThreadWaker {
64 fn wake(self: Arc<Self>) {
65 self.wake_by_ref();
66 }
67
68 fn wake_by_ref(self: &Arc<Self>) {
69 self.0.unpark();
70 }
71}
72
73fn enter() {
74 #[cfg(debug_assertions)]
76 {
77 let _enter = tokio::runtime::Builder::new_current_thread()
78 .build()
79 .expect("build shell runtime")
80 .enter();
81 }
82}