async_process/
lib.rs

1//! Async interface for working with processes.
2//!
3//! This crate is an async version of [`std::process`].
4//!
5//! # Implementation
6//!
7//! A background thread named "async-process" is lazily created on first use, which waits for
8//! spawned child processes to exit and then calls the `wait()` syscall to clean up the "zombie"
9//! processes. This is unlike the `process` API in the standard library, where dropping a running
10//! `Child` leaks its resources.
11//!
12//! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
13//! on Windows.
14//!
15//! [`async-io`]: https://docs.rs/async-io
16//! [`blocking`]: https://docs.rs/blocking
17//!
18//! # Examples
19//!
20//! Spawn a process and collect its output:
21//!
22//! ```no_run
23//! # futures_lite::future::block_on(async {
24//! use async_process::Command;
25//!
26//! let out = Command::new("echo").arg("hello").arg("world").output().await?;
27//! assert_eq!(out.stdout, b"hello world\n");
28//! # std::io::Result::Ok(()) });
29//! ```
30//!
31//! Read the output line-by-line as it gets produced:
32//!
33//! ```no_run
34//! # futures_lite::future::block_on(async {
35//! use async_process::{Command, Stdio};
36//! use futures_lite::{io::BufReader, prelude::*};
37//!
38//! let mut child = Command::new("find")
39//!     .arg(".")
40//!     .stdout(Stdio::piped())
41//!     .spawn()?;
42//!
43//! let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
44//!
45//! while let Some(line) = lines.next().await {
46//!     println!("{}", line?);
47//! }
48//! # std::io::Result::Ok(()) });
49//! ```
50
51#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
52#![doc(
53    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55#![doc(
56    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
57)]
58
59use std::convert::Infallible;
60use std::ffi::OsStr;
61use std::fmt;
62use std::path::Path;
63use std::pin::Pin;
64use std::sync::atomic::{AtomicUsize, Ordering};
65use std::sync::{Arc, Mutex};
66use std::task::{Context, Poll};
67use std::thread;
68
69#[cfg(unix)]
70use async_io::Async;
71#[cfg(unix)]
72use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
73
74#[cfg(windows)]
75use blocking::Unblock;
76
77use async_lock::OnceCell;
78use futures_lite::{future, io, prelude::*};
79
80#[doc(no_inline)]
81pub use std::process::{ExitStatus, Output, Stdio};
82
83#[cfg(unix)]
84pub mod unix;
85#[cfg(windows)]
86pub mod windows;
87
88mod reaper;
89
/// Private module holding the marker trait used for sealing.
mod sealed {
    /// Marker trait declared inside a private module, so code outside this crate cannot name it.
    // NOTE(review): presumably used as a supertrait by the extension traits in the
    // platform modules (`unix` / `windows`) — confirm against those files.
    pub trait Sealed {}
}
93
/// Test-only flag recording that the "async-process" driver thread has been started.
///
/// `Reaper::start_driver_thread` flips it from `false` to `true` and treats a second flip as
/// unreachable, so tests fail loudly if the thread is ever started twice.
#[cfg(test)]
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
97
/// The zombie process reaper.
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
///
/// A single instance exists per process; it is obtained through `Reaper::get()`.
struct Reaper {
    /// Underlying system reaper.
    ///
    /// All platform-specific work (locking, reaping, registering children, waiting for an exit
    /// status) is delegated to this value.
    sys: reaper::Reaper,

    /// The number of tasks polling the SIGCHLD event.
    ///
    /// If this is zero, the `async-process` thread must be spawned.
    ///
    /// Incremented by every `driver()` future and set from 0 to 1 by `ensure_driven()` right
    /// before it starts the background thread.
    drivers: AtomicUsize,

