futures_util/future/future/
remote_handle.rs1use {
2 crate::future::{CatchUnwind, FutureExt},
3 futures_channel::oneshot::{self, Receiver, Sender},
4 futures_core::{
5 future::Future,
6 ready,
7 task::{Context, Poll},
8 },
9 pin_project_lite::pin_project,
10 std::{
11 any::Any,
12 boxed::Box,
13 fmt,
14 panic::{self, AssertUnwindSafe},
15 pin::Pin,
16 sync::{
17 atomic::{AtomicBool, Ordering},
18 Arc,
19 },
20 thread,
21 },
22};
23
24#[must_use = "dropping a remote handle cancels the underlying future"]
41#[derive(Debug)]
42#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
43pub struct RemoteHandle<T> {
44 rx: Receiver<thread::Result<T>>,
45 keep_running: Arc<AtomicBool>,
46}
47
48impl<T> RemoteHandle<T> {
49 pub fn forget(self) {
54 self.keep_running.store(true, Ordering::SeqCst);
55 }
56}
57
58impl<T: 'static> Future for RemoteHandle<T> {
59 type Output = T;
60
61 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
62 match ready!(self.rx.poll_unpin(cx)) {
63 Ok(Ok(output)) => Poll::Ready(output),
64 Ok(Err(e)) => panic::resume_unwind(e),
66 Err(e) => panic::resume_unwind(Box::new(e)),
68 }
69 }
70}
71
72type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>;
73
74pin_project! {
75 #[must_use = "futures do nothing unless you `.await` or poll them"]
78 #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
79 pub struct Remote<Fut: Future> {
80 tx: Option<Sender<SendMsg<Fut>>>,
81 keep_running: Arc<AtomicBool>,
82 #[pin]
83 future: CatchUnwind<AssertUnwindSafe<Fut>>,
84 }
85}
86
87impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 f.debug_tuple("Remote").field(&self.future).finish()
90 }
91}
92
93impl<Fut: Future> Future for Remote<Fut> {
94 type Output = ();
95
96 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
97 let this = self.project();
98
99 if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
100 && !this.keep_running.load(Ordering::SeqCst)
101 {
102 return Poll::Ready(());
104 }
105
106 let output = ready!(this.future.poll(cx));
107
108 drop(this.tx.take().unwrap().send(output));
111 Poll::Ready(())
112 }
113}
114
115pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) {
116 let (tx, rx) = oneshot::channel();
117 let keep_running = Arc::new(AtomicBool::new(false));
118
119 let wrapped = Remote {
121 future: AssertUnwindSafe(future).catch_unwind(),
122 tx: Some(tx),
123 keep_running: keep_running.clone(),
124 };
125
126 (wrapped, RemoteHandle { rx, keep_running })
127}