mio/sys/unix/
pipe.rs

1//! Unix pipe.
2//!
3//! See the [`new`] function for documentation.
4
5use std::io;
6use std::os::fd::RawFd;
7
8pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
9    let mut fds: [RawFd; 2] = [-1, -1];
10
11    #[cfg(any(
12        target_os = "android",
13        target_os = "dragonfly",
14        target_os = "freebsd",
15        target_os = "fuchsia",
16        target_os = "hurd",
17        target_os = "linux",
18        target_os = "netbsd",
19        target_os = "openbsd",
20        target_os = "illumos",
21        target_os = "redox",
22        target_os = "solaris",
23        target_os = "vita",
24    ))]
25    unsafe {
26        if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
27            return Err(io::Error::last_os_error());
28        }
29    }
30
31    #[cfg(any(
32        target_os = "aix",
33        target_os = "haiku",
34        target_os = "ios",
35        target_os = "macos",
36        target_os = "tvos",
37        target_os = "visionos",
38        target_os = "watchos",
39        target_os = "espidf",
40        target_os = "nto",
41    ))]
42    unsafe {
43        // For platforms that don't have `pipe2(2)` we need to manually set the
44        // correct flags on the file descriptor.
45        if libc::pipe(fds.as_mut_ptr()) != 0 {
46            return Err(io::Error::last_os_error());
47        }
48
49        for fd in &fds {
50            if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
51                || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
52            {
53                let err = io::Error::last_os_error();
54                // Don't leak file descriptors. Can't handle closing error though.
55                let _ = libc::close(fds[0]);
56                let _ = libc::close(fds[1]);
57                return Err(err);
58            }
59        }
60    }
61
62    Ok(fds)
63}
64
65cfg_os_ext! {
66use std::fs::File;
67use std::io::{IoSlice, IoSliceMut, Read, Write};
68use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd};
69use std::process::{ChildStderr, ChildStdin, ChildStdout};
70
71use crate::io_source::IoSource;
72use crate::{event, Interest, Registry, Token};
73
74/// Create a new non-blocking Unix pipe.
75///
/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
/// an inter-process or inter-thread communication channel.
///
/// This channel may be created before forking the process, with one end then
/// used in each process, e.g. the parent process keeps the sending end to send
/// commands to the child process.
82///
83/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
84///
85/// # Events
86///
/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
/// written to the `Sender` the `Receiver` will receive a [readable event].
90///
91/// In addition to those events, events will also be generated if the other side
92/// is dropped. To check if the `Sender` is dropped you'll need to check
93/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
94/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
95/// returns true the `Receiver` was dropped. Also see the second example below.
96///
97/// [`WRITABLE`]: Interest::WRITABLE
98/// [writable events]: event::Event::is_writable
99/// [`READABLE`]: Interest::READABLE
100/// [readable event]: event::Event::is_readable
101/// [`is_read_closed`]: event::Event::is_read_closed
102/// [`is_write_closed`]: event::Event::is_write_closed
103///
104/// # Deregistering
105///
106/// Both `Sender` and `Receiver` will deregister themselves when dropped,
107/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
108///
109/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
110///
111/// # Examples
112///
/// Simple example that writes data into the sending end and reads it from the
/// receiving end.
115///
116/// ```
117/// use std::io::{self, Read, Write};
118///
119/// use mio::{Poll, Events, Interest, Token};
120/// use mio::unix::pipe;
121///
122/// // Unique tokens for the two ends of the channel.
123/// const PIPE_RECV: Token = Token(0);
124/// const PIPE_SEND: Token = Token(1);
125///
126/// # fn main() -> io::Result<()> {
127/// // Create our `Poll` instance and the `Events` container.
128/// let mut poll = Poll::new()?;
129/// let mut events = Events::with_capacity(8);
130///
131/// // Create a new pipe.
132/// let (mut sender, mut receiver) = pipe::new()?;
133///
134/// // Register both ends of the channel.
135/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
136/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
137///
138/// const MSG: &[u8; 11] = b"Hello world";
139///
140/// loop {
141///     poll.poll(&mut events, None)?;
142///
143///     for event in events.iter() {
144///         match event.token() {
145///             PIPE_SEND => sender.write(MSG)
146///                 .and_then(|n| if n != MSG.len() {
147///                         // We'll consider a short write an error in this
148///                         // example. NOTE: we can't use `write_all` with
149///                         // non-blocking I/O.
150///                         Err(io::ErrorKind::WriteZero.into())
151///                     } else {
152///                         Ok(())
153///                     })?,
154///             PIPE_RECV => {
155///                 let mut buf = [0; 11];
156///                 let n = receiver.read(&mut buf)?;
157///                 println!("received: {:?}", &buf[0..n]);
158///                 assert_eq!(n, MSG.len());
159///                 assert_eq!(&buf, &*MSG);
160///                 return Ok(());
161///             },
162///             _ => unreachable!(),
163///         }
164///     }
165/// }
166/// # }
167/// ```
168///
169/// Example that receives an event once the `Sender` is dropped.
170///
171/// ```
172/// # use std::io;
173/// #
174/// # use mio::{Poll, Events, Interest, Token};
175/// # use mio::unix::pipe;
176/// #
177/// # const PIPE_RECV: Token = Token(0);
178/// # const PIPE_SEND: Token = Token(1);
179/// #
180/// # fn main() -> io::Result<()> {
181/// // Same setup as in the example above.
182/// let mut poll = Poll::new()?;
183/// let mut events = Events::with_capacity(8);
184///
185/// let (mut sender, mut receiver) = pipe::new()?;
186///
187/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
188/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
189///
190/// // Drop the sender.
191/// drop(sender);
192///
193/// poll.poll(&mut events, None)?;
194///
195/// for event in events.iter() {
196///     match event.token() {
197///         PIPE_RECV if event.is_read_closed() => {
198///             // Detected that the sender was dropped.
199///             println!("Sender dropped!");
200///             return Ok(());
201///         },
202///         _ => unreachable!(),
203///     }
204/// }
205/// # unreachable!();
206/// # }
207/// ```
208pub fn new() -> io::Result<(Sender, Receiver)> {
209    let fds = new_raw()?;
210    // SAFETY: `new_raw` initialised the `fds` above.
211    let r = unsafe { Receiver::from_raw_fd(fds[0]) };
212    let w = unsafe { Sender::from_raw_fd(fds[1]) };
213    Ok((w, r))
214}
215
/// Sending end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Sender {
    // File handle for this end of the pipe, wrapped in `IoSource`; the
    // registration and I/O methods of `Sender` forward to it.
    inner: IoSource<File>,
}
223
224impl Sender {
225    /// Set the `Sender` into or out of non-blocking mode.
226    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
227        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
228    }
229
    /// Execute an I/O operation ensuring that the pipe receives more events
    /// if it hits a [`WouldBlock`] error.
    ///
    /// # Notes
    ///
    /// This method is required to be called for **all** I/O operations to
    /// ensure the user will receive events once the pipe is ready again after
    /// returning a [`WouldBlock`] error.
