async_process/reaper/
signal.rs1use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
4use async_signal::{Signal, Signals};
5use event_listener::Event;
6use futures_lite::{future, prelude::*};
7
8use std::io;
9use std::mem;
10use std::sync::Mutex;
11
/// An async mutex guard over `()` with a `'static` lifetime.
///
/// NOTE(review): presumably this is the guard obtained from `Reaper::lock` on a
/// `&'static Reaper` and later passed to `Reaper::reap`; confirm against callers.
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
13
/// Signal-driven reaper state: remembers children that still have to be waited
/// on and wakes waiting tasks whenever the signal pipe reports a signal.
pub(crate) struct Reaper {
    /// Event notified by `reap` every time `pipe.wait()` completes.
    sigchld: Event,

    /// Children pushed by `ChildGuard::reap` because they had not exited yet.
    zombies: Mutex<Vec<std::process::Child>>,

    /// Source of child-signal notifications (see `Pipe`).
    pipe: Pipe,

    /// Async mutex acquired through `lock`; `reap` takes its guard as an argument.
    driver_guard: AsyncMutex<()>,
}
28
29impl Reaper {
30 pub(crate) fn new() -> Self {
32 Reaper {
33 sigchld: Event::new(),
34 zombies: Mutex::new(Vec::new()),
35 pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
36 driver_guard: AsyncMutex::new(()),
37 }
38 }
39
40 pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
42 self.driver_guard.lock().await
43 }
44
45 pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
47 loop {
48 self.pipe.wait().await;
50
51 self.sigchld.notify(usize::MAX);
53
54 let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
56 let mut i = 0;
57 'reap_zombies: loop {
58 for _ in 0..50 {
59 if i >= zombies.len() {
60 break 'reap_zombies;
61 }
62
63 if let Ok(None) = zombies[i].try_wait() {
64 i += 1;
65 } else {
66 zombies.swap_remove(i);
67 }
68 }
69
70 future::yield_now().await;
74 zombies.append(&mut self.zombies.lock().unwrap());
75 }
76
77 self.zombies.lock().unwrap().append(&mut zombies);
79 }
80 }
81
82 pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
84 self.pipe.register(&child)?;
85 Ok(ChildGuard { inner: Some(child) })
86 }
87
88 pub(crate) async fn status(
90 &'static self,
91 child: &Mutex<crate::ChildGuard>,
92 ) -> io::Result<std::process::ExitStatus> {
93 loop {
94 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
96 return Ok(status);
97 }
98
99 event_listener::listener!(self.sigchld => listener);
101
102 if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
104 return Ok(status);
105 }
106
107 listener.await;
109 }
110 }
111
112 pub(crate) fn has_zombies(&'static self) -> bool {
114 !self
115 .zombies
116 .lock()
117 .unwrap_or_else(|x| x.into_inner())
118 .is_empty()
119 }
120}
121
/// Wrapper that owns a child process until it is handed over to the reaper.
pub(crate) struct ChildGuard {
    /// The wrapped child; becomes `None` once `reap` has moved it into the
    /// reaper's zombie list.
    inner: Option<std::process::Child>,
}
126
127impl ChildGuard {
128 pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
130 self.inner.as_mut().unwrap()
131 }
132
133 pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
135 if let Ok(None) = self.get_mut().try_wait() {
136 reaper
137 .zombies
138 .lock()
139 .unwrap()
140 .push(self.inner.take().unwrap());
141 }
142 }
143}
144
/// Thin wrapper around the signal stream that the reaper waits on.
struct Pipe {
    /// Signal stream created for `Signal::Child` in `Pipe::new`.
    signals: Signals,
}
150
151impl Pipe {
152 fn new() -> io::Result<Pipe> {
154 Ok(Pipe {
155 signals: Signals::new(Some(Signal::Child))?,
156 })
157 }
158
159 async fn wait(&self) {
161 (&self.signals).next().await;
162 }
163
164 fn register(&self, _child: &std::process::Child) -> io::Result<()> {
166 Ok(())
167 }
168}