// async_process/reaper/wait.rs
1use async_channel::{Receiver, Sender};
9use async_task::Runnable;
10use futures_lite::future;
11
12use std::io;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Mutex;
15use std::task::{Context, Poll};
16
17pub(crate) struct Reaper {
19 sender: Sender<Runnable>,
21
22 recv: Receiver<Runnable>,
24
25 zombies: AtomicUsize,
27}
28
29impl Reaper {
30 pub(crate) fn new() -> Self {
32 let (sender, recv) = async_channel::unbounded();
33 Self {
34 sender,
35 recv,
36 zombies: AtomicUsize::new(0),
37 }
38 }
39
40 pub(crate) async fn reap(&'static self) -> ! {
42 loop {
43 let task = match self.recv.recv().await {
45 Ok(task) => task,
46 Err(_) => panic!("sender should never be closed"),
47 };
48
49 task.run();
51 }
52 }
53
54 pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
56 Ok(ChildGuard {
57 inner: Some(WaitableChild::new(child)?),
58 })
59 }
60
61 pub(crate) async fn status(
63 &'static self,
64 child: &Mutex<crate::ChildGuard>,
65 ) -> io::Result<std::process::ExitStatus> {
66 future::poll_fn(|cx| {
67 let mut child = child.lock().unwrap();
69
70 let inner = match &mut child.inner {
72 super::ChildGuard::Wait(inner) => inner,
73 #[cfg(not(windows))]
74 _ => unreachable!(),
75 };
76
77 inner.inner.as_mut().unwrap().poll_wait(cx)
79 })
80 .await
81 }
82
83 pub(crate) fn has_zombies(&'static self) -> bool {
85 self.zombies.load(Ordering::SeqCst) > 0
86 }
87}
88
89pub(crate) struct ChildGuard {
91 inner: Option<WaitableChild>,
92}
93
94impl ChildGuard {
95 pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
97 self.inner.as_mut().unwrap().get_mut()
98 }
99
100 pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
102 let future = {
104 let mut inner = self.inner.take().unwrap();
105 async move {
106 reaper.zombies.fetch_add(1, Ordering::Relaxed);
108
109 let _guard = crate::CallOnDrop(|| {
111 reaper.zombies.fetch_sub(1, Ordering::SeqCst);
112 });
113
114 let result = future::poll_fn(|cx| inner.poll_wait(cx)).await;
116 if let Err(e) = result {
117 tracing::error!("error while polling zombie process: {}", e);
118 }
119 }
120 };
121
122 let schedule = move |runnable| {
124 reaper.sender.try_send(runnable).ok();
125 };
126
127 let (runnable, task) = async_task::spawn(future, schedule);
129 task.detach();
130 runnable.schedule();
131 }
132}
133
134cfg_if::cfg_if! {
135 if #[cfg(target_os = "linux")] {
136 use async_io::Async;
137 use rustix::process;
138 use std::os::unix::io::OwnedFd;
139
140 struct WaitableChild {
142 child: std::process::Child,
143 handle: Async<OwnedFd>,
144 }
145
146 impl WaitableChild {
147 fn new(child: std::process::Child) -> io::Result<Self> {
148 let pidfd = process::pidfd_open(
149 process::Pid::from_child(&child),
150 process::PidfdFlags::empty()
151 )?;
152
153 Ok(Self {
154 child,
155 handle: Async::new(pidfd)?
156 })
157 }
158
159 fn get_mut(&mut self) -> &mut std::process::Child {
160 &mut self.child
161 }
162
163 fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
164 loop {
165 if let Some(status) = self.child.try_wait()? {
166 return Poll::Ready(Ok(status));
167 }
168
169 futures_lite::ready!(self.handle.poll_readable(cx))?;
171 }
172 }
173 }
174
175 pub(crate) fn available() -> bool {
177 let result = process::pidfd_open(
179 process::getpid(),
180 process::PidfdFlags::empty()
181 );
182
183 result.is_ok()
185 }
186 } else if #[cfg(windows)] {
187 use async_io::os::windows::Waitable;
188
189 struct WaitableChild {
191 inner: Waitable<std::process::Child>,
192 }
193
194 impl WaitableChild {
195 fn new(child: std::process::Child) -> io::Result<Self> {
196 Ok(Self {
197 inner: Waitable::new(child)?
198 })
199 }
200
201 fn get_mut(&mut self) -> &mut std::process::Child {
202 unsafe {
204 self.inner.get_mut()
205 }
206 }
207
208 fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<std::process::ExitStatus>> {
209 loop {
210 if let Some(status) = self.get_mut().try_wait()? {
211 return Poll::Ready(Ok(status));
212 }
213
214 futures_lite::ready!(self.inner.poll_ready(cx))?;
216 }
217 }
218 }
219
/// Reports whether this backend can be used on the running system.
///
/// Always `true` in this (Windows) branch.
pub(crate) fn available() -> bool {
    true
}
224 }
225}