async_process/reaper/
signal.rs

1//! A version of the reaper that waits for a signal to check for process progress.
2
3use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
4use async_signal::{Signal, Signals};
5use event_listener::Event;
6use futures_lite::{future, prelude::*};
7
8use std::io;
9use std::mem;
10use std::sync::Mutex;
11
12pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
13
14/// The zombie process reaper.
15pub(crate) struct Reaper {
16    /// An event delivered every time the SIGCHLD signal occurs.
17    sigchld: Event,
18
19    /// The list of zombie processes.
20    zombies: Mutex<Vec<std::process::Child>>,
21
22    /// The pipe that delivers signal notifications.
23    pipe: Pipe,
24
25    /// Locking this mutex indicates that we are polling the SIGCHLD event.
26    driver_guard: AsyncMutex<()>,
27}
28
29impl Reaper {
30    /// Create a new reaper.
31    pub(crate) fn new() -> Self {
32        Reaper {
33            sigchld: Event::new(),
34            zombies: Mutex::new(Vec::new()),
35            pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
36            driver_guard: AsyncMutex::new(()),
37        }
38    }
39
40    /// Lock the driver thread.
41    pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
42        self.driver_guard.lock().await
43    }
44
45    /// Reap zombie processes forever.
46    pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
47        loop {
48            // Wait for the next SIGCHLD signal.
49            self.pipe.wait().await;
50
51            // Notify all listeners waiting on the SIGCHLD event.
52            self.sigchld.notify(usize::MAX);
53
54            // Reap zombie processes, but make sure we don't hold onto the lock for too long!
55            let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
56            let mut i = 0;
57            'reap_zombies: loop {
58                for _ in 0..50 {
59                    if i >= zombies.len() {
60                        break 'reap_zombies;
61                    }
62
63                    if let Ok(None) = zombies[i].try_wait() {
64                        i += 1;
65                    } else {
66                        zombies.swap_remove(i);
67                    }
68                }
69
70                // Be a good citizen; yield if there are a lot of processes.
71                //
72                // After we yield, check if there are more zombie processes.
73                future::yield_now().await;
74                zombies.append(&mut self.zombies.lock().unwrap());
75            }
76
77            // Put zombie processes back.
78            self.zombies.lock().unwrap().append(&mut zombies);
79        }
80    }
81
82    /// Register a process with this reaper.
83    pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
84        self.pipe.register(&child)?;
85        Ok(ChildGuard { inner: Some(child) })
86    }
87
88    /// Wait for an event to occur for a child process.
89    pub(crate) async fn status(
90        &'static self,
91        child: &Mutex<crate::ChildGuard>,
92    ) -> io::Result<std::process::ExitStatus> {
93        loop {
94            // Wait on the child process.
95            if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
96                return Ok(status);
97            }
98
99            // Start listening.
100            event_listener::listener!(self.sigchld => listener);
101
102            // Try again.
103            if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
104                return Ok(status);
105            }
106
107            // Wait on the listener.
108            listener.await;
109        }
110    }
111
112    /// Do we have any registered zombie processes?
113    pub(crate) fn has_zombies(&'static self) -> bool {
114        !self
115            .zombies
116            .lock()
117            .unwrap_or_else(|x| x.into_inner())
118            .is_empty()
119    }
120}
121
122/// The wrapper around the child.
123pub(crate) struct ChildGuard {
124    inner: Option<std::process::Child>,
125}
126
127impl ChildGuard {
128    /// Get a mutable reference to the inner child.
129    pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
130        self.inner.as_mut().unwrap()
131    }
132
133    /// Begin the reaping process for this child.
134    pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
135        if let Ok(None) = self.get_mut().try_wait() {
136            reaper
137                .zombies
138                .lock()
139                .unwrap()
140                .push(self.inner.take().unwrap());
141        }
142    }
143}
144
145/// Waits for the next SIGCHLD signal.
146struct Pipe {
147    /// The iterator over SIGCHLD signals.
148    signals: Signals,
149}
150
151impl Pipe {
152    /// Creates a new pipe.
153    fn new() -> io::Result<Pipe> {
154        Ok(Pipe {
155            signals: Signals::new(Some(Signal::Child))?,
156        })
157    }
158
159    /// Waits for the next SIGCHLD signal.
160    async fn wait(&self) {
161        (&self.signals).next().await;
162    }
163
164    /// Register a process object into this pipe.
165    fn register(&self, _child: &std::process::Child) -> io::Result<()> {
166        Ok(())
167    }
168}