    /// Number of live `Child` instances currently running.
    ///
    /// This is used to prevent the reaper thread from being spawned right as the program closes,
    /// when the reaper thread isn't needed. This represents the number of active processes.
    ///
    /// Incremented in `Child::new` and decremented when the corresponding `ChildGuard` is dropped.
    child_count: AtomicUsize,
}
116
117impl Reaper {
118    /// Get the singleton instance of the reaper.
119    fn get() -> &'static Self {
120        static REAPER: OnceCell<Reaper> = OnceCell::new();
121
122        REAPER.get_or_init_blocking(|| Reaper {
123            sys: reaper::Reaper::new(),
124            drivers: AtomicUsize::new(0),
125            child_count: AtomicUsize::new(0),
126        })
127    }
128
129    /// Ensure that the reaper is driven.
130    ///
131    /// If there are no active `driver()` callers, this will spawn the `async-process` thread.
132    #[inline]
133    fn ensure_driven(&'static self) {
134        if self
135            .drivers
136            .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
137            .is_ok()
138        {
139            self.start_driver_thread();
140        }
141    }
142
143    /// Start the `async-process` thread.
144    #[cold]
145    fn start_driver_thread(&'static self) {
146        #[cfg(test)]
147        DRIVER_THREAD_SPAWNED
148            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
149            .unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
150
151        thread::Builder::new()
152            .name("async-process".to_string())
153            .spawn(move || {
154                let driver = async move {
155                    // No need to bump self.drivers, it was already bumped in ensure_driven.
156                    let guard = self.sys.lock().await;
157                    self.sys.reap(guard).await
158                };
159
160                #[cfg(unix)]
161                async_io::block_on(driver);
162
163                #[cfg(not(unix))]
164                future::block_on(driver);
165            })
166            .expect("cannot spawn async-process thread");
167    }
168
169    /// Register a process with this reaper.
170    fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
171        self.ensure_driven();
172        self.sys.register(child)
173    }
174}
175
// Platform-specific adapter that turns a synchronous stdio handle into its async counterpart.
// Exactly one `wrap` definition is compiled, depending on the target platform.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        /// Wraps a sync I/O type into an async I/O type.
        ///
        /// This never fails; the `io::Result` return type only mirrors the Unix variant so
        /// callers can treat both the same way.
        fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
            Ok(Unblock::new(io))
        }
    } else if #[cfg(unix)] {
        /// Wrap a file descriptor into a non-blocking I/O type.
        ///
        /// Any error reported by `Async::new` is returned to the caller.
        fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
            Async::new(io)
        }
    }
}
189
/// A guard that can kill child processes, or push them into the zombie list.
///
/// The actions are performed when the guard is dropped, according to the two flags below.
struct ChildGuard {
    /// The platform-specific guard returned when the process was registered with the reaper.
    inner: reaper::ChildGuard,
    /// Whether to hand the process to the reaper on drop (copied from `Command::reap_on_drop`).
    reap_on_drop: bool,
    /// Whether to kill the process on drop (copied from `Command::kill_on_drop`).
    kill_on_drop: bool,
    /// The global reaper this child was registered with.
    reaper: &'static Reaper,
}
197
impl ChildGuard {
    /// Returns mutable access to the underlying [`std::process::Child`] held by the inner guard.
    fn get_mut(&mut self) -> &mut std::process::Child {
        self.inner.get_mut()
    }
}
203
204// When the last reference to the child process is dropped, push it into the zombie list.
205impl Drop for ChildGuard {
206    fn drop(&mut self) {
207        if self.kill_on_drop {
208            self.get_mut().kill().ok();
209        }
210        if self.reap_on_drop {
211            self.inner.reap(&self.reaper.sys);
212        }
213
214        // Decrement number of children.
215        self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
216    }
217}
218
/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
/// [`output()`][`Child::output()`] to wait for it to exit.
///
/// If the [`Child`] is dropped, the process keeps running in the background, unless
/// [`Command::kill_on_drop()`] was enabled, in which case the process is killed.
///
/// # Examples
///
/// Spawn a process and wait for it to complete:
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
///
/// Command::new("cp").arg("a.txt").arg("b.txt").status().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct Child {
    /// The handle for writing to the child's standard input (stdin), if it has been captured.
    pub stdin: Option<ChildStdin>,

    /// The handle for reading from the child's standard output (stdout), if it has been captured.
    pub stdout: Option<ChildStdout>,

    /// The handle for reading from the child's standard error (stderr), if it has been captured.
    pub stderr: Option<ChildStderr>,

    /// The inner child process handle.
    ///
    /// Shared behind an `Arc<Mutex<_>>` so that the future returned by `status()` can hold its
    /// own reference to the process.
    child: Arc<Mutex<ChildGuard>>,
}
250
251impl Child {
252    /// Wraps the inner child process handle and registers it in the global process list.
253    ///
254    /// The "async-process" thread waits for processes in the global list and cleans up the
255    /// resources when they exit.
256    fn new(cmd: &mut Command) -> io::Result<Child> {
257        // Make sure the reaper exists before we spawn the child process.
258        let reaper = Reaper::get();
259        let mut child = cmd.inner.spawn()?;
260
261        // Convert sync I/O types into async I/O types.
262        let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
263        let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
264        let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
265
266        // Bump the child count.
267        reaper.child_count.fetch_add(1, Ordering::Relaxed);
268
269        // Register the child process in the global list.
270        let inner = reaper.register(child)?;
271
272        Ok(Child {
273            stdin,
274            stdout,
275            stderr,
276            child: Arc::new(Mutex::new(ChildGuard {
277                inner,
278                reap_on_drop: cmd.reap_on_drop,
279                kill_on_drop: cmd.kill_on_drop,
280                reaper,
281            })),
282        })
283    }
284
285    /// Returns the OS-assigned process identifier associated with this child.
286    ///
287    /// # Examples
288    ///
289    /// ```no_run
290    /// # futures_lite::future::block_on(async {
291    /// use async_process::Command;
292    ///
293    /// let mut child = Command::new("ls").spawn()?;
294    /// println!("id: {}", child.id());
295    /// # std::io::Result::Ok(()) });
296    /// ```
297    pub fn id(&self) -> u32 {
298        self.child.lock().unwrap().get_mut().id()
299    }
300
301    /// Forces the child process to exit.
302    ///
303    /// If the child has already exited, an [`InvalidInput`] error is returned.
304    ///
305    /// This is equivalent to sending a SIGKILL on Unix platforms.
306    ///
307    /// [`InvalidInput`]: `std::io::ErrorKind::InvalidInput`
308    ///
309    /// # Examples
310    ///
311    /// ```no_run
312    /// # futures_lite::future::block_on(async {
313    /// use async_process::Command;
314    ///
315    /// let mut child = Command::new("yes").spawn()?;
316    /// child.kill()?;
317    /// println!("exit status: {}", child.status().await?);
318    /// # std::io::Result::Ok(()) });
319    /// ```
320    pub fn kill(&mut self) -> io::Result<()> {
321        self.child.lock().unwrap().get_mut().kill()
322    }
323
324    /// Returns the exit status if the process has exited.
325    ///
326    /// Unlike [`status()`][`Child::status()`], this method will not drop the stdin handle.
327    ///
328    /// # Examples
329    ///
330    /// ```no_run
331    /// # futures_lite::future::block_on(async {
332    /// use async_process::Command;
333    ///
334    /// let mut child = Command::new("ls").spawn()?;
335    ///
336    /// match child.try_status()? {
337    ///     None => println!("still running"),
338    ///     Some(status) => println!("exited with: {}", status),
339    /// }
340    /// # std::io::Result::Ok(()) });
341    /// ```
342    pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
343        self.child.lock().unwrap().get_mut().try_wait()
344    }
345
346    /// Drops the stdin handle and waits for the process to exit.
347    ///
348    /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
349    /// not block waiting for input from the parent process while the parent waits for the child to
350    /// exit.
351    ///
352    /// # Examples
353    ///
354    /// ```no_run
355    /// # futures_lite::future::block_on(async {
356    /// use async_process::{Command, Stdio};
357    ///
358    /// let mut child = Command::new("cp")
359    ///     .arg("a.txt")
360    ///     .arg("b.txt")
361    ///     .spawn()?;
362    ///
363    /// println!("exit status: {}", child.status().await?);
364    /// # std::io::Result::Ok(()) });
365    /// ```
366    pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
367        self.stdin.take();
368        let child = self.child.clone();
369
370        async move { Reaper::get().sys.status(&child).await }
371    }
372
373    /// Drops the stdin handle and collects the output of the process.
374    ///
375    /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
376    /// not block waiting for input from the parent process while the parent waits for the child to
377    /// exit.
378    ///
379    /// In order to capture the output of the process, [`Command::stdout()`] and
380    /// [`Command::stderr()`] must be configured with [`Stdio::piped()`].
381    ///
382    /// # Examples
383    ///
384    /// ```no_run
385    /// # futures_lite::future::block_on(async {
386    /// use async_process::{Command, Stdio};
387    ///
388    /// let child = Command::new("ls")
389    ///     .stdout(Stdio::piped())
390    ///     .stderr(Stdio::piped())
391    ///     .spawn()?;
392    ///
393    /// let out = child.output().await?;
394    /// # std::io::Result::Ok(()) });
395    /// ```
396    pub fn output(mut self) -> impl Future<Output = io::Result<Output>> {
397        // A future that waits for the exit status.
398        let status = self.status();
399
400        // A future that collects stdout.
401        let stdout = self.stdout.take();
402        let stdout = async move {
403            let mut v = Vec::new();
404            if let Some(mut s) = stdout {
405                s.read_to_end(&mut v).await?;
406            }
407            io::Result::Ok(v)
408        };
409
410        // A future that collects stderr.
411        let stderr = self.stderr.take();
412        let stderr = async move {
413            let mut v = Vec::new();
414            if let Some(mut s) = stderr {
415                s.read_to_end(&mut v).await?;
416            }
417            io::Result::Ok(v)
418        };
419
420        async move {
421            let (stdout, stderr) = future::try_zip(stdout, stderr).await?;
422            let status = status.await?;
423            Ok(Output {
424                status,
425                stdout,
426                stderr,
427            })
428        }
429    }
430}
431
432impl fmt::Debug for Child {
433    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434        f.debug_struct("Child")
435            .field("stdin", &self.stdin)
436            .field("stdout", &self.stdout)
437            .field("stderr", &self.stderr)
438            .finish()
439    }
440}
441
/// A handle to a child process's standard input (stdin).
///
/// When a [`ChildStdin`] is dropped, the underlying handle gets closed. If the child process was
/// previously blocked on input, it becomes unblocked after dropping.
///
/// The wrapped type depends on the platform: `Unblock` on Windows and `Async` on Unix.
#[derive(Debug)]
pub struct ChildStdin(
    #[cfg(windows)] Unblock<std::process::ChildStdin>,
    #[cfg(unix)] Async<std::process::ChildStdin>,
);
451
impl ChildStdin {
    /// Convert async_process::ChildStdin into std::process::Stdio.
    ///
    /// You can use it to associate to the next process.
    ///
    /// # Errors
    ///
    /// On Unix, returns an error if the async wrapper cannot be unwrapped or if the
    /// `blocking_fd` call on the descriptor fails.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # futures_lite::future::block_on(async {
    /// use async_process::Command;
    /// use std::process::Stdio;
    ///
    /// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
    /// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
    ///
    /// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
    ///
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                // Recover the synchronous handle from the `Unblock` wrapper and convert it.
                Ok(self.0.into_inner().await.into())
            } else if #[cfg(unix)] {
                let child_stdin = self.0.into_inner()?;
                // NOTE(review): `blocking_fd` is defined elsewhere; presumably it switches the
                // descriptor back to blocking mode before it is handed to another process — confirm.
                blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
                Ok(child_stdin.into())
            }
        }
    }
}
483
484impl io::AsyncWrite for ChildStdin {
485    fn poll_write(
486        mut self: Pin<&mut Self>,
487        cx: &mut Context<'_>,
488        buf: &[u8],
489    ) -> Poll<io::Result<usize>> {
490        Pin::new(&mut self.0).poll_write(cx, buf)
491    }
492
493    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
494        Pin::new(&mut self.0).poll_flush(cx)
495    }
496
497    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
498        Pin::new(&mut self.0).poll_close(cx)
499    }
500}
501
/// Unix only: exposes the raw file descriptor of the wrapped stdin handle.
#[cfg(unix)]
impl AsRawFd for ChildStdin {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
508
/// Unix only: borrows the file descriptor of the wrapped stdin handle.
#[cfg(unix)]
impl AsFd for ChildStdin {
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
515
/// Unix only: consumes the handle and converts it into an [`OwnedFd`].
///
/// The conversion is delegated to the wrapped async handle; its error, if any, is returned.
#[cfg(unix)]
impl TryFrom<ChildStdin> for OwnedFd {
    type Error = io::Error;

