async_process/lib.rs
1//! Async interface for working with processes.
2//!
3//! This crate is an async version of [`std::process`].
4//!
5//! # Implementation
6//!
7//! A background thread named "async-process" is lazily created on first use, which waits for
8//! spawned child processes to exit and then calls the `wait()` syscall to clean up the "zombie"
9//! processes. This is unlike the `process` API in the standard library, where dropping a running
10//! `Child` leaks its resources.
11//!
12//! This crate uses [`async-io`] for async I/O on Unix-like systems and [`blocking`] for async I/O
13//! on Windows.
14//!
15//! [`async-io`]: https://docs.rs/async-io
16//! [`blocking`]: https://docs.rs/blocking
17//!
18//! # Examples
19//!
20//! Spawn a process and collect its output:
21//!
22//! ```no_run
23//! # futures_lite::future::block_on(async {
24//! use async_process::Command;
25//!
26//! let out = Command::new("echo").arg("hello").arg("world").output().await?;
27//! assert_eq!(out.stdout, b"hello world\n");
28//! # std::io::Result::Ok(()) });
29//! ```
30//!
31//! Read the output line-by-line as it gets produced:
32//!
33//! ```no_run
34//! # futures_lite::future::block_on(async {
35//! use async_process::{Command, Stdio};
36//! use futures_lite::{io::BufReader, prelude::*};
37//!
38//! let mut child = Command::new("find")
39//! .arg(".")
40//! .stdout(Stdio::piped())
41//! .spawn()?;
42//!
43//! let mut lines = BufReader::new(child.stdout.take().unwrap()).lines();
44//!
45//! while let Some(line) = lines.next().await {
46//! println!("{}", line?);
47//! }
48//! # std::io::Result::Ok(()) });
49//! ```
50
51#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
52#![doc(
53 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
54)]
55#![doc(
56 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
57)]
58
59use std::convert::Infallible;
60use std::ffi::OsStr;
61use std::fmt;
62use std::path::Path;
63use std::pin::Pin;
64use std::sync::atomic::{AtomicUsize, Ordering};
65use std::sync::{Arc, Mutex};
66use std::task::{Context, Poll};
67use std::thread;
68
69#[cfg(unix)]
70use async_io::Async;
71#[cfg(unix)]
72use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
73
74#[cfg(windows)]
75use blocking::Unblock;
76
77use async_lock::OnceCell;
78use futures_lite::{future, io, prelude::*};
79
80#[doc(no_inline)]
81pub use std::process::{ExitStatus, Output, Stdio};
82
83#[cfg(unix)]
84pub mod unix;
85#[cfg(windows)]
86pub mod windows;
87
88mod reaper;
89
// Private module holding a public marker trait. Code outside this crate cannot name
// `sealed::Sealed` because the module itself is private.
// NOTE(review): presumably used as a supertrait by the extension traits in the `unix` /
// `windows` modules so they cannot be implemented downstream — confirm against those modules.
mod sealed {
    pub trait Sealed {}
}
93
// Test-only flag: `Reaper::start_driver_thread` flips it from `false` to `true` (and treats a
// second flip as unreachable), which lets the tests observe whether the "async-process" thread
// has been started.
#[cfg(test)]
static DRIVER_THREAD_SPAWNED: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
97
/// The zombie process reaper.
///
/// This structure wraps the platform-specific reaper and keeps the counters that decide
/// whether the fallback "async-process" thread has to be started. Per the [`driver()`]
/// documentation, driving the reaper involves listening for the `SIGCHLD` signal and waiting
/// on zombie processes.
struct Reaper {
    /// Underlying system reaper.
    sys: reaper::Reaper,

    /// The number of tasks polling the SIGCHLD event.
    ///
    /// Incremented by every `driver()` future and by `ensure_driven()` when it claims the
    /// driver slot for the background thread. If this is zero, the `async-process` thread
    /// must be spawned.
    drivers: AtomicUsize,

