futures_util/future/future/
remote_handle.rs

1use {
2    crate::future::{CatchUnwind, FutureExt},
3    futures_channel::oneshot::{self, Receiver, Sender},
4    futures_core::{
5        future::Future,
6        ready,
7        task::{Context, Poll},
8    },
9    pin_project_lite::pin_project,
10    std::{
11        any::Any,
12        boxed::Box,
13        fmt,
14        panic::{self, AssertUnwindSafe},
15        pin::Pin,
16        sync::{
17            atomic::{AtomicBool, Ordering},
18            Arc,
19        },
20        thread,
21    },
22};
23
24/// The handle to a remote future returned by
25/// [`remote_handle`](crate::future::FutureExt::remote_handle). When you drop this,
26/// the remote future will be woken up to be dropped by the executor.
27///
28/// ## Unwind safety
29///
30/// When the remote future panics, [Remote] will catch the unwind and transfer it to
31/// the thread where `RemoteHandle` is being awaited. This is good for the common
32/// case where [Remote] is spawned on a threadpool. It is unlikely that other code
33/// in the executor working thread shares mutable data with the spawned future and we
34/// preserve the executor from losing its working threads.
35///
36/// If you run the future locally and send the handle of to be awaited elsewhere, you
37/// must be careful with regard to unwind safety because the thread in which the future
38/// is polled will keep running after the panic and the thread running the [RemoteHandle]
39/// will unwind.
40#[must_use = "dropping a remote handle cancels the underlying future"]
41#[derive(Debug)]
42#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
43pub struct RemoteHandle<T> {
44    rx: Receiver<thread::Result<T>>,
45    keep_running: Arc<AtomicBool>,
46}
47
48impl<T> RemoteHandle<T> {
49    /// Drops this handle *without* canceling the underlying future.
50    ///
51    /// This method can be used if you want to drop the handle, but let the
52    /// execution continue.
53    pub fn forget(self) {
54        self.keep_running.store(true, Ordering::SeqCst);
55    }
56}
57
58impl<T: 'static> Future for RemoteHandle<T> {
59    type Output = T;
60
61    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
62        match ready!(self.rx.poll_unpin(cx)) {
63            Ok(Ok(output)) => Poll::Ready(output),
64            // the remote future panicked.
65            Ok(Err(e)) => panic::resume_unwind(e),
66            // The oneshot sender was dropped.
67            Err(e) => panic::resume_unwind(Box::new(e)),
68        }
69    }
70}
71
/// Message sent from a `Remote` to its `RemoteHandle`: the future's output, or
/// the payload of a panic caught while polling it.
type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<dyn Any + Send + 'static>>;
73
74pin_project! {
75    /// A future which sends its output to the corresponding `RemoteHandle`.
76    /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
77    #[must_use = "futures do nothing unless you `.await` or poll them"]
78    #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
79    pub struct Remote<Fut: Future> {
80        tx: Option<Sender<SendMsg<Fut>>>,
81        keep_running: Arc<AtomicBool>,
82        #[pin]
83        future: CatchUnwind<AssertUnwindSafe<Fut>>,
84    }
85}
86
87impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_tuple("Remote").field(&self.future).finish()
90    }
91}
92
93impl<Fut: Future> Future for Remote<Fut> {
94    type Output = ();
95
96    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
97        let this = self.project();
98
99        if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
100            && !this.keep_running.load(Ordering::SeqCst)
101        {
102            // Cancelled, bail out
103            return Poll::Ready(());
104        }
105
106        let output = ready!(this.future.poll(cx));
107
108        // if the receiving end has gone away then that's ok, we just ignore the
109        // send error here.
110        drop(this.tx.take().unwrap().send(output));
111        Poll::Ready(())
112    }
113}
114
115pub(super) fn remote_handle<Fut: Future>(future: Fut) -> (Remote<Fut>, RemoteHandle<Fut::Output>) {
116    let (tx, rx) = oneshot::channel();
117    let keep_running = Arc::new(AtomicBool::new(false));
118
119    // Unwind Safety: See the docs for RemoteHandle.
120    let wrapped = Remote {
121        future: AssertUnwindSafe(future).catch_unwind(),
122        tx: Some(tx),
123        keep_running: keep_running.clone(),
124    };
125
126    (wrapped, RemoteHandle { rx, keep_running })
127}