    fn try_from(value: ChildStdin) -> Result<Self, Self::Error> {
        value.0.try_into()
    }
}
524
525// TODO(notgull): Add mirroring AsRawHandle impls for all of the child handles
526//
527// at the moment this is pretty hard to do because of how they're wrapped in
528// Unblock, meaning that we can't always access the underlying handle. async-fs
529// gets around this by putting the handle in an Arc, but there's still some decision
530// to be made about how to handle this (no pun intended)
531
/// A handle to a child process's standard output (stdout).
///
/// When a [`ChildStdout`] is dropped, the underlying handle gets closed.
///
/// The wrapped type depends on the platform: `Unblock` on Windows and `Async` on Unix.
#[derive(Debug)]
pub struct ChildStdout(
    #[cfg(windows)] Unblock<std::process::ChildStdout>,
    #[cfg(unix)] Async<std::process::ChildStdout>,
);
540
impl ChildStdout {
    /// Convert async_process::ChildStdout into std::process::Stdio.
    ///
    /// You can use it to associate to the next process.
    ///
    /// # Errors
    ///
    /// On Unix, returns an error if the async wrapper cannot be unwrapped or if the
    /// `blocking_fd` call on the descriptor fails.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # futures_lite::future::block_on(async {
    /// use async_process::Command;
    /// use std::process::Stdio;
    /// use std::io::Read;
    /// use futures_lite::AsyncReadExt;
    ///
    /// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
    /// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
    ///
    /// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
    /// let mut buf = vec![];
    /// echo_child.stdout.take().unwrap().read(&mut buf).await;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                // Recover the synchronous handle from the `Unblock` wrapper and convert it.
                Ok(self.0.into_inner().await.into())
            } else if #[cfg(unix)] {
                let child_stdout = self.0.into_inner()?;
                // NOTE(review): `blocking_fd` is defined elsewhere; presumably it switches the
                // descriptor back to blocking mode before it is handed to another process — confirm.
                blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
                Ok(child_stdout.into())
            }
        }
    }
}
575
576impl io::AsyncRead for ChildStdout {
577    fn poll_read(
578        mut self: Pin<&mut Self>,
579        cx: &mut Context<'_>,
580        buf: &mut [u8],
581    ) -> Poll<io::Result<usize>> {
582        Pin::new(&mut self.0).poll_read(cx, buf)
583    }
584}
585
/// Unix only: exposes the raw file descriptor of the wrapped stdout handle.
#[cfg(unix)]
impl AsRawFd for ChildStdout {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
592
/// Unix only: borrows the file descriptor of the wrapped stdout handle.
#[cfg(unix)]
impl AsFd for ChildStdout {
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
599
/// Unix only: consumes the handle and converts it into an [`OwnedFd`].
///
/// The conversion is delegated to the wrapped async handle; its error, if any, is returned.
#[cfg(unix)]
impl TryFrom<ChildStdout> for OwnedFd {
    type Error = io::Error;