    /// Number of live `Child` instances currently running.
    ///
    /// This is used to prevent the reaper thread from being spawned right as the program closes,
    /// when the reaper thread isn't needed. This represents the number of active processes.
    /// It is bumped in `Child::new` and decremented when the matching `ChildGuard` is dropped.
    child_count: AtomicUsize,
}
116
117impl Reaper {
118 /// Get the singleton instance of the reaper.
119 fn get() -> &'static Self {
120 static REAPER: OnceCell<Reaper> = OnceCell::new();
121
122 REAPER.get_or_init_blocking(|| Reaper {
123 sys: reaper::Reaper::new(),
124 drivers: AtomicUsize::new(0),
125 child_count: AtomicUsize::new(0),
126 })
127 }
128
129 /// Ensure that the reaper is driven.
130 ///
131 /// If there are no active `driver()` callers, this will spawn the `async-process` thread.
132 #[inline]
133 fn ensure_driven(&'static self) {
134 if self
135 .drivers
136 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Acquire)
137 .is_ok()
138 {
139 self.start_driver_thread();
140 }
141 }
142
143 /// Start the `async-process` thread.
144 #[cold]
145 fn start_driver_thread(&'static self) {
146 #[cfg(test)]
147 DRIVER_THREAD_SPAWNED
148 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
149 .unwrap_or_else(|_| unreachable!("Driver thread already spawned"));
150
151 thread::Builder::new()
152 .name("async-process".to_string())
153 .spawn(move || {
154 let driver = async move {
155 // No need to bump self.drivers, it was already bumped in ensure_driven.
156 let guard = self.sys.lock().await;
157 self.sys.reap(guard).await
158 };
159
160 #[cfg(unix)]
161 async_io::block_on(driver);
162
163 #[cfg(not(unix))]
164 future::block_on(driver);
165 })
166 .expect("cannot spawn async-process thread");
167 }
168
169 /// Register a process with this reaper.
170 fn register(&'static self, child: std::process::Child) -> io::Result<reaper::ChildGuard> {
171 self.ensure_driven();
172 self.sys.register(child)
173 }
174}
175
176cfg_if::cfg_if! {
177 if #[cfg(windows)] {
178 // Wraps a sync I/O type into an async I/O type.
179 fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
180 Ok(Unblock::new(io))
181 }
182 } else if #[cfg(unix)] {
183 /// Wrap a file descriptor into a non-blocking I/O type.
184 fn wrap<T: std::os::unix::io::AsFd>(io: T) -> io::Result<Async<T>> {
185 Async::new(io)
186 }
187 }
188}
189
/// A guard that can kill child processes, or push them into the zombie list.
///
/// The options are applied in this type's `Drop` implementation.
struct ChildGuard {
    /// Guard returned by the system reaper when the process was registered.
    inner: reaper::ChildGuard,
    /// Whether the process is handed to the reaper when this guard is dropped.
    reap_on_drop: bool,
    /// Whether `kill()` is attempted on the process when this guard is dropped.
    kill_on_drop: bool,
    /// The global reaper whose `child_count` is decremented on drop.
    reaper: &'static Reaper,
}
197
198impl ChildGuard {
199 fn get_mut(&mut self) -> &mut std::process::Child {
200 self.inner.get_mut()
201 }
202}
203
204// When the last reference to the child process is dropped, push it into the zombie list.
205impl Drop for ChildGuard {
206 fn drop(&mut self) {
207 if self.kill_on_drop {
208 self.get_mut().kill().ok();
209 }
210 if self.reap_on_drop {
211 self.inner.reap(&self.reaper.sys);
212 }
213
214 // Decrement number of children.
215 self.reaper.child_count.fetch_sub(1, Ordering::Acquire);
216 }
217}
218
/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
/// [`output()`][`Child::output()`] to wait for it to exit.
///
/// If the [`Child`] is dropped, the process keeps running in the background (unless
/// [`Command::kill_on_drop()`] was enabled for it).
///
/// # Examples
///
/// Spawn a process and wait for it to complete:
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
///
/// Command::new("cp").arg("a.txt").arg("b.txt").status().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct Child {
    /// The handle for writing to the child's standard input (stdin), if it has been captured.
    pub stdin: Option<ChildStdin>,

    /// The handle for reading from the child's standard output (stdout), if it has been captured.
    pub stdout: Option<ChildStdout>,

    /// The handle for reading from the child's standard error (stderr), if it has been captured.
    pub stderr: Option<ChildStderr>,

    /// The inner child process handle.
    ///
    /// Shared behind an `Arc<Mutex<..>>` so that the future returned by `status()` can hold its
    /// own reference to it.
    child: Arc<Mutex<ChildGuard>>,
}
250
251impl Child {
252 /// Wraps the inner child process handle and registers it in the global process list.
253 ///
254 /// The "async-process" thread waits for processes in the global list and cleans up the
255 /// resources when they exit.
256 fn new(cmd: &mut Command) -> io::Result<Child> {
257 // Make sure the reaper exists before we spawn the child process.
258 let reaper = Reaper::get();
259 let mut child = cmd.inner.spawn()?;
260
261 // Convert sync I/O types into async I/O types.
262 let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
263 let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
264 let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);
265
266 // Bump the child count.
267 reaper.child_count.fetch_add(1, Ordering::Relaxed);
268
269 // Register the child process in the global list.
270 let inner = reaper.register(child)?;
271
272 Ok(Child {
273 stdin,
274 stdout,
275 stderr,
276 child: Arc::new(Mutex::new(ChildGuard {
277 inner,
278 reap_on_drop: cmd.reap_on_drop,
279 kill_on_drop: cmd.kill_on_drop,
280 reaper,
281 })),
282 })
283 }
284
285 /// Returns the OS-assigned process identifier associated with this child.
286 ///
287 /// # Examples
288 ///
289 /// ```no_run
290 /// # futures_lite::future::block_on(async {
291 /// use async_process::Command;
292 ///
293 /// let mut child = Command::new("ls").spawn()?;
294 /// println!("id: {}", child.id());
295 /// # std::io::Result::Ok(()) });
296 /// ```
297 pub fn id(&self) -> u32 {
298 self.child.lock().unwrap().get_mut().id()
299 }
300
301 /// Forces the child process to exit.
302 ///
303 /// If the child has already exited, an [`InvalidInput`] error is returned.
304 ///
305 /// This is equivalent to sending a SIGKILL on Unix platforms.
306 ///
307 /// [`InvalidInput`]: `std::io::ErrorKind::InvalidInput`
308 ///
309 /// # Examples
310 ///
311 /// ```no_run
312 /// # futures_lite::future::block_on(async {
313 /// use async_process::Command;
314 ///
315 /// let mut child = Command::new("yes").spawn()?;
316 /// child.kill()?;
317 /// println!("exit status: {}", child.status().await?);
318 /// # std::io::Result::Ok(()) });
319 /// ```
320 pub fn kill(&mut self) -> io::Result<()> {
321 self.child.lock().unwrap().get_mut().kill()
322 }
323
324 /// Returns the exit status if the process has exited.
325 ///
326 /// Unlike [`status()`][`Child::status()`], this method will not drop the stdin handle.
327 ///
328 /// # Examples
329 ///
330 /// ```no_run
331 /// # futures_lite::future::block_on(async {
332 /// use async_process::Command;
333 ///
334 /// let mut child = Command::new("ls").spawn()?;
335 ///
336 /// match child.try_status()? {
337 /// None => println!("still running"),
338 /// Some(status) => println!("exited with: {}", status),
339 /// }
340 /// # std::io::Result::Ok(()) });
341 /// ```
342 pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
343 self.child.lock().unwrap().get_mut().try_wait()
344 }
345
346 /// Drops the stdin handle and waits for the process to exit.
347 ///
348 /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
349 /// not block waiting for input from the parent process while the parent waits for the child to
350 /// exit.
351 ///
352 /// # Examples
353 ///
354 /// ```no_run
355 /// # futures_lite::future::block_on(async {
356 /// use async_process::{Command, Stdio};
357 ///
358 /// let mut child = Command::new("cp")
359 /// .arg("a.txt")
360 /// .arg("b.txt")
361 /// .spawn()?;
362 ///
363 /// println!("exit status: {}", child.status().await?);
364 /// # std::io::Result::Ok(()) });
365 /// ```
366 pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
367 self.stdin.take();
368 let child = self.child.clone();
369
370 async move { Reaper::get().sys.status(&child).await }
371 }
372
373 /// Drops the stdin handle and collects the output of the process.
374 ///
375 /// Closing the stdin of the process helps avoid deadlocks. It ensures that the process does
376 /// not block waiting for input from the parent process while the parent waits for the child to
377 /// exit.
378 ///
379 /// In order to capture the output of the process, [`Command::stdout()`] and
380 /// [`Command::stderr()`] must be configured with [`Stdio::piped()`].
381 ///
382 /// # Examples
383 ///
384 /// ```no_run
385 /// # futures_lite::future::block_on(async {
386 /// use async_process::{Command, Stdio};
387 ///
388 /// let child = Command::new("ls")
389 /// .stdout(Stdio::piped())
390 /// .stderr(Stdio::piped())
391 /// .spawn()?;
392 ///
393 /// let out = child.output().await?;
394 /// # std::io::Result::Ok(()) });
395 /// ```
396 pub fn output(mut self) -> impl Future<Output = io::Result<Output>> {
397 // A future that waits for the exit status.
398 let status = self.status();
399
400 // A future that collects stdout.
401 let stdout = self.stdout.take();
402 let stdout = async move {
403 let mut v = Vec::new();
404 if let Some(mut s) = stdout {
405 s.read_to_end(&mut v).await?;
406 }
407 io::Result::Ok(v)
408 };
409
410 // A future that collects stderr.
411 let stderr = self.stderr.take();
412 let stderr = async move {
413 let mut v = Vec::new();
414 if let Some(mut s) = stderr {
415 s.read_to_end(&mut v).await?;
416 }
417 io::Result::Ok(v)
418 };
419
420 async move {
421 let (stdout, stderr) = future::try_zip(stdout, stderr).await?;
422 let status = status.await?;
423 Ok(Output {
424 status,
425 stdout,
426 stderr,
427 })
428 }
429 }
430}
431
432impl fmt::Debug for Child {
433 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
434 f.debug_struct("Child")
435 .field("stdin", &self.stdin)
436 .field("stdout", &self.stdout)
437 .field("stderr", &self.stderr)
438 .finish()
439 }
440}
441
/// A handle to a child process's standard input (stdin).
///
/// When a [`ChildStdin`] is dropped, the underlying handle gets closed. If the child process was
/// previously blocked on input, it becomes unblocked after dropping.
///
/// The wrapped type is platform-dependent: `Unblock` on Windows and `Async` on Unix.
#[derive(Debug)]
pub struct ChildStdin(
    #[cfg(windows)] Unblock<std::process::ChildStdin>,
    #[cfg(unix)] Async<std::process::ChildStdin>,
);
451
452impl ChildStdin {
453 /// Convert async_process::ChildStdin into std::process::Stdio.
454 ///
455 /// You can use it to associate to the next process.
456 ///
457 /// # Examples
458 ///
459 /// ```no_run
460 /// # futures_lite::future::block_on(async {
461 /// use async_process::Command;
462 /// use std::process::Stdio;
463 ///
464 /// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
465 /// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
466 ///
467 /// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
468 ///
469 /// # std::io::Result::Ok(()) });
470 /// ```
471 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
472 cfg_if::cfg_if! {
473 if #[cfg(windows)] {
474 Ok(self.0.into_inner().await.into())
475 } else if #[cfg(unix)] {
476 let child_stdin = self.0.into_inner()?;
477 blocking_fd(rustix::fd::AsFd::as_fd(&child_stdin))?;
478 Ok(child_stdin.into())
479 }
480 }
481 }
482}
483
484impl io::AsyncWrite for ChildStdin {
485 fn poll_write(
486 mut self: Pin<&mut Self>,
487 cx: &mut Context<'_>,
488 buf: &[u8],
489 ) -> Poll<io::Result<usize>> {
490 Pin::new(&mut self.0).poll_write(cx, buf)
491 }
492
493 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
494 Pin::new(&mut self.0).poll_flush(cx)
495 }
496
497 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
498 Pin::new(&mut self.0).poll_close(cx)
499 }
500}
501
502#[cfg(unix)]
503impl AsRawFd for ChildStdin {
504 fn as_raw_fd(&self) -> RawFd {
505 self.0.as_raw_fd()
506 }
507}
508
509#[cfg(unix)]
510impl AsFd for ChildStdin {
511 fn as_fd(&self) -> BorrowedFd<'_> {
512 self.0.as_fd()
513 }
514}
515
516#[cfg(unix)]
517impl TryFrom<ChildStdin> for OwnedFd {
518 type Error = io::Error;
519
520 fn try_from(value: ChildStdin) -> Result<Self, Self::Error> {
521 value.0.try_into()
522 }
523}
524
525// TODO(notgull): Add mirroring AsRawHandle impls for all of the child handles
526//
527// at the moment this is pretty hard to do because of how they're wrapped in
528// Unblock, meaning that we can't always access the underlying handle. async-fs
529// gets around this by putting the handle in an Arc, but there's still some decision
530// to be made about how to handle this (no pun intended)
531
/// A handle to a child process's standard output (stdout).
///
/// When a [`ChildStdout`] is dropped, the underlying handle gets closed.
///
/// The wrapped type is platform-dependent: `Unblock` on Windows and `Async` on Unix.
#[derive(Debug)]
pub struct ChildStdout(
    #[cfg(windows)] Unblock<std::process::ChildStdout>,
    #[cfg(unix)] Async<std::process::ChildStdout>,
);
540
541impl ChildStdout {
542 /// Convert async_process::ChildStdout into std::process::Stdio.
543 ///
544 /// You can use it to associate to the next process.
545 ///
546 /// # Examples
547 ///
548 /// ```no_run
549 /// # futures_lite::future::block_on(async {
550 /// use async_process::Command;
551 /// use std::process::Stdio;
552 /// use std::io::Read;
553 /// use futures_lite::AsyncReadExt;
554 ///
555 /// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
556 /// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
557 ///
558 /// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
559 /// let mut buf = vec![];
560 /// echo_child.stdout.take().unwrap().read(&mut buf).await;
561 /// # std::io::Result::Ok(()) });
562 /// ```
563 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
564 cfg_if::cfg_if! {
565 if #[cfg(windows)] {
566 Ok(self.0.into_inner().await.into())
567 } else if #[cfg(unix)] {
568 let child_stdout = self.0.into_inner()?;
569 blocking_fd(rustix::fd::AsFd::as_fd(&child_stdout))?;
570 Ok(child_stdout.into())
571 }
572 }
573 }
574}
575
576impl io::AsyncRead for ChildStdout {
577 fn poll_read(
578 mut self: Pin<&mut Self>,
579 cx: &mut Context<'_>,
580 buf: &mut [u8],
581 ) -> Poll<io::Result<usize>> {
582 Pin::new(&mut self.0).poll_read(cx, buf)
583 }
584}
585
586#[cfg(unix)]
587impl AsRawFd for ChildStdout {
588 fn as_raw_fd(&self) -> RawFd {
589 self.0.as_raw_fd()
590 }
591}
592
593#[cfg(unix)]
594impl AsFd for ChildStdout {
595 fn as_fd(&self) -> BorrowedFd<'_> {
596 self.0.as_fd()
597 }
598}
599
600#[cfg(unix)]
601impl TryFrom<ChildStdout> for OwnedFd {
602 type Error = io::Error;
603
604 fn try_from(value: ChildStdout) -> Result<Self, Self::Error> {
605 value.0.try_into()
606 }
607}
608
/// A handle to a child process's standard error (stderr).
///
/// When a [`ChildStderr`] is dropped, the underlying handle gets closed.
///
/// The wrapped type is platform-dependent: `Unblock` on Windows and `Async` on Unix.
#[derive(Debug)]
pub struct ChildStderr(
    #[cfg(windows)] Unblock<std::process::ChildStderr>,
    #[cfg(unix)] Async<std::process::ChildStderr>,
);
617
618impl ChildStderr {
619 /// Convert async_process::ChildStderr into std::process::Stdio.
620 ///
621 /// You can use it to associate to the next process.
622 ///
623 /// # Examples
624 ///
625 /// ```no_run
626 /// # futures_lite::future::block_on(async {
627 /// use async_process::Command;
628 /// use std::process::Stdio;
629 ///
630 /// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
631 /// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
632 ///
633 /// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
634 /// # std::io::Result::Ok(()) });
635 /// ```
636 pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
637 cfg_if::cfg_if! {
638 if #[cfg(windows)] {
639 Ok(self.0.into_inner().await.into())
640 } else if #[cfg(unix)] {
641 let child_stderr = self.0.into_inner()?;
642 blocking_fd(rustix::fd::AsFd::as_fd(&child_stderr))?;
643 Ok(child_stderr.into())
644 }
645 }
646 }
647}
648
649impl io::AsyncRead for ChildStderr {
650 fn poll_read(
651 mut self: Pin<&mut Self>,
652 cx: &mut Context<'_>,
653 buf: &mut [u8],
654 ) -> Poll<io::Result<usize>> {
655 Pin::new(&mut self.0).poll_read(cx, buf)
656 }
657}
658
659#[cfg(unix)]
660impl AsRawFd for ChildStderr {
661 fn as_raw_fd(&self) -> RawFd {
662 self.0.as_raw_fd()
663 }
664}
665
666#[cfg(unix)]
667impl AsFd for ChildStderr {
668 fn as_fd(&self) -> BorrowedFd<'_> {
669 self.0.as_fd()
670 }
671}
672
673#[cfg(unix)]
674impl TryFrom<ChildStderr> for OwnedFd {
675 type Error = io::Error;
676
677 fn try_from(value: ChildStderr) -> Result<Self, Self::Error> {
678 value.0.try_into()
679 }
680}
681
682/// Runs the driver for the asynchronous processes.
683///
684/// This future takes control of global structures related to driving [`Child`]ren and reaping
685/// zombie processes. These responsibilities include listening for the `SIGCHLD` signal and
686/// making sure zombie processes are successfully waited on.
687///
688/// If multiple tasks run `driver()` at once, only one will actually drive the reaper; the other
689/// ones will just sleep. If a task that is driving the reaper is dropped, a previously sleeping
690/// task will take over. If all tasks driving the reaper are dropped, the "async-process" thread
691/// will be spawned. The "async-process" thread just blocks on this future and will automatically
692/// be spawned if no tasks are driving the reaper once a [`Child`] is created.
693///
694/// This future will never complete. It is intended to be ran on a background task in your
695/// executor of choice.
696///
697/// # Examples
698///
699/// ```no_run
700/// use async_executor::Executor;
701/// use async_process::{driver, Command};
702///
703/// # futures_lite::future::block_on(async {
704/// // Create an executor and run on it.
705/// let ex = Executor::new();
706/// ex.run(async {
707/// // Run the driver future in the background.
708/// ex.spawn(driver()).detach();
709///
710/// // Run a command.
711/// Command::new("ls").output().await.ok();
712/// }).await;
713/// # });
714/// ```
715#[allow(clippy::manual_async_fn)]
716#[inline]
717pub fn driver() -> impl Future<Output = Infallible> + Send + 'static {
718 async {
719 // Get the reaper.
720 let reaper = Reaper::get();
721
722 // Make sure the reaper knows we're driving it.
723 reaper.drivers.fetch_add(1, Ordering::SeqCst);
724
725 // Decrement the driver count when this future is dropped.
726 let _guard = CallOnDrop(|| {
727 let prev_count = reaper.drivers.fetch_sub(1, Ordering::SeqCst);
728
729 // If this was the last driver, and there are still resources actively using the
730 // reaper, make sure that there is a thread driving the reaper.
731 if prev_count == 1
732 && (reaper.child_count.load(Ordering::SeqCst) > 0 || reaper.sys.has_zombies())
733 {
734 reaper.ensure_driven();
735 }
736 });
737
738 // Acquire the reaper lock and start polling the SIGCHLD event.
739 let guard = reaper.sys.lock().await;
740 reaper.sys.reap(guard).await
741 }
742}
743
/// A builder for spawning processes.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
///
/// let output = if cfg!(target_os = "windows") {
///     Command::new("cmd").args(&["/C", "echo hello"]).output().await?
/// } else {
///     Command::new("sh").arg("-c").arg("echo hello").output().await?
/// };
/// # std::io::Result::Ok(()) });
/// ```
pub struct Command {
    // The standard-library builder that all configuration is forwarded to.
    inner: std::process::Command,
    // Set once `stdin()` has been called; `spawn()`/`output()` only apply their defaults to
    // streams whose flag is still `false`.
    stdin: bool,
    // Same as `stdin`, for `stdout()`.
    stdout: bool,
    // Same as `stdin`, for `stderr()`.
    stderr: bool,
    // Copied into the child's guard on spawn; defaults to `true`.
    reap_on_drop: bool,
    // Copied into the child's guard on spawn; defaults to `false`.
    kill_on_drop: bool,
}
767
768impl Command {
769 /// Constructs a new [`Command`] for launching `program`.
770 ///
771 /// The initial configuration (the working directory and environment variables) is inherited
772 /// from the current process.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// use async_process::Command;
778 ///
779 /// let mut cmd = Command::new("ls");
780 /// ```
781 pub fn new<S: AsRef<OsStr>>(program: S) -> Command {
782 Self::from(std::process::Command::new(program))
783 }
784
785 /// Adds a single argument to pass to the program.
786 ///
787 /// # Examples
788 ///
789 /// ```
790 /// use async_process::Command;
791 ///
792 /// let mut cmd = Command::new("echo");
793 /// cmd.arg("hello");
794 /// cmd.arg("world");
795 /// ```
796 pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command {
797 self.inner.arg(arg);
798 self
799 }
800
801 /// Adds multiple arguments to pass to the program.
802 ///
803 /// # Examples
804 ///
805 /// ```
806 /// use async_process::Command;
807 ///
808 /// let mut cmd = Command::new("echo");
809 /// cmd.args(&["hello", "world"]);
810 /// ```
811 pub fn args<I, S>(&mut self, args: I) -> &mut Command
812 where
813 I: IntoIterator<Item = S>,
814 S: AsRef<OsStr>,
815 {
816 self.inner.args(args);
817 self
818 }
819
820 /// Configures an environment variable for the new process.
821 ///
822 /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
823 /// and case-sensitive on all other platforms.
824 ///
825 /// # Examples
826 ///
827 /// ```
828 /// use async_process::Command;
829 ///
830 /// let mut cmd = Command::new("ls");
831 /// cmd.env("PATH", "/bin");
832 /// ```
833 pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command
834 where
835 K: AsRef<OsStr>,
836 V: AsRef<OsStr>,
837 {
838 self.inner.env(key, val);
839 self
840 }
841
842 /// Configures multiple environment variables for the new process.
843 ///
844 /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
845 /// and case-sensitive on all other platforms.
846 ///
847 /// # Examples
848 ///
849 /// ```
850 /// use async_process::Command;
851 ///
852 /// let mut cmd = Command::new("ls");
853 /// cmd.envs(vec![("PATH", "/bin"), ("TERM", "xterm-256color")]);
854 /// ```
855 pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command
856 where
857 I: IntoIterator<Item = (K, V)>,
858 K: AsRef<OsStr>,
859 V: AsRef<OsStr>,
860 {
861 self.inner.envs(vars);
862 self
863 }
864
865 /// Removes an environment variable mapping.
866 ///
867 /// # Examples
868 ///
869 /// ```
870 /// use async_process::Command;
871 ///
872 /// let mut cmd = Command::new("ls");
873 /// cmd.env_remove("PATH");
874 /// ```
875 pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command {
876 self.inner.env_remove(key);
877 self
878 }
879
880 /// Removes all environment variable mappings.
881 ///
882 /// # Examples
883 ///
884 /// ```
885 /// use async_process::Command;
886 ///
887 /// let mut cmd = Command::new("ls");
888 /// cmd.env_clear();
889 /// ```
890 pub fn env_clear(&mut self) -> &mut Command {
891 self.inner.env_clear();
892 self
893 }
894
895 /// Configures the working directory for the new process.
896 ///
897 /// # Examples
898 ///
899 /// ```
900 /// use async_process::Command;
901 ///
902 /// let mut cmd = Command::new("ls");
903 /// cmd.current_dir("/");
904 /// ```
905 pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command {
906 self.inner.current_dir(dir);
907 self
908 }
909
910 /// Configures the standard input (stdin) for the new process.
911 ///
912 /// # Examples
913 ///
914 /// ```
915 /// use async_process::{Command, Stdio};
916 ///
917 /// let mut cmd = Command::new("cat");
918 /// cmd.stdin(Stdio::null());
919 /// ```
920 pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
921 self.stdin = true;
922 self.inner.stdin(cfg);
923 self
924 }
925
926 /// Configures the standard output (stdout) for the new process.
927 ///
928 /// # Examples
929 ///
930 /// ```
931 /// use async_process::{Command, Stdio};
932 ///
933 /// let mut cmd = Command::new("ls");
934 /// cmd.stdout(Stdio::piped());
935 /// ```
936 pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
937 self.stdout = true;
938 self.inner.stdout(cfg);
939 self
940 }
941
942 /// Configures the standard error (stderr) for the new process.
943 ///
944 /// # Examples
945 ///
946 /// ```
947 /// use async_process::{Command, Stdio};
948 ///
949 /// let mut cmd = Command::new("ls");
950 /// cmd.stderr(Stdio::piped());
951 /// ```
952 pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
953 self.stderr = true;
954 self.inner.stderr(cfg);
955 self
956 }
957
958 /// Configures whether to reap the zombie process when [`Child`] is dropped.
959 ///
960 /// When the process finishes, it becomes a "zombie" and some resources associated with it
961 /// remain until [`Child::try_status()`], [`Child::status()`], or [`Child::output()`] collects
962 /// its exit code.
963 ///
964 /// If its exit code is never collected, the resources may leak forever. This crate has a
965 /// background thread named "async-process" that collects such "zombie" processes and then
966 /// "reaps" them, thus preventing the resource leaks.
967 ///
968 /// The default value of this option is `true`.
969 ///
970 /// # Examples
971 ///
972 /// ```
973 /// use async_process::{Command, Stdio};
974 ///
975 /// let mut cmd = Command::new("cat");
976 /// cmd.reap_on_drop(false);
977 /// ```
978 pub fn reap_on_drop(&mut self, reap_on_drop: bool) -> &mut Command {
979 self.reap_on_drop = reap_on_drop;
980 self
981 }
982
983 /// Configures whether to kill the process when [`Child`] is dropped.
984 ///
985 /// The default value of this option is `false`.
986 ///
987 /// # Examples
988 ///
989 /// ```
990 /// use async_process::{Command, Stdio};
991 ///
992 /// let mut cmd = Command::new("cat");
993 /// cmd.kill_on_drop(true);
994 /// ```
995 pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command {
996 self.kill_on_drop = kill_on_drop;
997 self
998 }
999
1000 /// Executes the command and returns the [`Child`] handle to it.
1001 ///
1002 /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1003 ///
1004 /// # Examples
1005 ///
1006 /// ```no_run
1007 /// # futures_lite::future::block_on(async {
1008 /// use async_process::Command;
1009 ///
1010 /// let child = Command::new("ls").spawn()?;
1011 /// # std::io::Result::Ok(()) });
1012 /// ```
1013 pub fn spawn(&mut self) -> io::Result<Child> {
1014 if !self.stdin {
1015 self.inner.stdin(Stdio::inherit());
1016 }
1017 if !self.stdout {
1018 self.inner.stdout(Stdio::inherit());
1019 }
1020 if !self.stderr {
1021 self.inner.stderr(Stdio::inherit());
1022 }
1023
1024 Child::new(self)
1025 }
1026
1027 /// Executes the command, waits for it to exit, and returns the exit status.
1028 ///
1029 /// If not configured, stdin, stdout and stderr will be set to [`Stdio::inherit()`].
1030 ///
1031 /// # Examples
1032 ///
1033 /// ```no_run
1034 /// # futures_lite::future::block_on(async {
1035 /// use async_process::Command;
1036 ///
1037 /// let status = Command::new("cp")
1038 /// .arg("a.txt")
1039 /// .arg("b.txt")
1040 /// .status()
1041 /// .await?;
1042 /// # std::io::Result::Ok(()) });
1043 /// ```
1044 pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
1045 let child = self.spawn();
1046 async { child?.status().await }
1047 }
1048
1049 /// Executes the command and collects its output.
1050 ///
1051 /// If not configured, stdin will be set to [`Stdio::null()`], and stdout and stderr will be
1052 /// set to [`Stdio::piped()`].
1053 ///
1054 /// # Examples
1055 ///
1056 /// ```no_run
1057 /// # futures_lite::future::block_on(async {
1058 /// use async_process::Command;
1059 ///
1060 /// let output = Command::new("cat")
1061 /// .arg("a.txt")
1062 /// .output()
1063 /// .await?;
1064 /// # std::io::Result::Ok(()) });
1065 /// ```
1066 pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> {
1067 if !self.stdin {
1068 self.inner.stdin(Stdio::null());
1069 }
1070 if !self.stdout {
1071 self.inner.stdout(Stdio::piped());
1072 }
1073 if !self.stderr {
1074 self.inner.stderr(Stdio::piped());
1075 }
1076
1077 let child = Child::new(self);
1078 async { child?.output().await }
1079 }
1080}
1081
1082impl From<std::process::Command> for Command {
1083 fn from(inner: std::process::Command) -> Self {
1084 Self {
1085 inner,
1086 stdin: false,
1087 stdout: false,
1088 stderr: false,
1089 reap_on_drop: true,
1090 kill_on_drop: false,
1091 }
1092 }
1093}
1094
1095impl fmt::Debug for Command {
1096 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1097 if f.alternate() {
1098 f.debug_struct("Command")
1099 .field("inner", &self.inner)
1100 .field("stdin", &self.stdin)
1101 .field("stdout", &self.stdout)
1102 .field("stderr", &self.stderr)
1103 .field("reap_on_drop", &self.reap_on_drop)
1104 .field("kill_on_drop", &self.kill_on_drop)
1105 .finish()
1106 } else {
1107 // Stdlib outputs command-line in Debug for Command. This does the
1108 // same, if not in "alternate" (long pretty-printed) mode.
1109 // This is useful for logs, for example.
1110 fmt::Debug::fmt(&self.inner, f)
1111 }
1112 }
1113}
1114
1115/// Moves `Fd` out of non-blocking mode.
1116#[cfg(unix)]
1117fn blocking_fd(fd: rustix::fd::BorrowedFd<'_>) -> io::Result<()> {
1118 cfg_if::cfg_if! {
1119 // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
1120 // for now, as with the standard library, because it seems to behave
1121 // differently depending on the platform.
1122 // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
1123 // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
1124 // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
1125 if #[cfg(target_os = "linux")] {
1126 rustix::io::ioctl_fionbio(fd, false)?;
1127 } else {
1128 let previous = rustix::fs::fcntl_getfl(fd)?;
1129 let new = previous & !rustix::fs::OFlags::NONBLOCK;
1130 if new != previous {
1131 rustix::fs::fcntl_setfl(fd, new)?;
1132 }
1133 }
1134 }
1135 Ok(())
1136}
1137
/// Runs the wrapped closure when the value is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let Self(callback) = self;
        callback();
    }
}
1145
#[cfg(test)]
mod test {
    // Checks that polling `driver()` futures keeps the "async-process" thread from being
    // spawned, and that the thread is spawned once a command runs with no driver alive.
    // The assertions depend on the order of the steps below because they observe the global
    // `DRIVER_THREAD_SPAWNED` flag.
    #[test]
    fn polled_driver() {
        use super::{driver, Command};
        use futures_lite::future;
        use futures_lite::prelude::*;

        // Reads the test-only flag set by `Reaper::start_driver_thread`.
        let is_thread_spawned =
            || super::DRIVER_THREAD_SPAWNED.load(std::sync::atomic::Ordering::SeqCst);

        // A command that prints "hello" through the platform shell.
        #[cfg(unix)]
        fn command() -> Command {
            let mut cmd = Command::new("sh");
            cmd.arg("-c").arg("echo hello");
            cmd
        }

        #[cfg(windows)]
        fn command() -> Command {
            let mut cmd = Command::new("cmd");
            cmd.arg("/C").arg("echo hello");
            cmd
        }

        // Expected stdout of `command()`, including the platform's line ending.
        #[cfg(unix)]
        const OUTPUT: &[u8] = b"hello\n";
        #[cfg(windows)]
        const OUTPUT: &[u8] = b"hello\r\n";

        future::block_on(async {
            // Thread should not be spawned off the bat.
            assert!(!is_thread_spawned());

            // Spawn a driver.
            let mut driver1 = Box::pin(driver());
            future::poll_once(&mut driver1).await;
            assert!(!is_thread_spawned());

            // We should be able to run the driver in parallel with a process future.
            async {
                (&mut driver1).await;
            }
            .or(async {
                let output = command().output().await.unwrap();
                assert_eq!(output.stdout, OUTPUT);
            })
            .await;
            assert!(!is_thread_spawned());

            // Spawn a second driver.
            let mut driver2 = Box::pin(driver());
            future::poll_once(&mut driver2).await;
            assert!(!is_thread_spawned());

            // Poll both drivers in parallel.
            async {
                (&mut driver1).await;
            }
            .or(async {
                (&mut driver2).await;
            })
            .or(async {
                let output = command().output().await.unwrap();
                assert_eq!(output.stdout, OUTPUT);
            })
            .await;
            assert!(!is_thread_spawned());

            // Once one is dropped, the other should take over.
            drop(driver1);
            assert!(!is_thread_spawned());

            // Poll driver2 in parallel with a process future.
            async {
                (&mut driver2).await;
            }
            .or(async {
                let output = command().output().await.unwrap();
                assert_eq!(output.stdout, OUTPUT);
            })
            .await;
            assert!(!is_thread_spawned());

            // Once driver2 is dropped, the thread should not be spawned, as there are no active
            // child processes.
            drop(driver2);
            assert!(!is_thread_spawned());

            // We should now be able to poll the process future independently, it will spawn the
            // thread.
            let output = command().output().await.unwrap();
            assert_eq!(output.stdout, OUTPUT);
            assert!(is_thread_spawned());
        });
    }
}