tokio/net/unix/
stream.rs

1use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::split::{split, ReadHalf, WriteHalf};
3use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
4use crate::net::unix::ucred::{self, UCred};
5use crate::net::unix::SocketAddr;
6use crate::util::check_socket_for_blocking;
7
8use std::fmt;
9use std::future::poll_fn;
10use std::io::{self, Read, Write};
11use std::net::Shutdown;
12#[cfg(target_os = "android")]
13use std::os::android::net::SocketAddrExt;
14#[cfg(target_os = "linux")]
15use std::os::linux::net::SocketAddrExt;
16#[cfg(any(target_os = "linux", target_os = "android"))]
17use std::os::unix::ffi::OsStrExt;
18use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
19use std::os::unix::net::{self, SocketAddr as StdSocketAddr};
20use std::path::Path;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23
24cfg_io_util! {
25    use bytes::BufMut;
26}
27
28cfg_net_unix! {
29    /// A structure representing a connected Unix socket.
30    ///
31    /// This socket can be connected directly with [`UnixStream::connect`] or accepted
32    /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
33    /// anonymous Unix sockets can be created with `UnixStream::pair`.
34    ///
35    /// To shut down the stream in the write direction, you can call the
36    /// [`shutdown()`] method. This will cause the other peer to receive a read of
37    /// length 0, indicating that no more data will be sent. This only closes
38    /// the stream in one direction.
39    ///
40    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
41    /// [`UnixListener::accept`]: crate::net::UnixListener::accept
42    #[cfg_attr(docsrs, doc(alias = "uds"))]
43    pub struct UnixStream {
        // The mio Unix stream wrapped in `PollEvented`. Methods in this file
        // obtain readiness state through `io.registration()` and perform the
        // actual reads and writes on the dereferenced stream (`&*self.io`).
44        io: PollEvented<mio::net::UnixStream>,
45    }
46}
47
48impl UnixStream {
49    pub(crate) async fn connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream> {
50        let stream = UnixStream::new(sys)?;
51
52        // Once we've connected, wait for the stream to be writable as
53        // that's when the actual connection has been initiated. Once we're
54        // writable we check for `take_socket_error` to see if the connect
55        // actually hit an error or not.
56        //
57        // If all that succeeded then we ship everything on up.
58        poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
59
60        if let Some(e) = stream.io.take_error()? {
61            return Err(e);
62        }
63
64        Ok(stream)
65    }
66
67    /// Connects to the socket named by `path`.
68    ///
69    /// This function will create a new Unix socket and connect to the path
70    /// specified, associating the returned stream with the default event loop's
71    /// handle.
72    pub async fn connect<P>(path: P) -> io::Result<UnixStream>
73    where
74        P: AsRef<Path>,
75    {
76        // On linux, abstract socket paths need to be considered.
77        #[cfg(any(target_os = "linux", target_os = "android"))]
78        let addr = {
79            let os_str_bytes = path.as_ref().as_os_str().as_bytes();
80            if os_str_bytes.starts_with(b"\0") {
81                StdSocketAddr::from_abstract_name(&os_str_bytes[1..])?
82            } else {
83                StdSocketAddr::from_pathname(path)?
84            }
85        };
86        #[cfg(not(any(target_os = "linux", target_os = "android")))]
87        let addr = StdSocketAddr::from_pathname(path)?;
88
89        let stream = mio::net::UnixStream::connect_addr(&addr)?;
90        let stream = UnixStream::new(stream)?;
91
92        poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
93
94        if let Some(e) = stream.io.take_error()? {
95            return Err(e);
96        }
97
98        Ok(stream)
99    }
100
101    /// Waits for any of the requested ready states.
102    ///
103    /// This function is usually paired with `try_read()` or `try_write()`. It
104    /// can be used to concurrently read / write to the same socket on a single
105    /// task without splitting the socket.
106    ///
107    /// The function may complete without the socket being ready. This is a
108    /// false-positive and attempting an operation will return with
109    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
110    /// [`Ready`] set, so you should always check the returned value and possibly
111    /// wait again if the requested states are not set.
112    ///
113    /// # Cancel safety
114    ///
115    /// This method is cancel safe. Once a readiness event occurs, the method
116    /// will continue to return immediately until the readiness event is
117    /// consumed by an attempt to read or write that fails with `WouldBlock` or
118    /// `Poll::Pending`.
119    ///
120    /// # Examples
121    ///
122    /// Concurrently read and write to the stream on the same task without
123    /// splitting.
124    ///
125    /// ```no_run
126    /// use tokio::io::Interest;
127    /// use tokio::net::UnixStream;
128    /// use std::error::Error;
129    /// use std::io;
130    ///
131    /// #[tokio::main]
132    /// async fn main() -> Result<(), Box<dyn Error>> {
133    ///     let dir = tempfile::tempdir().unwrap();
134    ///     let bind_path = dir.path().join("bind_path");
135    ///     let stream = UnixStream::connect(bind_path).await?;
136    ///
137    ///     loop {
138    ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
139    ///
140    ///         if ready.is_readable() {
141    ///             let mut data = vec![0; 1024];
142    ///             // Try to read data, this may still fail with `WouldBlock`
143    ///             // if the readiness event is a false positive.
144    ///             match stream.try_read(&mut data) {
145    ///                 Ok(n) => {
146    ///                     println!("read {} bytes", n);
147    ///                 }
148    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
149    ///                     continue;
150    ///                 }
151    ///                 Err(e) => {
152    ///                     return Err(e.into());
153    ///                 }
154    ///             }
155    ///
156    ///         }
157    ///
158    ///         if ready.is_writable() {
159    ///             // Try to write data, this may still fail with `WouldBlock`
160    ///             // if the readiness event is a false positive.
161    ///             match stream.try_write(b"hello world") {
162    ///                 Ok(n) => {
163    ///                     println!("write {} bytes", n);
164    ///                 }
165    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
166    ///                     continue;
167    ///                 }
168    ///                 Err(e) => {
169    ///                     return Err(e.into());
170    ///                 }
171    ///             }
172    ///         }
173    ///     }
174    /// }
175    /// ```
176    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
177        let event = self.io.registration().readiness(interest).await?;
178        Ok(event.ready)
179    }
180
181    /// Waits for the socket to become readable.
182    ///
183    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
184    /// paired with `try_read()`.
185    ///
186    /// # Cancel safety
187    ///
188    /// This method is cancel safe. Once a readiness event occurs, the method
189    /// will continue to return immediately until the readiness event is
190    /// consumed by an attempt to read that fails with `WouldBlock` or
191    /// `Poll::Pending`.
192    ///
193    /// # Examples
194    ///
195    /// ```no_run
196    /// use tokio::net::UnixStream;
197    /// use std::error::Error;
198    /// use std::io;
199    ///
200    /// #[tokio::main]
201    /// async fn main() -> Result<(), Box<dyn Error>> {
202    ///     // Connect to a peer
203    ///     let dir = tempfile::tempdir().unwrap();
204    ///     let bind_path = dir.path().join("bind_path");
205    ///     let stream = UnixStream::connect(bind_path).await?;
206    ///
207    ///     let mut msg = vec![0; 1024];
208    ///
209    ///     loop {
210    ///         // Wait for the socket to be readable
211    ///         stream.readable().await?;
212    ///
213    ///         // Try to read data, this may still fail with `WouldBlock`
214    ///         // if the readiness event is a false positive.
215    ///         match stream.try_read(&mut msg) {
216    ///             Ok(n) => {
217    ///                 msg.truncate(n);
218    ///                 break;
219    ///             }
220    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
221    ///                 continue;
222    ///             }
223    ///             Err(e) => {
224    ///                 return Err(e.into());
225    ///             }
226    ///         }
227    ///     }
228    ///
229    ///     println!("GOT = {:?}", msg);
230    ///     Ok(())
231    /// }
232    /// ```
233    pub async fn readable(&self) -> io::Result<()> {
234        self.ready(Interest::READABLE).await?;
235        Ok(())
236    }
237
238    /// Polls for read readiness.
239    ///
240    /// If the unix stream is not currently ready for reading, this method will
241    /// store a clone of the `Waker` from the provided `Context`. When the unix
242    /// stream becomes ready for reading, `Waker::wake` will be called on the
243    /// waker.
244    ///
245    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
246    /// the `Waker` from the `Context` passed to the most recent call is
247    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
248    /// second, independent waker.)
249    ///
250    /// This function is intended for cases where creating and pinning a future
251    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
252    /// preferred, as this supports polling from multiple tasks at once.
253    ///
254    /// # Return value
255    ///
256    /// The function returns:
257    ///
258    /// * `Poll::Pending` if the unix stream is not ready for reading.
259    /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
260    /// * `Poll::Ready(Err(e))` if an error is encountered.
261    ///
262    /// # Errors
263    ///
264    /// This function may encounter any standard I/O error except `WouldBlock`.
265    ///
266    /// [`readable`]: method@Self::readable
267    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
268        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
269    }
270
271    /// Try to read data from the stream into the provided buffer, returning how
272    /// many bytes were read.
273    ///
274    /// Receives any pending data from the socket but does not wait for new data
275    /// to arrive. On success, returns the number of bytes read. Because
276    /// `try_read()` is non-blocking, the buffer does not have to be stored by
277    /// the async task and can exist entirely on the stack.
278    ///
279    /// Usually, [`readable()`] or [`ready()`] is used with this function.
280    ///
281    /// [`readable()`]: UnixStream::readable()
282    /// [`ready()`]: UnixStream::ready()
283    ///
284    /// # Return
285    ///
286    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
287    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
288    ///
289    /// 1. The stream's read half is closed and will no longer yield data.
290    /// 2. The specified buffer was 0 bytes in length.
291    ///
292    /// If the stream is not ready to read data,
293    /// `Err(io::ErrorKind::WouldBlock)` is returned.
294    ///
295    /// # Examples
296    ///
297    /// ```no_run
298    /// use tokio::net::UnixStream;
299    /// use std::error::Error;
300    /// use std::io;
301    ///
302    /// #[tokio::main]
303    /// async fn main() -> Result<(), Box<dyn Error>> {
304    ///     // Connect to a peer
305    ///     let dir = tempfile::tempdir().unwrap();
306    ///     let bind_path = dir.path().join("bind_path");
307    ///     let stream = UnixStream::connect(bind_path).await?;
308    ///
309    ///     loop {
310    ///         // Wait for the socket to be readable
311    ///         stream.readable().await?;
312    ///
313    ///         // Creating the buffer **after** the `await` prevents it from
314    ///         // being stored in the async task.
315    ///         let mut buf = [0; 4096];
316    ///
317    ///         // Try to read data, this may still fail with `WouldBlock`
318    ///         // if the readiness event is a false positive.
319    ///         match stream.try_read(&mut buf) {
320    ///             Ok(0) => break,
321    ///             Ok(n) => {
322    ///                 println!("read {} bytes", n);
323    ///             }
324    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
325    ///                 continue;
326    ///             }
327    ///             Err(e) => {
328    ///                 return Err(e.into());
329    ///             }
330    ///         }
331    ///     }
332    ///
333    ///     Ok(())
334    /// }
335    /// ```
336    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
337        self.io
338            .registration()
339            .try_io(Interest::READABLE, || (&*self.io).read(buf))
340    }
341
342    /// Tries to read data from the stream into the provided buffers, returning
343    /// how many bytes were read.
344    ///
345    /// Data is copied to fill each buffer in order, with the final buffer
346    /// written to possibly being only partially filled. This method behaves
347    /// equivalently to a single call to [`try_read()`] with concatenated
348    /// buffers.
349    ///
350    /// Receives any pending data from the socket but does not wait for new data
351    /// to arrive. On success, returns the number of bytes read. Because
352    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
353    /// stored by the async task and can exist entirely on the stack.
354    ///
355    /// Usually, [`readable()`] or [`ready()`] is used with this function.
356    ///
357    /// [`try_read()`]: UnixStream::try_read()
358    /// [`readable()`]: UnixStream::readable()
359    /// [`ready()`]: UnixStream::ready()
360    ///
361    /// # Return
362    ///
363    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
364    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
365    /// and will no longer yield data. If the stream is not ready to read data
366    /// `Err(io::ErrorKind::WouldBlock)` is returned.
367    ///
368    /// # Examples
369    ///
370    /// ```no_run
371    /// use tokio::net::UnixStream;
372    /// use std::error::Error;
373    /// use std::io::{self, IoSliceMut};
374    ///
375    /// #[tokio::main]
376    /// async fn main() -> Result<(), Box<dyn Error>> {
377    ///     // Connect to a peer
378    ///     let dir = tempfile::tempdir().unwrap();
379    ///     let bind_path = dir.path().join("bind_path");
380    ///     let stream = UnixStream::connect(bind_path).await?;
381    ///
382    ///     loop {
383    ///         // Wait for the socket to be readable
384    ///         stream.readable().await?;
385    ///
386    ///         // Creating the buffer **after** the `await` prevents it from
387    ///         // being stored in the async task.
388    ///         let mut buf_a = [0; 512];
389    ///         let mut buf_b = [0; 1024];
390    ///         let mut bufs = [
391    ///             IoSliceMut::new(&mut buf_a),
392    ///             IoSliceMut::new(&mut buf_b),
393    ///         ];
394    ///
395    ///         // Try to read data, this may still fail with `WouldBlock`
396    ///         // if the readiness event is a false positive.
397    ///         match stream.try_read_vectored(&mut bufs) {
398    ///             Ok(0) => break,
399    ///             Ok(n) => {
400    ///                 println!("read {} bytes", n);
401    ///             }
402    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
403    ///                 continue;
404    ///             }
405    ///             Err(e) => {
406    ///                 return Err(e.into());
407    ///             }
408    ///         }
409    ///     }
410    ///
411    ///     Ok(())
412    /// }
413    /// ```
414    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
415        self.io
416            .registration()
417            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
418    }
419
420    cfg_io_util! {
421        /// Tries to read data from the stream into the provided buffer, advancing the
422        /// buffer's internal cursor, returning how many bytes were read.
423        ///
424        /// Receives any pending data from the socket but does not wait for new data
425        /// to arrive. On success, returns the number of bytes read. Because
426        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
427        /// the async task and can exist entirely on the stack.
428        ///
429        /// Usually, [`readable()`] or [`ready()`] is used with this function.
430        ///
431        /// [`readable()`]: UnixStream::readable()
432        /// [`ready()`]: UnixStream::ready()
433        ///
434        /// # Return
435        ///
436        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
437        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
438        /// and will no longer yield data. If the stream is not ready to read data
439        /// `Err(io::ErrorKind::WouldBlock)` is returned.
440        ///
441        /// # Examples
442        ///
443        /// ```no_run
444        /// use tokio::net::UnixStream;
445        /// use std::error::Error;
446        /// use std::io;
447        ///
448        /// #[tokio::main]
449        /// async fn main() -> Result<(), Box<dyn Error>> {
450        ///     // Connect to a peer
451        ///     let dir = tempfile::tempdir().unwrap();
452        ///     let bind_path = dir.path().join("bind_path");
453        ///     let stream = UnixStream::connect(bind_path).await?;
454        ///
455        ///     loop {
456        ///         // Wait for the socket to be readable
457        ///         stream.readable().await?;
458        ///
459        ///         let mut buf = Vec::with_capacity(4096);
460        ///
461        ///         // Try to read data, this may still fail with `WouldBlock`
462        ///         // if the readiness event is a false positive.
463        ///         match stream.try_read_buf(&mut buf) {
464        ///             Ok(0) => break,
465        ///             Ok(n) => {
466        ///                 println!("read {} bytes", n);
467        ///             }
468        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
469        ///                 continue;
470        ///             }
471        ///             Err(e) => {
472        ///                 return Err(e.into());
473        ///             }
474        ///         }
475        ///     }
476        ///
477        ///     Ok(())
478        /// }
479        /// ```
480        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
481            self.io.registration().try_io(Interest::READABLE, || {
482                use std::io::Read;
483
484                let dst = buf.chunk_mut();
485                let dst =
486                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
487
488                // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
489                // buffer.
490                let n = (&*self.io).read(dst)?;
491
492                unsafe {
493                    buf.advance_mut(n);
494                }
495
496                Ok(n)
497            })
498        }
499    }
500
501    /// Waits for the socket to become writable.
502    ///
503    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
504    /// paired with `try_write()`.
505    ///
506    /// # Cancel safety
507    ///
508    /// This method is cancel safe. Once a readiness event occurs, the method
509    /// will continue to return immediately until the readiness event is
510    /// consumed by an attempt to write that fails with `WouldBlock` or
511    /// `Poll::Pending`.
512    ///
513    /// # Examples
514    ///
515    /// ```no_run
516    /// use tokio::net::UnixStream;
517    /// use std::error::Error;
518    /// use std::io;
519    ///
520    /// #[tokio::main]
521    /// async fn main() -> Result<(), Box<dyn Error>> {
522    ///     // Connect to a peer
523    ///     let dir = tempfile::tempdir().unwrap();
524    ///     let bind_path = dir.path().join("bind_path");
525    ///     let stream = UnixStream::connect(bind_path).await?;
526    ///
527    ///     loop {
528    ///         // Wait for the socket to be writable
529    ///         stream.writable().await?;
530    ///
531    ///         // Try to write data, this may still fail with `WouldBlock`
532    ///         // if the readiness event is a false positive.
533    ///         match stream.try_write(b"hello world") {
534    ///             Ok(n) => {
535    ///                 break;
536    ///             }
537    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
538    ///                 continue;
539    ///             }
540    ///             Err(e) => {
541    ///                 return Err(e.into());
542    ///             }
543    ///         }
544    ///     }
545    ///
546    ///     Ok(())
547    /// }
548    /// ```
549    pub async fn writable(&self) -> io::Result<()> {
550        self.ready(Interest::WRITABLE).await?;
551        Ok(())
552    }
553
554    /// Polls for write readiness.
555    ///
556    /// If the unix stream is not currently ready for writing, this method will
557    /// store a clone of the `Waker` from the provided `Context`. When the unix
558    /// stream becomes ready for writing, `Waker::wake` will be called on the
559    /// waker.
560    ///
561    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
562    /// the `Waker` from the `Context` passed to the most recent call is
563    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
564    /// second, independent waker.)
565    ///
566    /// This function is intended for cases where creating and pinning a future
567    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
568    /// preferred, as this supports polling from multiple tasks at once.
569    ///
570    /// # Return value
571    ///
572    /// The function returns:
573    ///
574    /// * `Poll::Pending` if the unix stream is not ready for writing.
575    /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
576    /// * `Poll::Ready(Err(e))` if an error is encountered.
577    ///
578    /// # Errors
579    ///
580    /// This function may encounter any standard I/O error except `WouldBlock`.
581    ///
582    /// [`writable`]: method@Self::writable
583    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
584        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
585    }
586
587    /// Tries to write a buffer to the stream, returning how many bytes were
588    /// written.
589    ///
590    /// The function will attempt to write the entire contents of `buf`, but
591    /// only part of the buffer may be written.
592    ///
593    /// This function is usually paired with `writable()`.
594    ///
595    /// # Return
596    ///
597    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
598    /// number of bytes written. If the stream is not ready to write data,
599    /// `Err(io::ErrorKind::WouldBlock)` is returned.
600    ///
601    /// # Examples
602    ///
603    /// ```no_run
604    /// use tokio::net::UnixStream;
605    /// use std::error::Error;
606    /// use std::io;
607    ///
608    /// #[tokio::main]
609    /// async fn main() -> Result<(), Box<dyn Error>> {
610    ///     // Connect to a peer
611    ///     let dir = tempfile::tempdir().unwrap();
612    ///     let bind_path = dir.path().join("bind_path");
613    ///     let stream = UnixStream::connect(bind_path).await?;
614    ///
615    ///     loop {
616    ///         // Wait for the socket to be writable
617    ///         stream.writable().await?;
618    ///
619    ///         // Try to write data, this may still fail with `WouldBlock`
620    ///         // if the readiness event is a false positive.
621    ///         match stream.try_write(b"hello world") {
622    ///             Ok(n) => {
623    ///                 break;
624    ///             }
625    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
626    ///                 continue;
627    ///             }
628    ///             Err(e) => {
629    ///                 return Err(e.into());
630    ///             }
631    ///         }
632    ///     }
633    ///
634    ///     Ok(())
635    /// }
636    /// ```
637    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
638        self.io
639            .registration()
640            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
641    }
642
643    /// Tries to write several buffers to the stream, returning how many bytes
644    /// were written.
645    ///
646    /// Data is written from each buffer in order, with the final buffer read
647    /// from possibly being only partially consumed. This method behaves
648    /// equivalently to a single call to [`try_write()`] with concatenated
649    /// buffers.
650    ///
651    /// This function is usually paired with `writable()`.
652    ///
653    /// [`try_write()`]: UnixStream::try_write()
654    ///
655    /// # Return
656    ///
657    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
658    /// number of bytes written. If the stream is not ready to write data,
659    /// `Err(io::ErrorKind::WouldBlock)` is returned.
660    ///
661    /// # Examples
662    ///
663    /// ```no_run
664    /// use tokio::net::UnixStream;
665    /// use std::error::Error;
666    /// use std::io;
667    ///
668    /// #[tokio::main]
669    /// async fn main() -> Result<(), Box<dyn Error>> {
670    ///     // Connect to a peer
671    ///     let dir = tempfile::tempdir().unwrap();
672    ///     let bind_path = dir.path().join("bind_path");
673    ///     let stream = UnixStream::connect(bind_path).await?;
674    ///
675    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
676    ///
677    ///     loop {
678    ///         // Wait for the socket to be writable
679    ///         stream.writable().await?;
680    ///
681    ///         // Try to write data, this may still fail with `WouldBlock`
682    ///         // if the readiness event is a false positive.
683    ///         match stream.try_write_vectored(&bufs) {
684    ///             Ok(n) => {
685    ///                 break;
686    ///             }
687    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
688    ///                 continue;
689    ///             }
690    ///             Err(e) => {
691    ///                 return Err(e.into());
692    ///             }
693    ///         }
694    ///     }
695    ///
696    ///     Ok(())
697    /// }
698    /// ```
699    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
700        self.io
701            .registration()
702            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
703    }
704
705    /// Tries to read or write from the socket using a user-provided IO operation.
706    ///
707    /// If the socket is ready, the provided closure is called. The closure
708    /// should attempt to perform IO operation on the socket by manually
709    /// calling the appropriate syscall. If the operation fails because the
710    /// socket is not actually ready, then the closure should return a
711    /// `WouldBlock` error and the readiness flag is cleared. The return value
712    /// of the closure is then returned by `try_io`.
713    ///
714    /// If the socket is not ready, then the closure is not called
715    /// and a `WouldBlock` error is returned.
716    ///
717    /// The closure should only return a `WouldBlock` error if it has performed
718    /// an IO operation on the socket that failed due to the socket not being
719    /// ready. Returning a `WouldBlock` error in any other situation will
720    /// incorrectly clear the readiness flag, which can cause the socket to
721    /// behave incorrectly.
722    ///
723    /// The closure should not perform the IO operation using any of the methods
724    /// defined on the Tokio `UnixStream` type, as this will mess with the
725    /// readiness flag and can cause the socket to behave incorrectly.
726    ///
727    /// This method is not intended to be used with combined interests.
728    /// The closure should perform only one type of IO operation, so it should not
729    /// require more than one ready state. This method may panic or sleep forever
730    /// if it is called with a combined interest.
731    ///
732    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
733    ///
734    /// [`readable()`]: UnixStream::readable()
735    /// [`writable()`]: UnixStream::writable()
736    /// [`ready()`]: UnixStream::ready()
737    pub fn try_io<R>(
738        &self,
739        interest: Interest,
740        f: impl FnOnce() -> io::Result<R>,
741    ) -> io::Result<R> {
742        self.io
743            .registration()
744            .try_io(interest, || self.io.try_io(f))
745    }
746
747    /// Reads or writes from the socket using a user-provided IO operation.
748    ///
749    /// The readiness of the socket is awaited and when the socket is ready,
750    /// the provided closure is called. The closure should attempt to perform
751    /// IO operation on the socket by manually calling the appropriate syscall.
752    /// If the operation fails because the socket is not actually ready,
753    /// then the closure should return a `WouldBlock` error. In such case the
754    /// readiness flag is cleared and the socket readiness is awaited again.
755    /// This loop is repeated until the closure returns an `Ok` or an error
756    /// other than `WouldBlock`.
757    ///
758    /// The closure should only return a `WouldBlock` error if it has performed
759    /// an IO operation on the socket that failed due to the socket not being
760    /// ready. Returning a `WouldBlock` error in any other situation will
761    /// incorrectly clear the readiness flag, which can cause the socket to
762    /// behave incorrectly.
763    ///
764    /// The closure should not perform the IO operation using any of the methods
765    /// defined on the Tokio `UnixStream` type, as this will mess with the
766    /// readiness flag and can cause the socket to behave incorrectly.
767    ///
768    /// This method is not intended to be used with combined interests.
769    /// The closure should perform only one type of IO operation, so it should not
770    /// require more than one ready state. This method may panic or sleep forever
771    /// if it is called with a combined interest.
772    pub async fn async_io<R>(
773        &self,
774        interest: Interest,
775        mut f: impl FnMut() -> io::Result<R>,
776    ) -> io::Result<R> {
777        self.io
778            .registration()
779            .async_io(interest, || self.io.try_io(&mut f))
780            .await
781    }
782
783    /// Creates new [`UnixStream`] from a [`std::os::unix::net::UnixStream`].
784    ///
785    /// This function is intended to be used to wrap a `UnixStream` from the
786    /// standard library in the Tokio equivalent.
787    ///
788    /// # Notes
789    ///
790    /// The caller is responsible for ensuring that the stream is in
791    /// non-blocking mode. Otherwise all I/O operations on the stream
792    /// will block the thread, which will cause unexpected behavior.
793    /// Non-blocking mode can be set using [`set_nonblocking`].
794    ///
    /// Passing a stream in blocking mode is always erroneous,