    fn try_from(value: ChildStdout) -> Result<Self, Self::Error> {
        value.0.try_into()
    }
}
608
/// A handle to a child process's standard error (stderr).
///
/// When a [`ChildStderr`] is dropped, the underlying handle gets closed.
///
/// The wrapped type depends on the platform: `Unblock` on Windows and `Async` on Unix.
#[derive(Debug)]
pub struct ChildStderr(
    #[cfg(windows)] Unblock<std::process::ChildStderr>,
    #[cfg(unix)] Async<std::process::ChildStderr>,
);
617
impl ChildStderr {
    /// Convert async_process::ChildStderr into std::process::Stdio.
    ///
    /// You can use it to associate to the next process.
    ///
    /// # Errors
    ///
    /// On Unix, returns an error if the async wrapper cannot be unwrapped or if the
    /// `blocking_fd` call on the descriptor fails.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # futures_lite::future::block_on(async {
    /// use async_process::Command;
    /// use std::process::Stdio;
    ///
    /// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
    /// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
    ///
    /// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                // Recover the synchronous handle from the `Unblock` wrapper and convert it.
                Ok(self.0.into_inner().await.into())
            } else if #[cfg(unix)] {
                let child_stderr = self.0.into_inner()?;
                // NOTE(review): `blocking_fd` is defined elsewhere; presumably it switches the
                // descriptor back to blocking mode before it is handed to another process — confirm.
                blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
                Ok(child_stderr.into())
            }
        }
    }
}
648
649impl io::AsyncRead for ChildStderr {
650    fn poll_read(
651        mut self: Pin<&mut Self>,
652        cx: &mut Context<'_>,
653        buf: &mut [u8],
654    ) -> Poll<io::Result<usize>> {
655        Pin::new(&mut self.0).poll_read(cx, buf)
656    }
657}
658
/// Unix only: exposes the raw file descriptor of the wrapped stderr handle.
#[cfg(unix)]
impl AsRawFd for ChildStderr {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
665
/// Unix only: borrows the file descriptor of the wrapped stderr handle.
#[cfg(unix)]
impl AsFd for ChildStderr {
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
672
/// Unix only: consumes the handle and converts it into an [`OwnedFd`].
///
/// The conversion is delegated to the wrapped async handle; its error, if any, is returned.
#[cfg(unix)]
impl TryFrom<ChildStderr> for OwnedFd {
    type Error = io::Error;

