1//! A version of the reaper that waits for a signal to check for process progress.
23use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
4use async_signal::{Signal, Signals};
5use event_listener::Event;
6use futures_lite::{future, prelude::*};
78use std::io;
9use std::mem;
10use std::sync::Mutex;
1112pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
1314/// The zombie process reaper.
15pub(crate) struct Reaper {
16/// An event delivered every time the SIGCHLD signal occurs.
17sigchld: Event,
1819/// The list of zombie processes.
20zombies: Mutex<Vec<std::process::Child>>,
2122/// The pipe that delivers signal notifications.
23pipe: Pipe,
2425/// Locking this mutex indicates that we are polling the SIGCHLD event.
26driver_guard: AsyncMutex<()>,
27}
2829impl Reaper {
30/// Create a new reaper.
31pub(crate) fn new() -> Self {
32 Reaper {
33 sigchld: Event::new(),
34 zombies: Mutex::new(Vec::new()),
35 pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
36 driver_guard: AsyncMutex::new(()),
37 }
38 }
3940/// Lock the driver thread.
41pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
42self.driver_guard.lock().await
43}
4445/// Reap zombie processes forever.
46pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
47loop {
48// Wait for the next SIGCHLD signal.
49self.pipe.wait().await;
5051// Notify all listeners waiting on the SIGCHLD event.
52self.sigchld.notify(usize::MAX);
5354// Reap zombie processes, but make sure we don't hold onto the lock for too long!
55let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
56let mut i = 0;
57'reap_zombies: loop {
58for _ in 0..50 {
59if i >= zombies.len() {
60break 'reap_zombies;
61 }
6263if let Ok(None) = zombies[i].try_wait() {
64 i += 1;
65 } else {
66 zombies.swap_remove(i);
67 }
68 }
6970// Be a good citizen; yield if there are a lot of processes.
71 //
72 // After we yield, check if there are more zombie processes.
73future::yield_now().await;
74 zombies.append(&mut self.zombies.lock().unwrap());
75 }
7677// Put zombie processes back.
78self.zombies.lock().unwrap().append(&mut zombies);
79 }
80 }
8182/// Register a process with this reaper.
83pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
84self.pipe.register(&child)?;
85Ok(ChildGuard { inner: Some(child) })
86 }
8788/// Wait for an event to occur for a child process.
89pub(crate) async fn status(
90&'static self,
91 child: &Mutex<crate::ChildGuard>,
92 ) -> io::Result<std::process::ExitStatus> {
93loop {
94// Wait on the child process.
95if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
96return Ok(status);
97 }
9899// Start listening.
100event_listener::listener!(self.sigchld => listener);
101102// Try again.
103if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
104return Ok(status);
105 }
106107// Wait on the listener.
108listener.await;
109 }
110 }
111112/// Do we have any registered zombie processes?
113pub(crate) fn has_zombies(&'static self) -> bool {
114 !self
115.zombies
116 .lock()
117 .unwrap_or_else(|x| x.into_inner())
118 .is_empty()
119 }
120}
/// Wrapper that owns a child process until it is handed to the [`Reaper`].
pub(crate) struct ChildGuard {
    /// The wrapped child; `None` once it has been moved into the reaper's zombie list.
    inner: Option<std::process::Child>,
}
126127impl ChildGuard {
128/// Get a mutable reference to the inner child.
129pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
130self.inner.as_mut().unwrap()
131 }
132133/// Begin the reaping process for this child.
134pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
135if let Ok(None) = self.get_mut().try_wait() {
136 reaper
137 .zombies
138 .lock()
139 .unwrap()
140 .push(self.inner.take().unwrap());
141 }
142 }
143}
144145/// Waits for the next SIGCHLD signal.
146struct Pipe {
147/// The iterator over SIGCHLD signals.
148signals: Signals,
149}
150151impl Pipe {
152/// Creates a new pipe.
153fn new() -> io::Result<Pipe> {
154Ok(Pipe {
155 signals: Signals::new(Some(Signal::Child))?,
156 })
157 }
158159/// Waits for the next SIGCHLD signal.
160async fn wait(&self) {
161 (&self.signals).next().await;
162 }
163164/// Register a process object into this pipe.
165fn register(&self, _child: &std::process::Child) -> io::Result<()> {
166Ok(())
167 }
168}