mio/sys/unix/pipe.rs
1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::fd::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9 let mut fds: [RawFd; 2] = [-1, -1];
10
11 #[cfg(any(
12 target_os = "android",
13 target_os = "dragonfly",
14 target_os = "freebsd",
15 target_os = "fuchsia",
16 target_os = "hurd",
17 target_os = "linux",
18 target_os = "netbsd",
19 target_os = "openbsd",
20 target_os = "illumos",
21 target_os = "redox",
22 target_os = "solaris",
23 target_os = "vita",
24 ))]
25 unsafe {
26 if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
27 return Err(io::Error::last_os_error());
28 }
29 }
30
31 #[cfg(any(
32 target_os = "aix",
33 target_os = "haiku",
34 target_os = "ios",
35 target_os = "macos",
36 target_os = "tvos",
37 target_os = "visionos",
38 target_os = "watchos",
39 target_os = "espidf",
40 target_os = "nto",
41 ))]
42 unsafe {
43 // For platforms that don't have `pipe2(2)` we need to manually set the
44 // correct flags on the file descriptor.
45 if libc::pipe(fds.as_mut_ptr()) != 0 {
46 return Err(io::Error::last_os_error());
47 }
48
49 for fd in &fds {
50 if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
51 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
52 {
53 let err = io::Error::last_os_error();
54 // Don't leak file descriptors. Can't handle closing error though.
55 let _ = libc::close(fds[0]);
56 let _ = libc::close(fds[1]);
57 return Err(err);
58 }
59 }
60 }
61
62 Ok(fds)
63}
64
65cfg_os_ext! {
66use std::fs::File;
67use std::io::{IoSlice, IoSliceMut, Read, Write};
68use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd};
69use std::process::{ChildStderr, ChildStdin, ChildStdout};
70
71use crate::io_source::IoSource;
72use crate::{event, Interest, Registry, Token};
73
74/// Create a new non-blocking Unix pipe.
75///
76/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
77/// inter-process or thread communication channel.
78///
/// This channel may be created before forking the process and then one end used
/// in each process, e.g. the parent process has the sending end to send commands
/// to the child process.
82///
83/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
84///
85/// # Events
86///
/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
/// written to the `Sender` the `Receiver` will receive a [readable event].
90///
91/// In addition to those events, events will also be generated if the other side
92/// is dropped. To check if the `Sender` is dropped you'll need to check
93/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
94/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
95/// returns true the `Receiver` was dropped. Also see the second example below.
96///
97/// [`WRITABLE`]: Interest::WRITABLE
98/// [writable events]: event::Event::is_writable
99/// [`READABLE`]: Interest::READABLE
100/// [readable event]: event::Event::is_readable
101/// [`is_read_closed`]: event::Event::is_read_closed
102/// [`is_write_closed`]: event::Event::is_write_closed
103///
104/// # Deregistering
105///
106/// Both `Sender` and `Receiver` will deregister themselves when dropped,
107/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
108///
109/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
110///
111/// # Examples
112///
/// Simple example that writes data into the sending end and reads it from the
/// receiving end.
115///
116/// ```
117/// use std::io::{self, Read, Write};
118///
119/// use mio::{Poll, Events, Interest, Token};
120/// use mio::unix::pipe;
121///
122/// // Unique tokens for the two ends of the channel.
123/// const PIPE_RECV: Token = Token(0);
124/// const PIPE_SEND: Token = Token(1);
125///
126/// # fn main() -> io::Result<()> {
127/// // Create our `Poll` instance and the `Events` container.
128/// let mut poll = Poll::new()?;
129/// let mut events = Events::with_capacity(8);
130///
131/// // Create a new pipe.
132/// let (mut sender, mut receiver) = pipe::new()?;
133///
134/// // Register both ends of the channel.
135/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
136/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
137///
138/// const MSG: &[u8; 11] = b"Hello world";
139///
140/// loop {
141/// poll.poll(&mut events, None)?;
142///
143/// for event in events.iter() {
144/// match event.token() {
145/// PIPE_SEND => sender.write(MSG)
146/// .and_then(|n| if n != MSG.len() {
147/// // We'll consider a short write an error in this
148/// // example. NOTE: we can't use `write_all` with
149/// // non-blocking I/O.
150/// Err(io::ErrorKind::WriteZero.into())
151/// } else {
152/// Ok(())
153/// })?,
154/// PIPE_RECV => {
155/// let mut buf = [0; 11];
156/// let n = receiver.read(&mut buf)?;
157/// println!("received: {:?}", &buf[0..n]);
158/// assert_eq!(n, MSG.len());
159/// assert_eq!(&buf, &*MSG);
160/// return Ok(());
161/// },
162/// _ => unreachable!(),
163/// }
164/// }
165/// }
166/// # }
167/// ```
168///
169/// Example that receives an event once the `Sender` is dropped.
170///
171/// ```
172/// # use std::io;
173/// #
174/// # use mio::{Poll, Events, Interest, Token};
175/// # use mio::unix::pipe;
176/// #
177/// # const PIPE_RECV: Token = Token(0);
178/// # const PIPE_SEND: Token = Token(1);
179/// #
180/// # fn main() -> io::Result<()> {
181/// // Same setup as in the example above.
182/// let mut poll = Poll::new()?;
183/// let mut events = Events::with_capacity(8);
184///
185/// let (mut sender, mut receiver) = pipe::new()?;
186///
187/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
188/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
189///
190/// // Drop the sender.
191/// drop(sender);
192///
193/// poll.poll(&mut events, None)?;
194///
195/// for event in events.iter() {
196/// match event.token() {
197/// PIPE_RECV if event.is_read_closed() => {
198/// // Detected that the sender was dropped.
199/// println!("Sender dropped!");
200/// return Ok(());
201/// },
202/// _ => unreachable!(),
203/// }
204/// }
205/// # unreachable!();
206/// # }
207/// ```
208pub fn new() -> io::Result<(Sender, Receiver)> {
209 let fds = new_raw()?;
210 // SAFETY: `new_raw` initialised the `fds` above.
211 let r = unsafe { Receiver::from_raw_fd(fds[0]) };
212 let w = unsafe { Sender::from_raw_fd(fds[1]) };
213 Ok((w, r))
214}
215
/// Sending end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Sender {
    // The pipe's descriptor, held as a `File` wrapped in an `IoSource`; all
    // I/O and registration calls on `Sender` go through this field.
    inner: IoSource<File>,
}
223
224impl Sender {
225 /// Set the `Sender` into or out of non-blocking mode.
226 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
227 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
228 }
229
230 /// Execute an I/O operation ensuring that the socket receives more events
231 /// if it hits a [`WouldBlock`] error.
232 ///
233 /// # Notes
234 ///
235 /// This method is required to be called for **all** I/O operations to
236 /// ensure the user will receive events once the socket is ready again after
237 /// returning a [`WouldBlock`] error.
238 ///
239 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
240 ///
241 /// # Examples
242 ///
243 /// ```
244 /// # use std::error::Error;
245 /// #
246 /// # fn main() -> Result<(), Box<dyn Error>> {
247 /// use std::io;
248 /// use std::os::fd::AsRawFd;
249 /// use mio::unix::pipe;
250 ///
251 /// let (sender, receiver) = pipe::new()?;
252 ///
253 /// // Wait until the sender is writable...
254 ///
255 /// // Write to the sender using a direct libc call, of course the
256 /// // `io::Write` implementation would be easier to use.
257 /// let buf = b"hello";
258 /// let n = sender.try_io(|| {
259 /// let buf_ptr = &buf as *const _ as *const _;
260 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
261 /// if res != -1 {
262 /// Ok(res as usize)
263 /// } else {
264 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
265 /// // should return `WouldBlock` error.
266 /// Err(io::Error::last_os_error())
267 /// }
268 /// })?;
269 /// eprintln!("write {} bytes", n);
270 ///
271 /// // Wait until the receiver is readable...
272 ///
273 /// // Read from the receiver using a direct libc call, of course the
274 /// // `io::Read` implementation would be easier to use.
275 /// let mut buf = [0; 512];
276 /// let n = receiver.try_io(|| {
277 /// let buf_ptr = &mut buf as *mut _ as *mut _;
278 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
279 /// if res != -1 {
280 /// Ok(res as usize)
281 /// } else {
282 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
283 /// // should return `WouldBlock` error.
284 /// Err(io::Error::last_os_error())
285 /// }
286 /// })?;
287 /// eprintln!("read {} bytes", n);
288 /// # Ok(())
289 /// # }
290 /// ```
291 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
292 where
293 F: FnOnce() -> io::Result<T>,
294 {
295 self.inner.do_io(|_| f())
296 }
297}
298
// Makes `Sender` usable with a `Registry`. Every method forwards its
// arguments unchanged to the wrapped `IoSource`.
impl event::Source for Sender {
    /// Registers this `Sender` with `registry` under `token` for the given
    /// `interests`, by delegating to the inner `IoSource`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    /// Re-registers this `Sender` with `registry` using the supplied `token`
    /// and `interests`, by delegating to the inner `IoSource`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    /// Deregisters this `Sender` from `registry`, by delegating to the inner
    /// `IoSource`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
322
323impl Write for Sender {
324 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
325 self.inner.do_io(|mut sender| sender.write(buf))
326 }
327
328 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
329 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
330 }
331
332 fn flush(&mut self) -> io::Result<()> {
333 self.inner.do_io(|mut sender| sender.flush())
334 }
335}
336
337impl Write for &Sender {
338 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
339 self.inner.do_io(|mut sender| sender.write(buf))
340 }
341
342 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
343 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
344 }
345
346 fn flush(&mut self) -> io::Result<()> {
347 self.inner.do_io(|mut sender| sender.flush())
348 }
349}
350
351/// # Notes
352///
353/// The underlying pipe is **not** set to non-blocking.
354impl From<ChildStdin> for Sender {
355 fn from(stdin: ChildStdin) -> Sender {
356 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
357 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
358 }
359}
360
impl FromRawFd for Sender {
    /// Builds a `Sender` around the raw descriptor `fd`.
    ///
    /// The descriptor is wrapped in a `File` (which takes ownership of it)
    /// and then in an `IoSource`. This function itself makes no call to
    /// change the descriptor's blocking mode.
    ///
    /// # Safety
    ///
    /// Same requirements as `File::from_raw_fd`: `fd` must be an open file
    /// descriptor that the caller owns and that nothing else will close.
    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
        Sender {
            inner: IoSource::new(File::from_raw_fd(fd)),
        }
    }
}
368
impl AsRawFd for Sender {
    /// Returns the raw file descriptor reported by the inner `IoSource`.
    /// The `Sender` is only borrowed, so it keeps the descriptor.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
374
impl IntoRawFd for Sender {
    /// Consumes the `Sender` and returns its raw file descriptor.
    ///
    /// The `File` is taken out of the `IoSource` with `into_inner` and then
    /// converted with `File::into_raw_fd`, which releases ownership of the
    /// descriptor to the caller.
    fn into_raw_fd(self) -> RawFd {
        self.inner.into_inner().into_raw_fd()
    }
}
380
impl From<Sender> for OwnedFd {
    /// Consumes the `Sender` and converts the `File` inside its `IoSource`
    /// into an `OwnedFd`.
    fn from(sender: Sender) -> Self {
        sender.inner.into_inner().into()
    }
}
386
impl AsFd for Sender {
    /// Borrows the file descriptor from the inner `IoSource`; the returned
    /// `BorrowedFd` is tied to the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.inner.as_fd()
    }
}
392
impl From<OwnedFd> for Sender {
    /// Builds a `Sender` from an owned descriptor by converting it into a
    /// `File` and wrapping that in an `IoSource`.
    ///
    /// This function itself makes no call to change the descriptor's
    /// blocking mode.
    fn from(fd: OwnedFd) -> Self {
        Sender {
            inner: IoSource::new(File::from(fd)),
        }
    }
}
400
/// Receiving end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Receiver {
    // The pipe's descriptor, held as a `File` wrapped in an `IoSource`; all
    // I/O and registration calls on `Receiver` go through this field.
    inner: IoSource<File>,
}
408
409impl Receiver {
410 /// Set the `Receiver` into or out of non-blocking mode.
411 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
412 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
413 }
414
415 /// Execute an I/O operation ensuring that the socket receives more events
416 /// if it hits a [`WouldBlock`] error.
417 ///
418 /// # Notes
419 ///
420 /// This method is required to be called for **all** I/O operations to
421 /// ensure the user will receive events once the socket is ready again after
422 /// returning a [`WouldBlock`] error.
423 ///
424 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
425 ///
426 /// # Examples
427 ///
428 /// ```
429 /// # use std::error::Error;
430 /// #
431 /// # fn main() -> Result<(), Box<dyn Error>> {
432 /// use std::io;
433 /// use std::os::fd::AsRawFd;
434 /// use mio::unix::pipe;
435 ///
436 /// let (sender, receiver) = pipe::new()?;
437 ///
438 /// // Wait until the sender is writable...
439 ///
440 /// // Write to the sender using a direct libc call, of course the
441 /// // `io::Write` implementation would be easier to use.
442 /// let buf = b"hello";
443 /// let n = sender.try_io(|| {
444 /// let buf_ptr = &buf as *const _ as *const _;
445 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
446 /// if res != -1 {
447 /// Ok(res as usize)
448 /// } else {
449 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
450 /// // should return `WouldBlock` error.
451 /// Err(io::Error::last_os_error())
452 /// }
453 /// })?;
454 /// eprintln!("write {} bytes", n);
455 ///
456 /// // Wait until the receiver is readable...
457 ///
458 /// // Read from the receiver using a direct libc call, of course the
459 /// // `io::Read` implementation would be easier to use.
460 /// let mut buf = [0; 512];
461 /// let n = receiver.try_io(|| {
462 /// let buf_ptr = &mut buf as *mut _ as *mut _;
463 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
464 /// if res != -1 {
465 /// Ok(res as usize)
466 /// } else {
467 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
468 /// // should return `WouldBlock` error.
469 /// Err(io::Error::last_os_error())
470 /// }
471 /// })?;
472 /// eprintln!("read {} bytes", n);
473 /// # Ok(())
474 /// # }
475 /// ```
476 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
477 where
478 F: FnOnce() -> io::Result<T>,
479 {
480 self.inner.do_io(|_| f())
481 }
482}
483
// Makes `Receiver` usable with a `Registry`. Every method forwards its
// arguments unchanged to the wrapped `IoSource`.
impl event::Source for Receiver {
    /// Registers this `Receiver` with `registry` under `token` for the given
    /// `interests`, by delegating to the inner `IoSource`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    /// Re-registers this `Receiver` with `registry` using the supplied
    /// `token` and `interests`, by delegating to the inner `IoSource`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    /// Deregisters this `Receiver` from `registry`, by delegating to the
    /// inner `IoSource`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
507
508impl Read for Receiver {
509 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
510 self.inner.do_io(|mut sender| sender.read(buf))
511 }
512
513 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
514 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
515 }
516}
517
518impl Read for &Receiver {
519 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
520 self.inner.do_io(|mut sender| sender.read(buf))
521 }
522
523 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
524 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
525 }
526}
527
528/// # Notes
529///
530/// The underlying pipe is **not** set to non-blocking.
531impl From<ChildStdout> for Receiver {
532 fn from(stdout: ChildStdout) -> Receiver {
533 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
534 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
535 }
536}
537
538/// # Notes
539///
540/// The underlying pipe is **not** set to non-blocking.
541impl From<ChildStderr> for Receiver {
542 fn from(stderr: ChildStderr) -> Receiver {
543 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
544 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
545 }
546}
547
impl IntoRawFd for Receiver {
    /// Consumes the `Receiver` and returns its raw file descriptor.
    ///
    /// The `File` is taken out of the `IoSource` with `into_inner` and then
    /// converted with `File::into_raw_fd`, which releases ownership of the
    /// descriptor to the caller.
    fn into_raw_fd(self) -> RawFd {
        self.inner.into_inner().into_raw_fd()
    }
}
553
impl AsRawFd for Receiver {
    /// Returns the raw file descriptor reported by the inner `IoSource`.
    /// The `Receiver` is only borrowed, so it keeps the descriptor.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
559
impl FromRawFd for Receiver {
    /// Builds a `Receiver` around the raw descriptor `fd`.
    ///
    /// The descriptor is wrapped in a `File` (which takes ownership of it)
    /// and then in an `IoSource`. This function itself makes no call to
    /// change the descriptor's blocking mode.
    ///
    /// # Safety
    ///
    /// Same requirements as `File::from_raw_fd`: `fd` must be an open file
    /// descriptor that the caller owns and that nothing else will close.
    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
        Receiver {
            inner: IoSource::new(File::from_raw_fd(fd)),
        }
    }
}
567
impl From<Receiver> for OwnedFd {
    /// Consumes the `Receiver` and converts the `File` inside its `IoSource`
    /// into an `OwnedFd`.
    fn from(receiver: Receiver) -> Self {
        receiver.inner.into_inner().into()
    }
}
573
impl AsFd for Receiver {
    /// Borrows the file descriptor from the inner `IoSource`; the returned
    /// `BorrowedFd` is tied to the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.inner.as_fd()
    }
}
579
impl From<OwnedFd> for Receiver {
    /// Builds a `Receiver` from an owned descriptor by converting it into a
    /// `File` and wrapping that in an `IoSource`.
    ///
    /// This function itself makes no call to change the descriptor's
    /// blocking mode.
    fn from(fd: OwnedFd) -> Self {
        Receiver {
            inner: IoSource::new(File::from(fd)),
        }
    }
}
587
588#[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
589fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
590 let value = nonblocking as libc::c_int;
591 if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
592 Err(io::Error::last_os_error())
593 } else {
594 Ok(())
595 }
596}
597
/// Puts `fd` into (`true`) or out of (`false`) non-blocking mode by reading
/// its status flags with `F_GETFL` and, if needed, writing them back with
/// `F_SETFL`.
///
/// Compiled only on illumos, Solaris and Vita.
///
/// # Errors
///
/// Returns the OS error if either `fcntl` call returns a negative value.
#[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }

    // Only the `O_NONBLOCK` bit is toggled; every other flag is preserved.
    let wanted = if nonblocking {
        current | libc::O_NONBLOCK
    } else {
        current & !libc::O_NONBLOCK
    };

    // Skip the second syscall when the descriptor is already in the
    // requested mode.
    if wanted != current && unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) } < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
619} // `cfg_os_ext!`.