    fn try_from(value: ChildStderr) -> Result<Self, Self::Error> {
        value.0.try_into()
    }
}
681
/// Runs the driver for the asynchronous processes.
///
/// This future takes control of global structures related to driving [`Child`]ren and reaping
/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
/// making sure zombie processes are successfully waited on.
///
/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
/// will be spawned. The "async-process" thread just blocks on this future and will automatically
/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
///
/// This future will never complete. It is intended to be run on a background task in your
/// executor of choice.
///
/// # Examples
///
/// ```no_run
/// use async_executor::Executor;
/// use async_process::{driver, Command};
///
/// # futures_lite::future::block_on(async {
/// // Create an executor and run on it.
/// let ex = Executor::new();
/// ex.run(async {
///     // Run the driver future in the background.
///     ex.spawn(driver()).detach();
///
///     // Run a command.
///     Command::new("ls").output().await.ok();
/// }).await;
/// # });
/// ```
#[allow(clippy::manual_async_fn)]
#[inline]
pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
    async {
        // Get the reaper.
        let reaper = Reaper::get();

        // Make sure the reaper knows we're driving it.
        // This happens on first poll, because the body of an `async` block runs lazily.
        reaper.drivers.fetch_add(1, Ordering::SeqCst);

        // Decrement the driver count when this future is dropped.
        let _guard = CallOnDrop(|| {
            let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);

            // If this was the last driver, and there are still resources actively using the
            // reaper, make sure that there is a thread driving the reaper.
            if prev_count == 1
                && (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
            {
                reaper.ensure_driven();
            }
        });

        // Acquire the reaper lock and start polling the SIGCHLD event.
        // Other drivers wait here until the lock becomes available.
        let guard = reaper.sys.lock().await;
        reaper.sys.reap(guard).await
    }
}
743
/// A builder for spawning processes.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
///
/// let output = if cfg!(target_os = "windows") {
///     Command::new("cmd").args(&["/C", "echo hello"]).output().await?
/// } else {
///     Command::new("sh").arg("-c").arg("echo hello").output().await?
/// };
/// # std::io::Result::Ok(()) });
/// ```
pub struct Command {
    /// The standard library builder that every setting is forwarded to.
    inner: std::process::Command,
    /// Set once `stdin()` has been called; `spawn()` then leaves the stdin setting alone.
    stdin: bool,
    /// Set once `stdout()` has been called; `spawn()` then leaves the stdout setting alone.
    stdout: bool,
    /// Set once `stderr()` has been called; `spawn()` then leaves the stderr setting alone.
    stderr: bool,
    /// Whether the spawned [`Child`] hands its process to the reaper when dropped.
    reap_on_drop: bool,
    /// Whether the spawned [`Child`] kills its process when dropped.
    kill_on_drop: bool,
}
767
768impl Command {
769    /// Constructs a new [`Command`] for launching `program`.
770    ///
771    /// The initial configuration (the working directory and environment variables) is inherited
772    /// from the current process.
773    ///
774    /// # Examples
775    ///
776    /// ```
777    /// use async_process::Command;
778    ///
779    /// let mut cmd = Command::new("ls");
780    /// ```
781    pub fn new<S: AsRef<OsStr>>(program: S) -> Command {
782        Self::from(std::process::Command::new(program))
783    }
784
    /// Adds a single argument to pass to the program.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("echo");
    /// cmd.arg("hello");
    /// cmd.arg("world");
    /// ```
    pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command {
        self.inner.arg(arg);
        self
    }
800
    /// Adds multiple arguments to pass to the program.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("echo");
    /// cmd.args(&["hello", "world"]);
    /// ```
    pub fn args<I, S>(&mut self, args: I) -> &mut Command
    where
        I: IntoIterator<Item = S>,
        S: AsRef<OsStr>,
    {
        self.inner.args(args);
        self
    }
819
    /// Configures an environment variable for the new process.
    ///
    /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
    /// and case-sensitive on all other platforms.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.env("PATH", "/bin");
    /// ```
    pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command
    where
        K: AsRef<OsStr>,
        V: AsRef<OsStr>,
    {
        self.inner.env(key, val);
        self
    }
841
    /// Configures multiple environment variables for the new process.
    ///
    /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
    /// and case-sensitive on all other platforms.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.envs(vec![("PATH", "/bin"), ("TERM", "xterm-256color")]);
    /// ```
    pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<OsStr>,
        V: AsRef<OsStr>,
    {
        self.inner.envs(vars);
        self
    }
864
    /// Removes an environment variable mapping.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.env_remove("PATH");
    /// ```
    pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command {
        self.inner.env_remove(key);
        self
    }
879
    /// Removes all environment variable mappings.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.env_clear();
    /// ```
    pub fn env_clear(&mut self) -> &mut Command {
        self.inner.env_clear();
        self
    }
894
    /// Configures the working directory for the new process.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::Command;
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.current_dir("/");
    /// ```
    pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command {
        self.inner.current_dir(dir);
        self
    }
909
    /// Configures the standard input (stdin) for the new process.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::{Command, Stdio};
    ///
    /// let mut cmd = Command::new("cat");
    /// cmd.stdin(Stdio::null());
    /// ```
    pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
        // Remember that stdin was set explicitly so `spawn()` does not reset it to inherit.
        self.stdin = true;
        self.inner.stdin(cfg);
        self
    }
925
    /// Configures the standard output (stdout) for the new process.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::{Command, Stdio};
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.stdout(Stdio::piped());
    /// ```
    pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
        // Remember that stdout was set explicitly so `spawn()` does not reset it to inherit.
        self.stdout = true;
        self.inner.stdout(cfg);
        self
    }