238    ///
239    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// # use std::error::Error;
245    /// #
246    /// # fn main() -> Result<(), Box<dyn Error>> {
247    /// use std::io;
248    /// use std::os::fd::AsRawFd;
249    /// use mio::unix::pipe;
250    ///
251    /// let (sender, receiver) = pipe::new()?;
252    ///
253    /// // Wait until the sender is writable...
254    ///
255    /// // Write to the sender using a direct libc call, of course the
256    /// // `io::Write` implementation would be easier to use.
257    /// let buf = b"hello";
258    /// let n = sender.try_io(|| {
259    ///     let buf_ptr = &buf as *const _ as *const _;
260    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
261    ///     if res != -1 {
262    ///         Ok(res as usize)
263    ///     } else {
264    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
265    ///         // should return `WouldBlock` error.
266    ///         Err(io::Error::last_os_error())
267    ///     }
268    /// })?;
269    /// eprintln!("write {} bytes", n);
270    ///
271    /// // Wait until the receiver is readable...
272    ///
273    /// // Read from the receiver using a direct libc call, of course the
274    /// // `io::Read` implementation would be easier to use.
275    /// let mut buf = [0; 512];
276    /// let n = receiver.try_io(|| {
277    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
278    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
279    ///     if res != -1 {
280    ///         Ok(res as usize)
281    ///     } else {
282    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
283    ///         // should return `WouldBlock` error.
284    ///         Err(io::Error::last_os_error())
285    ///     }
286    /// })?;
287    /// eprintln!("read {} bytes", n);
288    /// # Ok(())
289    /// # }
290    /// ```
291    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
292    where
293        F: FnOnce() -> io::Result<T>,
294    {
295        self.inner.do_io(|_| f())
296    }
297}
298
// Registration is forwarded unchanged to the inner `IoSource`.
impl event::Source for Sender {
    // Registers the sending end with `registry` under `token`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    // Replaces the token and/or interests of an existing registration.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    // Removes the sending end from `registry`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
322
323impl Write for Sender {
324    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
325        self.inner.do_io(|mut sender| sender.write(buf))
326    }
327
328    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
329        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
330    }
331
332    fn flush(&mut self) -> io::Result<()> {
333        self.inner.do_io(|mut sender| sender.flush())
334    }
335}
336
337impl Write for &Sender {
338    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
339        self.inner.do_io(|mut sender| sender.write(buf))
340    }
341
342    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
343        self.inner.do_io(|mut sender| sender.write_vectored(bufs))
344    }
345
346    fn flush(&mut self) -> io::Result<()> {
347        self.inner.do_io(|mut sender| sender.flush())
348    }
349}
350
351/// # Notes
352///
353/// The underlying pipe is **not** set to non-blocking.
354impl From<ChildStdin> for Sender {
355    fn from(stdin: ChildStdin) -> Sender {
356        // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
357        unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
358    }
359}
360
361impl FromRawFd for Sender {
362    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
363        Sender {
364            inner: IoSource::new(File::from_raw_fd(fd)),
365        }
366    }
367}
368
impl AsRawFd for Sender {
    // Returns the descriptor without giving up ownership of it.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
374
375impl IntoRawFd for Sender {
376    fn into_raw_fd(self) -> RawFd {
377        self.inner.into_inner().into_raw_fd()
378    }
379}
380
381impl From<Sender> for OwnedFd {
382    fn from(sender: Sender) -> Self {
383        sender.inner.into_inner().into()
384    }
385}
386
impl AsFd for Sender {
    // Borrows the descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.inner.as_fd()
    }
}
392
393impl From<OwnedFd> for Sender {
394    fn from(fd: OwnedFd) -> Self {
395        Sender {
396            inner: IoSource::new(File::from(fd)),
397        }
398    }
399}
400
/// Receiving end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Receiver {
    // File handle for this end of the pipe, wrapped in `IoSource`; the
    // registration and I/O methods of `Receiver` forward to it.
    inner: IoSource<File>,
}
408
409impl Receiver {
410    /// Set the `Receiver` into or out of non-blocking mode.
411    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
412        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
413    }
414
    /// Execute an I/O operation ensuring that the pipe receives more events
    /// if it hits a [`WouldBlock`] error.
    ///
    /// # Notes
    ///
    /// This method is required to be called for **all** I/O operations to
    /// ensure the user will receive events once the pipe is ready again after
    /// returning a [`WouldBlock`] error.
