async_process/reaper/
wait.rs

1//! A version of the reaper that waits on some polling primitive.
2//!
3//! This uses:
4//!
5//! - pidfd on Linux
6//! - Waitable objects on Windows
7
8use async_channel::{Receiver, Sender};
9use async_task::Runnable;
10use futures_lite::future;
11
12use std::io;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Mutex;
15use std::task::{Context, Poll};
16
17/// The zombie process reaper.
18pub(crate) struct Reaper {
19    /// The channel for sending new runnables.
20    sender: Sender<Runnable>,
21
22    /// The channel for receiving new runnables.
23    recv: Receiver<Runnable>,
24
25    /// Number of zombie processes.
26    zombies: AtomicUsize,
27}
28
29impl Reaper {
30    /// Create a new reaper.
31    pub(crate) fn new() -> Self {
32        let (sender, recv) = async_channel::unbounded();
33        Self {
34            sender,
35            recv,
36            zombies: AtomicUsize::new(0),
37        }
38    }
39
40    /// Reap zombie processes forever.
41    pub(crate) async fn reap(&'static self) -> ! {
42        loop {
43            // Fetch the next task.
44            let task = match self.recv.recv().await {
45                Ok(task) => task,
46                Err(_) => panic!("sender should never be closed"),
47            };
48
49            // Poll the task.
50            task.run();
51        }
52    }
53
54    /// Register a child into this reaper.
55    pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
56        Ok(ChildGuard {
57            inner: Some(WaitableChild::new(child)?),
58        })
59    }
60
61    /// Wait for a child to complete.
62    pub(crate) async fn status(
63        &'static self,
64        child: &Mutex<crate::ChildGuard>,
65    ) -> io::Result<std::process::ExitStatus> {
66        future::poll_fn(|cx| {
67            // Lock the child.
68            let mut child = child.lock().unwrap();
69
70            // Get the inner child value.
71            let inner = match &mut child.inner {
72                super::ChildGuard::Wait(inner) => inner,
73                #[cfg(not(windows))]
74                _ => unreachable!(),
75            };
76
77            // Poll for the next value.
78            inner.inner.as_mut().unwrap().poll_wait(cx)
79        })
80        .await
81    }
82
83    /// Do we have any registered zombie processes?
84    pub(crate) fn has_zombies(&'static self) -> bool {
85        self.zombies.load(Ordering::SeqCst) > 0
86    }
87}
88
89/// The wrapper around the child.
90pub(crate) struct ChildGuard {
91    inner: Option<WaitableChild>,
92}
93
94impl ChildGuard {
95    /// Get a mutable reference to the inner child.
96    pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
97        self.inner.as_mut().unwrap().get_mut()
98    }
99
100    /// Begin the reaping process for this child.
101    pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
102        // Create a future for polling this child.
103        let future = {
104            let mut inner = self.inner.take().unwrap();
105            async move {
106                // Increment the zombie count.
107                reaper.zombies.fetch_add(1, Ordering::Relaxed);
108
109                // Decrement the zombie count once we are done.
110                let _guard = crate::CallOnDrop(|| {
111                    reaper.zombies.fetch_sub(1, Ordering::SeqCst);
112                });
113
114                // Wait on this child forever.
115                let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
116                if let Err(e) = result {
117                    tracing::error!("error while polling zombie process: {}", e);
118                }
119            }
120        };
121
122        // Create a function for scheduling this future.
123        let schedule = move |runnable| {
124            reaper.sender.try_send(runnable).ok();
125        };
126
127        // Spawn the task and run it forever.
128        let (runnable, task) = async_task::spawn(future, schedule);
129        task.detach();
130        runnable.schedule();
131    }
132}
133
134cfg_if::cfg_if! {
135    if #[cfg(target_os = "linux")] {
136        use async_io::Async;
137        use rustix::process;
138        use std::os::unix::io::OwnedFd;
139
140        /// Waitable version of `std::process::Child`
141        struct WaitableChild {
142            child: std::process::Child,
143            handle: Async<OwnedFd>,
144        }
145
146        impl WaitableChild {
147            fn new(child: std::process::Child) -> io::Result<Self> {
148                let pidfd = process::pidfd_open(
149                    process::Pid::from_child(&child),
150                    process::PidfdFlags::empty()
151                )?;
152
153                Ok(Self {
154                    child,
155                    handle: Async::new(pidfd)?
156                })
157            }
158
159            fn get_mut(&mut self) -> &mut std::process::Child {
160                &mut self.child
161            }
162
163            fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
164                loop {
165                    if let Some(status) = self.child.try_wait()? {
166                        return Poll::Ready(Ok(status));
167                    }
168
169                    // Wait for us to become readable.
170                    futures_lite::ready!(self.handle.poll_readable(cx))?;
171                }
172            }
173        }
174
175        /// Tell if we are able to use this backend.
176        pub(crate) fn available() -> bool {
177            // Create a Pidfd for the current process and see if it works.
178            let result = process::pidfd_open(
179                process::getpid(),
180                process::PidfdFlags::empty()
181            );
182
183            // Tell if it was okay or not.
184            result.is_ok()
185        }
186    } else if #[cfg(windows)] {
187        use async_io::os::windows::Waitable;
188
189        /// Waitable version of `std::process::Child`.
190        struct WaitableChild {
191            inner: Waitable<std::process::Child>,
192        }
193
194        impl WaitableChild {
195            fn new(child: std::process::Child) -> io::Result<Self> {
196                Ok(Self {
197                    inner: Waitable::new(child)?
198                })
199            }
200
201            fn get_mut(&mut self) -> &mut std::process::Child {
202                // SAFETY: We never move the child out.
203                unsafe {
204                    self.inner.get_mut()
205                }
206            }
207
208            fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
209                loop {
210                    if let Some(status) = self.get_mut().try_wait()? {
211                        return Poll::Ready(Ok(status));
212                    }
213
214                    // Wait for us to become readable.
215                    futures_lite::ready!(self.inner.poll_ready(cx))?;
216                }
217            }
218        }
219
/// Report whether the waitable-object backend can be used.
///
/// Always `true` on this platform; no runtime probe is performed.
pub(crate) fn available() -> bool {
    true
}
224    }
225}