941
    /// Configures the standard error (stderr) for the new process.
    ///
    /// Returns `self` so that calls can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::{Command, Stdio};
    ///
    /// let mut cmd = Command::new("ls");
    /// cmd.stderr(Stdio::piped());
    /// ```
    pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
        // Remember that stderr was set explicitly so `spawn()` does not reset it to inherit.
        self.stderr = true;
        self.inner.stderr(cfg);
        self
    }
957
    /// Configures whether to reap the zombie process when [`Child`] is dropped.
    ///
    /// When the process finishes, it becomes a "zombie" and some resources associated with it
    /// remain until [`Child::try_status()`], [`Child::status()`], or [`Child::output()`] collects
    /// its exit code.
    ///
    /// If its exit code is never collected, the resources may leak forever. This crate has a
    /// background thread named "async-process" that collects such "zombie" processes and then
    /// "reaps" them, thus preventing the resource leaks.
    ///
    /// The default value of this option is `true`.
    ///
    /// The setting only affects processes spawned after this call. Returns `self` so that calls
    /// can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::{Command, Stdio};
    ///
    /// let mut cmd = Command::new("cat");
    /// cmd.reap_on_drop(false);
    /// ```
    pub fn reap_on_drop(&mut self, reap_on_drop: bool) -> &mut Command {
        self.reap_on_drop = reap_on_drop;
        self
    }