796    /// and the behavior in that case may change in the future.
797    /// For example, it could panic.
798    ///
799    /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
800    ///
801    /// # Examples
802    ///
803    /// ```no_run
804    /// use tokio::net::UnixStream;
805    /// use std::os::unix::net::UnixStream as StdUnixStream;
806    /// # use std::error::Error;
807    ///
808    /// # async fn dox() -> Result<(), Box<dyn Error>> {
809    /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
810    /// std_stream.set_nonblocking(true)?;
811    /// let stream = UnixStream::from_std(std_stream)?;
812    /// # Ok(())
813    /// # }
814    /// ```
815    ///
816    /// # Panics
817    ///
818    /// This function panics if it is not called from within a runtime with
819    /// IO enabled.
820    ///
821    /// The runtime is usually set implicitly when this function is called
822    /// from a future driven by a tokio runtime, otherwise runtime can be set
823    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
824    #[track_caller]
825    pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
826        check_socket_for_blocking(&stream)?;
827
828        let stream = mio::net::UnixStream::from_std(stream);
829        let io = PollEvented::new(stream)?;
830
831        Ok(UnixStream { io })
832    }
833
834    /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
835    ///
836    /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
837    /// mode set as `true`.  Use [`set_nonblocking`] to change the blocking
838    /// mode if needed.
839    ///
840    /// # Examples
841    ///
842    /// ```
843    /// # if cfg!(miri) { return } // No `socket` in miri.
844    /// use std::error::Error;
845    /// use std::io::Read;
846    /// use tokio::net::UnixListener;
847    /// # use tokio::net::UnixStream;
848    /// # use tokio::io::AsyncWriteExt;
849    ///
850    /// #[tokio::main]
851    /// async fn main() -> Result<(), Box<dyn Error>> {
852    ///     let dir = tempfile::tempdir().unwrap();
853    ///     let bind_path = dir.path().join("bind_path");
854    ///
855    ///     let mut data = [0u8; 12];
856    ///     let listener = UnixListener::bind(&bind_path)?;
857    /// #   let handle = tokio::spawn(async {
858    /// #       let mut stream = UnixStream::connect(bind_path).await.unwrap();
859    /// #       stream.write(b"Hello world!").await.unwrap();
860    /// #   });
861    ///     let (tokio_unix_stream, _) = listener.accept().await?;
862    ///     let mut std_unix_stream = tokio_unix_stream.into_std()?;
863    /// #   handle.await.expect("The task being joined has panicked");
864    ///     std_unix_stream.set_nonblocking(false)?;
865    ///     std_unix_stream.read_exact(&mut data)?;
866    /// #   assert_eq!(b"Hello world!", &data);
867    ///     Ok(())
868    /// }
869    /// ```
870    /// [`tokio::net::UnixStream`]: UnixStream
871    /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
872    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
873    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
874        self.io
875            .into_inner()
876            .map(IntoRawFd::into_raw_fd)
877            .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
878    }
879
880    /// Creates an unnamed pair of connected sockets.
881    ///
882    /// This function will create a pair of interconnected Unix sockets for
883    /// communicating back and forth between one another. Each socket will
884    /// be associated with the default event loop's handle.
885    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
886        let (a, b) = mio::net::UnixStream::pair()?;
887        let a = UnixStream::new(a)?;
888        let b = UnixStream::new(b)?;
889
890        Ok((a, b))
891    }
892
893    pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
894        let io = PollEvented::new(stream)?;
895        Ok(UnixStream { io })
896    }
897
898    /// Returns the socket address of the local half of this connection.
899    ///
900    /// # Examples
901    ///
902    /// ```no_run
903    /// use tokio::net::UnixStream;
904    ///
905    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
906    /// let dir = tempfile::tempdir().unwrap();
907    /// let bind_path = dir.path().join("bind_path");
908    /// let stream = UnixStream::connect(bind_path).await?;
909    ///
910    /// println!("{:?}", stream.local_addr()?);
911    /// # Ok(())
912    /// # }
913    /// ```
914    pub fn local_addr(&self) -> io::Result<SocketAddr> {
915        self.io.local_addr().map(SocketAddr)
916    }
917
918    /// Returns the socket address of the remote half of this connection.
919    ///
920    /// # Examples
921    ///
922    /// ```no_run
923    /// use tokio::net::UnixStream;
924    ///
925    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
926    /// let dir = tempfile::tempdir().unwrap();
927    /// let bind_path = dir.path().join("bind_path");
928    /// let stream = UnixStream::connect(bind_path).await?;
929    ///
930    /// println!("{:?}", stream.peer_addr()?);
931    /// # Ok(())
932    /// # }
933    /// ```
934    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
935        self.io.peer_addr().map(SocketAddr)
936    }
937
938    /// Returns effective credentials of the process which called `connect` or `pair`.
939    pub fn peer_cred(&self) -> io::Result<UCred> {
940        ucred::get_peer_cred(self)
941    }
942
943    /// Returns the value of the `SO_ERROR` option.
944    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
945        self.io.take_error()
946    }
947
948    /// Shuts down the read, write, or both halves of this connection.
949    ///
950    /// This function will cause all pending and future I/O calls on the
951    /// specified portions to immediately return with an appropriate value
952    /// (see the documentation of `Shutdown`).
953    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
954        self.io.shutdown(how)
955    }
956
957    // These lifetime markers also appear in the generated documentation, and make
958    // it more clear that this is a *borrowed* split.
959    #[allow(clippy::needless_lifetimes)]
960    /// Splits a `UnixStream` into a read half and a write half, which can be used
961    /// to read and write the stream concurrently.
962    ///
963    /// This method is more efficient than [`into_split`], but the halves cannot be
964    /// moved into independently spawned tasks.
965    ///
966    /// [`into_split`]: Self::into_split()
967    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
968        split(self)
969    }
970
971    /// Splits a `UnixStream` into a read half and a write half, which can be used
972    /// to read and write the stream concurrently.
973    ///
974    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
975    /// this comes at the cost of a heap allocation.
976    ///
977    /// **Note:** Dropping the write half will shut down the write half of the
978    /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
979    ///
980    /// [`split`]: Self::split()
981    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
982    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
983        split_owned(self)
984    }
985}
986
987impl TryFrom<net::UnixStream> for UnixStream {
988    type Error = io::Error;
989
990    /// Consumes stream, returning the tokio I/O object.
991    ///
992    /// This is equivalent to
993    /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
994    fn try_from(stream: net::UnixStream) -> io::Result<Self> {
995        Self::from_std(stream)
996    }
997}
998
999impl AsyncRead for UnixStream {
1000    fn poll_read(
1001        self: Pin<&mut Self>,
1002        cx: &mut Context<'_>,
1003        buf: &mut ReadBuf<'_>,
1004    ) -> Poll<io::Result<()>> {
1005        self.poll_read_priv(cx, buf)
1006    }
1007}
1008
1009impl AsyncWrite for UnixStream {
1010    fn poll_write(
1011        self: Pin<&mut Self>,
1012        cx: &mut Context<'_>,
1013        buf: &[u8],
1014    ) -> Poll<io::Result<usize>> {
1015        self.poll_write_priv(cx, buf)
1016    }
1017
1018    fn poll_write_vectored(
1019        self: Pin<&mut Self>,
1020        cx: &mut Context<'_>,
1021        bufs: &[io::IoSlice<'_>],
1022    ) -> Poll<io::Result<usize>> {
1023        self.poll_write_vectored_priv(cx, bufs)
1024    }
1025
1026    fn is_write_vectored(&self) -> bool {
1027        true
1028    }
1029
1030    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1031        Poll::Ready(Ok(()))
1032    }
1033
1034    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1035        self.shutdown_std(std::net::Shutdown::Write)?;
1036        Poll::Ready(Ok(()))
1037    }
1038}
1039
1040impl UnixStream {
1041    // == Poll IO functions that takes `&self` ==
1042    //
1043    // To read or write without mutable access to the `UnixStream`, combine the
1044    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1045    // `try_write` methods.
1046
1047    pub(crate) fn poll_read_priv(
1048        &self,
1049        cx: &mut Context<'_>,
1050        buf: &mut ReadBuf<'_>,
1051    ) -> Poll<io::Result<()>> {
1052        // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
1053        unsafe { self.io.poll_read(cx, buf) }
1054    }
1055
1056    pub(crate) fn poll_write_priv(
1057        &self,
1058        cx: &mut Context<'_>,
1059        buf: &[u8],
1060    ) -> Poll<io::Result<usize>> {
1061        self.io.poll_write(cx, buf)
1062    }
1063
1064    pub(super) fn poll_write_vectored_priv(
1065        &self,
1066        cx: &mut Context<'_>,
1067        bufs: &[io::IoSlice<'_>],
1068    ) -> Poll<io::Result<usize>> {
1069        self.io.poll_write_vectored(cx, bufs)
1070    }
1071}
1072
1073impl fmt::Debug for UnixStream {
1074    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1075        self.io.fmt(f)
1076    }
1077}
1078
1079impl AsRawFd for UnixStream {
1080    fn as_raw_fd(&self) -> RawFd {
1081        self.io.as_raw_fd()
1082    }
1083}
1084
1085impl AsFd for UnixStream {
1086    fn as_fd(&self) -> BorrowedFd<'_> {
1087        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1088    }
1089}