423    ///
424    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
425    ///
426    /// # Examples
427    ///
428    /// ```
429    /// # use std::error::Error;
430    /// #
431    /// # fn main() -> Result<(), Box<dyn Error>> {
432    /// use std::io;
433    /// use std::os::fd::AsRawFd;
434    /// use mio::unix::pipe;
435    ///
436    /// let (sender, receiver) = pipe::new()?;
437    ///
438    /// // Wait until the sender is writable...
439    ///
440    /// // Write to the sender using a direct libc call, of course the
441    /// // `io::Write` implementation would be easier to use.
442    /// let buf = b"hello";
443    /// let n = sender.try_io(|| {
444    ///     let buf_ptr = &buf as *const _ as *const _;
445    ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
446    ///     if res != -1 {
447    ///         Ok(res as usize)
448    ///     } else {
449    ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
450    ///         // should return `WouldBlock` error.
451    ///         Err(io::Error::last_os_error())
452    ///     }
453    /// })?;
454    /// eprintln!("write {} bytes", n);
455    ///
456    /// // Wait until the receiver is readable...
457    ///
458    /// // Read from the receiver using a direct libc call, of course the
459    /// // `io::Read` implementation would be easier to use.
460    /// let mut buf = [0; 512];
461    /// let n = receiver.try_io(|| {
462    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
463    ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
464    ///     if res != -1 {
465    ///         Ok(res as usize)
466    ///     } else {
467    ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
468    ///         // should return `WouldBlock` error.
469    ///         Err(io::Error::last_os_error())
470    ///     }
471    /// })?;
472    /// eprintln!("read {} bytes", n);
473    /// # Ok(())
474    /// # }
475    /// ```
476    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
477    where
478        F: FnOnce() -> io::Result<T>,
479    {
480        self.inner.do_io(|_| f())
481    }
482}
483
// Registration is forwarded unchanged to the inner `IoSource`.
impl event::Source for Receiver {
    // Registers the receiving end with `registry` under `token`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.register(registry, token, interests)
    }

    // Replaces the token and/or interests of an existing registration.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(registry, token, interests)
    }

    // Removes the receiving end from `registry`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        self.inner.deregister(registry)
    }
}
507
508impl Read for Receiver {
509    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
510        self.inner.do_io(|mut sender| sender.read(buf))
511    }
512
513    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
514        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
515    }
516}
517
518impl Read for &Receiver {
519    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
520        self.inner.do_io(|mut sender| sender.read(buf))
521    }
522
523    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
524        self.inner.do_io(|mut sender| sender.read_vectored(bufs))
525    }
526}
527
528/// # Notes
529///
530/// The underlying pipe is **not** set to non-blocking.
531impl From<ChildStdout> for Receiver {
532    fn from(stdout: ChildStdout) -> Receiver {
533        // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
534        unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
535    }
536}
537
538/// # Notes
539///
540/// The underlying pipe is **not** set to non-blocking.
541impl From<ChildStderr> for Receiver {
542    fn from(stderr: ChildStderr) -> Receiver {
543        // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
544        unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
545    }
546}
547
548impl IntoRawFd for Receiver {
549    fn into_raw_fd(self) -> RawFd {
550        self.inner.into_inner().into_raw_fd()
551    }
552}
553
impl AsRawFd for Receiver {
    // Returns the descriptor without giving up ownership of it.
    fn as_raw_fd(&self) -> RawFd {
        self.inner.as_raw_fd()
    }
}
559
560impl FromRawFd for Receiver {
561    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
562        Receiver {
563            inner: IoSource::new(File::from_raw_fd(fd)),
564        }
565    }
566}
567
568impl From<Receiver> for OwnedFd {
569    fn from(receiver: Receiver) -> Self {
570        receiver.inner.into_inner().into()
571    }
572}
573
impl AsFd for Receiver {
    // Borrows the descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.inner.as_fd()
    }
}
579
580impl From<OwnedFd> for Receiver {
581    fn from(fd: OwnedFd) -> Self {
582        Receiver {
583            inner: IoSource::new(File::from(fd)),
584        }
585    }
586}
587
588#[cfg(not(any(target_os = "illumos", target_os = "solaris", target_os = "vita")))]
589fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
590    let value = nonblocking as libc::c_int;
591    if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
592        Err(io::Error::last_os_error())
593    } else {
594        Ok(())
595    }
596}
597
// Toggles non-blocking mode by rewriting the file status flags with `fcntl(2)`.
#[cfg(any(target_os = "illumos", target_os = "solaris", target_os = "vita"))]
fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }

    // Only the `O_NONBLOCK` bit changes; all other status flags are kept.
    let wanted = if nonblocking {
        current | libc::O_NONBLOCK
    } else {
        current & !libc::O_NONBLOCK
    };

    // Skip the second system call when the descriptor is already in the
    // requested mode.
    if wanted != current && unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) } < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
619} // `cfg_os_ext!`.