982
    /// Configures whether to kill the process when [`Child`] is dropped.
    ///
    /// The default value of this option is `false`.
    ///
    /// The setting only affects processes spawned after this call. Returns `self` so that calls
    /// can be chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_process::{Command, Stdio};
    ///
    /// let mut cmd = Command::new("cat");
    /// cmd.kill_on_drop(true);
    /// ```
    pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command {
        self.kill_on_drop = kill_on_drop;
        self
    }
999
1000    /// Executes the command and returns the [`Child`] handle to it.
1001    ///
1002    /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1003    ///
1004    /// # Examples
1005    ///
1006    /// ```no_run
1007    /// # futures_lite::future::block_on(async {
1008    /// use async_process::Command;
1009    ///
1010    /// let child = Command::new("ls").spawn()?;
1011    /// # std::io::Result::Ok(()) });
1012    /// ```
1013    pub fn spawn(&mut self) -> io::Result<Child> {
1014        if !self.stdin {
1015            self.inner.stdin(Stdio::inherit());
1016        }
1017        if !self.stdout {
1018            self.inner.stdout(Stdio::inherit());
1019        }
1020        if !self.stderr {
1021            self.inner.stderr(Stdio::inherit());
1022        }
1023
1024        Child::new(self)
1025    }
1026
1027    /// Executes the command, waits for it to exit, and returns the exit status.
1028    ///
1029    /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1030    ///
1031    /// # Examples
1032    ///
1033    /// ```no_run
1034    /// # futures_lite::future::block_on(async {
1035    /// use async_process::Command;
1036    ///
1037    /// let status = Command::new("cp")
1038    ///     .arg("a.txt")
1039    ///     .arg("b.txt")
1040    ///     .status()
1041    ///     .await?;
1042    /// # std::io::Result::Ok(()) });
1043    /// ```
1044    pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
1045        let child = self.spawn();
1046        async { child?.status().await }
1047    }
1048
1049    /// Executes the command and collects its output.
1050    ///
1051    /// If not configured, stdin will be set to [`Stdio::null()`], and stdout and stderr will be
1052    /// set to [`Stdio::piped()`].
1053    ///
1054    /// # Examples
1055    ///
1056    /// ```no_run
1057    /// # futures_lite::future::block_on(async {
1058    /// use async_process::Command;
1059    ///
1060    /// let output = Command::new("cat")
1061    ///     .arg("a.txt")
1062    ///     .output()
1063    ///     .await?;
1064    /// # std::io::Result::Ok(()) });
1065    /// ```
1066    pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> {
1067        if !self.stdin {
1068            self.inner.stdin(Stdio::null());
1069        }
1070        if !self.stdout {
1071            self.inner.stdout(Stdio::piped());
1072        }
1073        if !self.stderr {
1074            self.inner.stderr(Stdio::piped());
1075        }
1076
1077        let child = Child::new(self);
1078        async { child?.output().await }
1079    }
1080}
1081
1082impl From<std::process::Command> for Command {
1083    fn from(inner: std::process::Command) -> Self {
1084        Self {
1085            inner,
1086            stdin: false,
1087            stdout: false,
1088            stderr: false,
1089            reap_on_drop: true,
1090            kill_on_drop: false,
1091        }
1092    }
1093}
1094
1095impl fmt::Debug for Command {
1096    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1097        if f.alternate() {
1098            f.debug_struct("Command")
1099                .field("inner", &self.inner)
1100                .field("stdin", &self.stdin)
1101                .field("stdout", &self.stdout)
1102                .field("stderr", &self.stderr)
1103                .field("reap_on_drop", &self.reap_on_drop)
1104                .field("kill_on_drop", &self.kill_on_drop)
1105                .finish()
1106        } else {
1107            // Stdlib outputs command-line in Debug for Command. This does the
1108            // same, if not in "alternate" (long pretty-printed) mode.
1109            // This is useful for logs, for example.
1110            fmt::Debug::fmt(&self.inner, f)
1111        }
1112    }
1113}
1114
1115/// Moves `Fd` out of non-blocking mode.
1116#[cfg(unix)]
1117fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
1118    cfg_if::cfg_if! {
1119        // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
1120        // for now, as with the standard library, because it seems to behave
1121        // differently depending on the platform.
1122        // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
1123        // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
1124        // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
1125        if #[cfg(target_os = "linux")] {
1126            rustix::io::ioctl_fionbio(fd, false)?;
1127        } else {
1128            let previous = rustix::fs::fcntl_getfl(fd)?;
1129            let new = previous & !rustix::fs::OFlags::NONBLOCK;
1130            if new != previous {
1131                rustix::fs::fcntl_setfl(fd, new)?;
1132            }
1133        }
1134    }
1135    Ok(())
1136}
1137
/// Guard that invokes the wrapped closure when it is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    /// Runs the stored closure.
    fn drop(&mut self) {
        let Self(callback) = self;
        callback();
    }
}
1145
#[cfg(test)]
mod test {
    /// Checks that processes can be driven by polled `driver()` futures without the
    /// background thread being spawned, and that the thread is spawned once a process is
    /// awaited after all drivers are gone.
    #[test]
    fn polled_driver() {
        use super::{driver, Command};
        use futures_lite::future;
        use futures_lite::prelude::*;
        use std::sync::atomic::Ordering;

        /// Reads the flag that records whether the driver thread has been spawned.
        fn thread_spawned() -> bool {
            super::DRIVER_THREAD_SPAWNED.load(Ordering::SeqCst)
        }

        /// Builds a shell invocation that prints `hello`.
        fn command() -> Command {
            #[cfg(unix)]
            let (shell, flag) = ("sh", "-c");
            #[cfg(windows)]
            let (shell, flag) = ("cmd", "/C");

            let mut cmd = Command::new(shell);
            cmd.arg(flag).arg("echo hello");
            cmd
        }

        #[cfg(unix)]
        const OUTPUT: &[u8] = b"hello\n";
        #[cfg(windows)]
        const OUTPUT: &[u8] = b"hello\r\n";

        /// Runs the command to completion and checks what it wrote to stdout.
        async fn run_and_check() {
            let output = command().output().await.unwrap();
            assert_eq!(output.stdout, OUTPUT);
        }

        future::block_on(async {
            // Thread should not be spawned off the bat.
            assert!(!thread_spawned());

            // Spawn a driver.
            let mut driver1 = Box::pin(driver());
            future::poll_once(&mut driver1).await;
            assert!(!thread_spawned());

            // We should be able to run the driver in parallel with a process future.
            async {
                (&mut driver1).await;
            }
            .or(run_and_check())
            .await;
            assert!(!thread_spawned());

            // Spawn a second driver.
            let mut driver2 = Box::pin(driver());
            future::poll_once(&mut driver2).await;
            assert!(!thread_spawned());

            // Poll both drivers in parallel.
            async {
                (&mut driver1).await;
            }
            .or(async {
                (&mut driver2).await;
            })
            .or(run_and_check())
            .await;
            assert!(!thread_spawned());

            // Once one is dropped, the other should take over.
            drop(driver1);
            assert!(!thread_spawned());

            // Poll driver2 in parallel with a process future.
            async {
                (&mut driver2).await;
            }
            .or(run_and_check())
            .await;
            assert!(!thread_spawned());

            // Once driver2 is dropped, the thread should not be spawned, as there are no
            // active child processes.
            drop(driver2);
            assert!(!thread_spawned());

            // We should now be able to poll the process future independently, it will spawn
            // the thread.
            run_and_check().await;
            assert!(thread_spawned());
        });
    }
}