tokio/net/unix/stream.rs
1use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::split::{split, ReadHalf, WriteHalf};
3use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
4use crate::net::unix::ucred::{self, UCred};
5use crate::net::unix::SocketAddr;
6use crate::util::check_socket_for_blocking;
7
8use std::fmt;
9use std::future::poll_fn;
10use std::io::{self, Read, Write};
11use std::net::Shutdown;
12#[cfg(target_os = "android")]
13use std::os::android::net::SocketAddrExt;
14#[cfg(target_os = "linux")]
15use std::os::linux::net::SocketAddrExt;
16#[cfg(any(target_os = "linux", target_os = "android"))]
17use std::os::unix::ffi::OsStrExt;
18use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
19use std::os::unix::net::{self, SocketAddr as StdSocketAddr};
20use std::path::Path;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23
24cfg_io_util! {
25 use bytes::BufMut;
26}
27
cfg_net_unix! {
    /// A structure representing a connected Unix socket.
    ///
    /// This socket can be connected directly with [`UnixStream::connect`] or accepted
    /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
    /// anonymous Unix sockets can be created with [`UnixStream::pair`].
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    /// [`UnixListener::accept`]: crate::net::UnixListener::accept
    #[cfg_attr(docsrs, doc(alias = "uds"))]
    pub struct UnixStream {
        // The mio socket wrapped in `PollEvented`. The readiness, I/O and
        // address methods on this type all go through this field.
        io: PollEvented<mio::net::UnixStream>,
    }
}
47
48impl UnixStream {
49 pub(crate) async fn connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream> {
50 let stream = UnixStream::new(sys)?;
51
52 // Once we've connected, wait for the stream to be writable as
53 // that's when the actual connection has been initiated. Once we're
54 // writable we check for `take_socket_error` to see if the connect
55 // actually hit an error or not.
56 //
57 // If all that succeeded then we ship everything on up.
58 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
59
60 if let Some(e) = stream.io.take_error()? {
61 return Err(e);
62 }
63
64 Ok(stream)
65 }
66
67 /// Connects to the socket named by `path`.
68 ///
69 /// This function will create a new Unix socket and connect to the path
70 /// specified, associating the returned stream with the default event loop's
71 /// handle.
72 pub async fn connect<P>(path: P) -> io::Result<UnixStream>
73 where
74 P: AsRef<Path>,
75 {
76 // On linux, abstract socket paths need to be considered.
77 #[cfg(any(target_os = "linux", target_os = "android"))]
78 let addr = {
79 let os_str_bytes = path.as_ref().as_os_str().as_bytes();
80 if os_str_bytes.starts_with(b"\0") {
81 StdSocketAddr::from_abstract_name(&os_str_bytes[1..])?
82 } else {
83 StdSocketAddr::from_pathname(path)?
84 }
85 };
86 #[cfg(not(any(target_os = "linux", target_os = "android")))]
87 let addr = StdSocketAddr::from_pathname(path)?;
88
89 let stream = mio::net::UnixStream::connect_addr(&addr)?;
90 let stream = UnixStream::new(stream)?;
91
92 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
93
94 if let Some(e) = stream.io.take_error()? {
95 return Err(e);
96 }
97
98 Ok(stream)
99 }
100
101 /// Waits for any of the requested ready states.
102 ///
103 /// This function is usually paired with `try_read()` or `try_write()`. It
104 /// can be used to concurrently read / write to the same socket on a single
105 /// task without splitting the socket.
106 ///
107 /// The function may complete without the socket being ready. This is a
108 /// false-positive and attempting an operation will return with
109 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
110 /// [`Ready`] set, so you should always check the returned value and possibly
111 /// wait again if the requested states are not set.
112 ///
113 /// # Cancel safety
114 ///
115 /// This method is cancel safe. Once a readiness event occurs, the method
116 /// will continue to return immediately until the readiness event is
117 /// consumed by an attempt to read or write that fails with `WouldBlock` or
118 /// `Poll::Pending`.
119 ///
120 /// # Examples
121 ///
122 /// Concurrently read and write to the stream on the same task without
123 /// splitting.
124 ///
125 /// ```no_run
126 /// use tokio::io::Interest;
127 /// use tokio::net::UnixStream;
128 /// use std::error::Error;
129 /// use std::io;
130 ///
131 /// #[tokio::main]
132 /// async fn main() -> Result<(), Box<dyn Error>> {
133 /// let dir = tempfile::tempdir().unwrap();
134 /// let bind_path = dir.path().join("bind_path");
135 /// let stream = UnixStream::connect(bind_path).await?;
136 ///
137 /// loop {
138 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
139 ///
140 /// if ready.is_readable() {
141 /// let mut data = vec![0; 1024];
142 /// // Try to read data, this may still fail with `WouldBlock`
143 /// // if the readiness event is a false positive.
144 /// match stream.try_read(&mut data) {
145 /// Ok(n) => {
146 /// println!("read {} bytes", n);
147 /// }
148 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
149 /// continue;
150 /// }
151 /// Err(e) => {
152 /// return Err(e.into());
153 /// }
154 /// }
155 ///
156 /// }
157 ///
158 /// if ready.is_writable() {
159 /// // Try to write data, this may still fail with `WouldBlock`
160 /// // if the readiness event is a false positive.
161 /// match stream.try_write(b"hello world") {
162 /// Ok(n) => {
163 /// println!("write {} bytes", n);
164 /// }
165 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
166 /// continue;
167 /// }
168 /// Err(e) => {
169 /// return Err(e.into());
170 /// }
171 /// }
172 /// }
173 /// }
174 /// }
175 /// ```
176 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
177 let event = self.io.registration().readiness(interest).await?;
178 Ok(event.ready)
179 }
180
181 /// Waits for the socket to become readable.
182 ///
183 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
184 /// paired with `try_read()`.
185 ///
186 /// # Cancel safety
187 ///
188 /// This method is cancel safe. Once a readiness event occurs, the method
189 /// will continue to return immediately until the readiness event is
190 /// consumed by an attempt to read that fails with `WouldBlock` or
191 /// `Poll::Pending`.
192 ///
193 /// # Examples
194 ///
195 /// ```no_run
196 /// use tokio::net::UnixStream;
197 /// use std::error::Error;
198 /// use std::io;
199 ///
200 /// #[tokio::main]
201 /// async fn main() -> Result<(), Box<dyn Error>> {
202 /// // Connect to a peer
203 /// let dir = tempfile::tempdir().unwrap();
204 /// let bind_path = dir.path().join("bind_path");
205 /// let stream = UnixStream::connect(bind_path).await?;
206 ///
207 /// let mut msg = vec![0; 1024];
208 ///
209 /// loop {
210 /// // Wait for the socket to be readable
211 /// stream.readable().await?;
212 ///
213 /// // Try to read data, this may still fail with `WouldBlock`
214 /// // if the readiness event is a false positive.
215 /// match stream.try_read(&mut msg) {
216 /// Ok(n) => {
217 /// msg.truncate(n);
218 /// break;
219 /// }
220 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
221 /// continue;
222 /// }
223 /// Err(e) => {
224 /// return Err(e.into());
225 /// }
226 /// }
227 /// }
228 ///
229 /// println!("GOT = {:?}", msg);
230 /// Ok(())
231 /// }
232 /// ```
233 pub async fn readable(&self) -> io::Result<()> {
234 self.ready(Interest::READABLE).await?;
235 Ok(())
236 }
237
238 /// Polls for read readiness.
239 ///
240 /// If the unix stream is not currently ready for reading, this method will
241 /// store a clone of the `Waker` from the provided `Context`. When the unix
242 /// stream becomes ready for reading, `Waker::wake` will be called on the
243 /// waker.
244 ///
245 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
246 /// the `Waker` from the `Context` passed to the most recent call is
247 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
248 /// second, independent waker.)
249 ///
250 /// This function is intended for cases where creating and pinning a future
251 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
252 /// preferred, as this supports polling from multiple tasks at once.
253 ///
254 /// # Return value
255 ///
256 /// The function returns:
257 ///
258 /// * `Poll::Pending` if the unix stream is not ready for reading.
259 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
260 /// * `Poll::Ready(Err(e))` if an error is encountered.
261 ///
262 /// # Errors
263 ///
264 /// This function may encounter any standard I/O error except `WouldBlock`.
265 ///
266 /// [`readable`]: method@Self::readable
267 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
268 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
269 }
270
271 /// Try to read data from the stream into the provided buffer, returning how
272 /// many bytes were read.
273 ///
274 /// Receives any pending data from the socket but does not wait for new data
275 /// to arrive. On success, returns the number of bytes read. Because
276 /// `try_read()` is non-blocking, the buffer does not have to be stored by
277 /// the async task and can exist entirely on the stack.
278 ///
279 /// Usually, [`readable()`] or [`ready()`] is used with this function.
280 ///
281 /// [`readable()`]: UnixStream::readable()
282 /// [`ready()`]: UnixStream::ready()
283 ///
284 /// # Return
285 ///
286 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
287 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
288 ///
289 /// 1. The stream's read half is closed and will no longer yield data.
290 /// 2. The specified buffer was 0 bytes in length.
291 ///
292 /// If the stream is not ready to read data,
293 /// `Err(io::ErrorKind::WouldBlock)` is returned.
294 ///
295 /// # Examples
296 ///
297 /// ```no_run
298 /// use tokio::net::UnixStream;
299 /// use std::error::Error;
300 /// use std::io;
301 ///
302 /// #[tokio::main]
303 /// async fn main() -> Result<(), Box<dyn Error>> {
304 /// // Connect to a peer
305 /// let dir = tempfile::tempdir().unwrap();
306 /// let bind_path = dir.path().join("bind_path");
307 /// let stream = UnixStream::connect(bind_path).await?;
308 ///
309 /// loop {
310 /// // Wait for the socket to be readable
311 /// stream.readable().await?;
312 ///
313 /// // Creating the buffer **after** the `await` prevents it from
314 /// // being stored in the async task.
315 /// let mut buf = [0; 4096];
316 ///
317 /// // Try to read data, this may still fail with `WouldBlock`
318 /// // if the readiness event is a false positive.
319 /// match stream.try_read(&mut buf) {
320 /// Ok(0) => break,
321 /// Ok(n) => {
322 /// println!("read {} bytes", n);
323 /// }
324 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
325 /// continue;
326 /// }
327 /// Err(e) => {
328 /// return Err(e.into());
329 /// }
330 /// }
331 /// }
332 ///
333 /// Ok(())
334 /// }
335 /// ```
336 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
337 self.io
338 .registration()
339 .try_io(Interest::READABLE, || (&*self.io).read(buf))
340 }
341
342 /// Tries to read data from the stream into the provided buffers, returning
343 /// how many bytes were read.
344 ///
345 /// Data is copied to fill each buffer in order, with the final buffer
346 /// written to possibly being only partially filled. This method behaves
347 /// equivalently to a single call to [`try_read()`] with concatenated
348 /// buffers.
349 ///
350 /// Receives any pending data from the socket but does not wait for new data
351 /// to arrive. On success, returns the number of bytes read. Because
352 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
353 /// stored by the async task and can exist entirely on the stack.
354 ///
355 /// Usually, [`readable()`] or [`ready()`] is used with this function.
356 ///
357 /// [`try_read()`]: UnixStream::try_read()
358 /// [`readable()`]: UnixStream::readable()
359 /// [`ready()`]: UnixStream::ready()
360 ///
361 /// # Return
362 ///
363 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
364 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
365 /// and will no longer yield data. If the stream is not ready to read data
366 /// `Err(io::ErrorKind::WouldBlock)` is returned.
367 ///
368 /// # Examples
369 ///
370 /// ```no_run
371 /// use tokio::net::UnixStream;
372 /// use std::error::Error;
373 /// use std::io::{self, IoSliceMut};
374 ///
375 /// #[tokio::main]
376 /// async fn main() -> Result<(), Box<dyn Error>> {
377 /// // Connect to a peer
378 /// let dir = tempfile::tempdir().unwrap();
379 /// let bind_path = dir.path().join("bind_path");
380 /// let stream = UnixStream::connect(bind_path).await?;
381 ///
382 /// loop {
383 /// // Wait for the socket to be readable
384 /// stream.readable().await?;
385 ///
386 /// // Creating the buffer **after** the `await` prevents it from
387 /// // being stored in the async task.
388 /// let mut buf_a = [0; 512];
389 /// let mut buf_b = [0; 1024];
390 /// let mut bufs = [
391 /// IoSliceMut::new(&mut buf_a),
392 /// IoSliceMut::new(&mut buf_b),
393 /// ];
394 ///
395 /// // Try to read data, this may still fail with `WouldBlock`
396 /// // if the readiness event is a false positive.
397 /// match stream.try_read_vectored(&mut bufs) {
398 /// Ok(0) => break,
399 /// Ok(n) => {
400 /// println!("read {} bytes", n);
401 /// }
402 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
403 /// continue;
404 /// }
405 /// Err(e) => {
406 /// return Err(e.into());
407 /// }
408 /// }
409 /// }
410 ///
411 /// Ok(())
412 /// }
413 /// ```
414 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
415 self.io
416 .registration()
417 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
418 }
419
420 cfg_io_util! {
421 /// Tries to read data from the stream into the provided buffer, advancing the
422 /// buffer's internal cursor, returning how many bytes were read.
423 ///
424 /// Receives any pending data from the socket but does not wait for new data
425 /// to arrive. On success, returns the number of bytes read. Because
426 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
427 /// the async task and can exist entirely on the stack.
428 ///
429 /// Usually, [`readable()`] or [`ready()`] is used with this function.
430 ///
431 /// [`readable()`]: UnixStream::readable()
432 /// [`ready()`]: UnixStream::ready()
433 ///
434 /// # Return
435 ///
436 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
437 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
438 /// and will no longer yield data. If the stream is not ready to read data
439 /// `Err(io::ErrorKind::WouldBlock)` is returned.
440 ///
441 /// # Examples
442 ///
443 /// ```no_run
444 /// use tokio::net::UnixStream;
445 /// use std::error::Error;
446 /// use std::io;
447 ///
448 /// #[tokio::main]
449 /// async fn main() -> Result<(), Box<dyn Error>> {
450 /// // Connect to a peer
451 /// let dir = tempfile::tempdir().unwrap();
452 /// let bind_path = dir.path().join("bind_path");
453 /// let stream = UnixStream::connect(bind_path).await?;
454 ///
455 /// loop {
456 /// // Wait for the socket to be readable
457 /// stream.readable().await?;
458 ///
459 /// let mut buf = Vec::with_capacity(4096);
460 ///
461 /// // Try to read data, this may still fail with `WouldBlock`
462 /// // if the readiness event is a false positive.
463 /// match stream.try_read_buf(&mut buf) {
464 /// Ok(0) => break,
465 /// Ok(n) => {
466 /// println!("read {} bytes", n);
467 /// }
468 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
469 /// continue;
470 /// }
471 /// Err(e) => {
472 /// return Err(e.into());
473 /// }
474 /// }
475 /// }
476 ///
477 /// Ok(())
478 /// }
479 /// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
    // The read is wrapped in the registration's `try_io` with read interest,
    // the same way `try_read` and `try_read_vectored` are.
    self.io.registration().try_io(Interest::READABLE, || {
        use std::io::Read;

        // Next writable chunk of `buf`; `Read::read` needs it as `&mut [u8]`.
        let dst = buf.chunk_mut();
        // NOTE(review): this views possibly-uninitialized bytes as `[u8]`.
        // Presumably acceptable only because `read` writes into the slice
        // and never reads from it — confirm.
        let dst =
            unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

        let n = (&*self.io).read(dst)?;

        // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
        // buffer.
        unsafe {
            buf.advance_mut(n);
        }

        Ok(n)
    })
}
499 }
500
501 /// Waits for the socket to become writable.
502 ///
503 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
504 /// paired with `try_write()`.
505 ///
506 /// # Cancel safety
507 ///
508 /// This method is cancel safe. Once a readiness event occurs, the method
509 /// will continue to return immediately until the readiness event is
510 /// consumed by an attempt to write that fails with `WouldBlock` or
511 /// `Poll::Pending`.
512 ///
513 /// # Examples
514 ///
515 /// ```no_run
516 /// use tokio::net::UnixStream;
517 /// use std::error::Error;
518 /// use std::io;
519 ///
520 /// #[tokio::main]
521 /// async fn main() -> Result<(), Box<dyn Error>> {
522 /// // Connect to a peer
523 /// let dir = tempfile::tempdir().unwrap();
524 /// let bind_path = dir.path().join("bind_path");
525 /// let stream = UnixStream::connect(bind_path).await?;
526 ///
527 /// loop {
528 /// // Wait for the socket to be writable
529 /// stream.writable().await?;
530 ///
531 /// // Try to write data, this may still fail with `WouldBlock`
532 /// // if the readiness event is a false positive.
533 /// match stream.try_write(b"hello world") {
534 /// Ok(n) => {
535 /// break;
536 /// }
537 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
538 /// continue;
539 /// }
540 /// Err(e) => {
541 /// return Err(e.into());
542 /// }
543 /// }
544 /// }
545 ///
546 /// Ok(())
547 /// }
548 /// ```
549 pub async fn writable(&self) -> io::Result<()> {
550 self.ready(Interest::WRITABLE).await?;
551 Ok(())
552 }
553
554 /// Polls for write readiness.
555 ///
556 /// If the unix stream is not currently ready for writing, this method will
557 /// store a clone of the `Waker` from the provided `Context`. When the unix
558 /// stream becomes ready for writing, `Waker::wake` will be called on the
559 /// waker.
560 ///
561 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
562 /// the `Waker` from the `Context` passed to the most recent call is
563 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
564 /// second, independent waker.)
565 ///
566 /// This function is intended for cases where creating and pinning a future
567 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
568 /// preferred, as this supports polling from multiple tasks at once.
569 ///
570 /// # Return value
571 ///
572 /// The function returns:
573 ///
574 /// * `Poll::Pending` if the unix stream is not ready for writing.
575 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
576 /// * `Poll::Ready(Err(e))` if an error is encountered.
577 ///
578 /// # Errors
579 ///
580 /// This function may encounter any standard I/O error except `WouldBlock`.
581 ///
582 /// [`writable`]: method@Self::writable
583 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
584 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
585 }
586
587 /// Tries to write a buffer to the stream, returning how many bytes were
588 /// written.
589 ///
590 /// The function will attempt to write the entire contents of `buf`, but
591 /// only part of the buffer may be written.
592 ///
593 /// This function is usually paired with `writable()`.
594 ///
595 /// # Return
596 ///
597 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
598 /// number of bytes written. If the stream is not ready to write data,
599 /// `Err(io::ErrorKind::WouldBlock)` is returned.
600 ///
601 /// # Examples
602 ///
603 /// ```no_run
604 /// use tokio::net::UnixStream;
605 /// use std::error::Error;
606 /// use std::io;
607 ///
608 /// #[tokio::main]
609 /// async fn main() -> Result<(), Box<dyn Error>> {
610 /// // Connect to a peer
611 /// let dir = tempfile::tempdir().unwrap();
612 /// let bind_path = dir.path().join("bind_path");
613 /// let stream = UnixStream::connect(bind_path).await?;
614 ///
615 /// loop {
616 /// // Wait for the socket to be writable
617 /// stream.writable().await?;
618 ///
619 /// // Try to write data, this may still fail with `WouldBlock`
620 /// // if the readiness event is a false positive.
621 /// match stream.try_write(b"hello world") {
622 /// Ok(n) => {
623 /// break;
624 /// }
625 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
626 /// continue;
627 /// }
628 /// Err(e) => {
629 /// return Err(e.into());
630 /// }
631 /// }
632 /// }
633 ///
634 /// Ok(())
635 /// }
636 /// ```
637 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
638 self.io
639 .registration()
640 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
641 }
642
643 /// Tries to write several buffers to the stream, returning how many bytes
644 /// were written.
645 ///
/// Data is written from each buffer in order, with the final buffer read
/// from possibly being only partially consumed. This method behaves
648 /// equivalently to a single call to [`try_write()`] with concatenated
649 /// buffers.
650 ///
651 /// This function is usually paired with `writable()`.
652 ///
653 /// [`try_write()`]: UnixStream::try_write()
654 ///
655 /// # Return
656 ///
657 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
658 /// number of bytes written. If the stream is not ready to write data,
659 /// `Err(io::ErrorKind::WouldBlock)` is returned.
660 ///
661 /// # Examples
662 ///
663 /// ```no_run
664 /// use tokio::net::UnixStream;
665 /// use std::error::Error;
666 /// use std::io;
667 ///
668 /// #[tokio::main]
669 /// async fn main() -> Result<(), Box<dyn Error>> {
670 /// // Connect to a peer
671 /// let dir = tempfile::tempdir().unwrap();
672 /// let bind_path = dir.path().join("bind_path");
673 /// let stream = UnixStream::connect(bind_path).await?;
674 ///
675 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
676 ///
677 /// loop {
678 /// // Wait for the socket to be writable
679 /// stream.writable().await?;
680 ///
681 /// // Try to write data, this may still fail with `WouldBlock`
682 /// // if the readiness event is a false positive.
683 /// match stream.try_write_vectored(&bufs) {
684 /// Ok(n) => {
685 /// break;
686 /// }
687 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
688 /// continue;
689 /// }
690 /// Err(e) => {
691 /// return Err(e.into());
692 /// }
693 /// }
694 /// }
695 ///
696 /// Ok(())
697 /// }
698 /// ```
699 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
700 self.io
701 .registration()
702 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
703 }
704
705 /// Tries to read or write from the socket using a user-provided IO operation.
706 ///
707 /// If the socket is ready, the provided closure is called. The closure
708 /// should attempt to perform IO operation on the socket by manually
709 /// calling the appropriate syscall. If the operation fails because the
710 /// socket is not actually ready, then the closure should return a
711 /// `WouldBlock` error and the readiness flag is cleared. The return value
712 /// of the closure is then returned by `try_io`.
713 ///
714 /// If the socket is not ready, then the closure is not called
715 /// and a `WouldBlock` error is returned.
716 ///
717 /// The closure should only return a `WouldBlock` error if it has performed
718 /// an IO operation on the socket that failed due to the socket not being
719 /// ready. Returning a `WouldBlock` error in any other situation will
720 /// incorrectly clear the readiness flag, which can cause the socket to
721 /// behave incorrectly.
722 ///
723 /// The closure should not perform the IO operation using any of the methods
724 /// defined on the Tokio `UnixStream` type, as this will mess with the
725 /// readiness flag and can cause the socket to behave incorrectly.
726 ///
727 /// This method is not intended to be used with combined interests.
728 /// The closure should perform only one type of IO operation, so it should not
729 /// require more than one ready state. This method may panic or sleep forever
730 /// if it is called with a combined interest.
731 ///
732 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
733 ///
734 /// [`readable()`]: UnixStream::readable()
735 /// [`writable()`]: UnixStream::writable()
736 /// [`ready()`]: UnixStream::ready()
737 pub fn try_io<R>(
738 &self,
739 interest: Interest,
740 f: impl FnOnce() -> io::Result<R>,
741 ) -> io::Result<R> {
742 self.io
743 .registration()
744 .try_io(interest, || self.io.try_io(f))
745 }
746
747 /// Reads or writes from the socket using a user-provided IO operation.
748 ///
749 /// The readiness of the socket is awaited and when the socket is ready,
750 /// the provided closure is called. The closure should attempt to perform
751 /// IO operation on the socket by manually calling the appropriate syscall.
752 /// If the operation fails because the socket is not actually ready,
753 /// then the closure should return a `WouldBlock` error. In such case the
754 /// readiness flag is cleared and the socket readiness is awaited again.
755 /// This loop is repeated until the closure returns an `Ok` or an error
756 /// other than `WouldBlock`.
757 ///
758 /// The closure should only return a `WouldBlock` error if it has performed
759 /// an IO operation on the socket that failed due to the socket not being
760 /// ready. Returning a `WouldBlock` error in any other situation will
761 /// incorrectly clear the readiness flag, which can cause the socket to
762 /// behave incorrectly.
763 ///
764 /// The closure should not perform the IO operation using any of the methods
765 /// defined on the Tokio `UnixStream` type, as this will mess with the
766 /// readiness flag and can cause the socket to behave incorrectly.
767 ///
768 /// This method is not intended to be used with combined interests.
769 /// The closure should perform only one type of IO operation, so it should not
770 /// require more than one ready state. This method may panic or sleep forever
771 /// if it is called with a combined interest.
772 pub async fn async_io<R>(
773 &self,
774 interest: Interest,
775 mut f: impl FnMut() -> io::Result<R>,
776 ) -> io::Result<R> {
777 self.io
778 .registration()
779 .async_io(interest, || self.io.try_io(&mut f))
780 .await
781 }
782
783 /// Creates new [`UnixStream`] from a [`std::os::unix::net::UnixStream`].
784 ///
785 /// This function is intended to be used to wrap a `UnixStream` from the
786 /// standard library in the Tokio equivalent.
787 ///
788 /// # Notes
789 ///
790 /// The caller is responsible for ensuring that the stream is in
791 /// non-blocking mode. Otherwise all I/O operations on the stream
792 /// will block the thread, which will cause unexpected behavior.
793 /// Non-blocking mode can be set using [`set_nonblocking`].
794 ///
/// Passing a stream in blocking mode is always erroneous,
796 /// and the behavior in that case may change in the future.
797 /// For example, it could panic.
798 ///
799 /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
800 ///
801 /// # Examples
802 ///
803 /// ```no_run
804 /// use tokio::net::UnixStream;
805 /// use std::os::unix::net::UnixStream as StdUnixStream;
806 /// # use std::error::Error;
807 ///
808 /// # async fn dox() -> Result<(), Box<dyn Error>> {
809 /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
810 /// std_stream.set_nonblocking(true)?;
811 /// let stream = UnixStream::from_std(std_stream)?;
812 /// # Ok(())
813 /// # }
814 /// ```
815 ///
816 /// # Panics
817 ///
818 /// This function panics if it is not called from within a runtime with
819 /// IO enabled.
820 ///
821 /// The runtime is usually set implicitly when this function is called
822 /// from a future driven by a tokio runtime, otherwise runtime can be set
823 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
824 #[track_caller]
825 pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
826 check_socket_for_blocking(&stream)?;
827
828 let stream = mio::net::UnixStream::from_std(stream);
829 let io = PollEvented::new(stream)?;
830
831 Ok(UnixStream { io })
832 }
833
834 /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
835 ///
836 /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
837 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking
838 /// mode if needed.
839 ///
840 /// # Examples
841 ///
842 /// ```
843 /// # if cfg!(miri) { return } // No `socket` in miri.
844 /// use std::error::Error;
845 /// use std::io::Read;
846 /// use tokio::net::UnixListener;
847 /// # use tokio::net::UnixStream;
848 /// # use tokio::io::AsyncWriteExt;
849 ///
850 /// #[tokio::main]
851 /// async fn main() -> Result<(), Box<dyn Error>> {
852 /// let dir = tempfile::tempdir().unwrap();
853 /// let bind_path = dir.path().join("bind_path");
854 ///
855 /// let mut data = [0u8; 12];
856 /// let listener = UnixListener::bind(&bind_path)?;
857 /// # let handle = tokio::spawn(async {
858 /// # let mut stream = UnixStream::connect(bind_path).await.unwrap();
859 /// # stream.write(b"Hello world!").await.unwrap();
860 /// # });
861 /// let (tokio_unix_stream, _) = listener.accept().await?;
862 /// let mut std_unix_stream = tokio_unix_stream.into_std()?;
863 /// # handle.await.expect("The task being joined has panicked");
864 /// std_unix_stream.set_nonblocking(false)?;
865 /// std_unix_stream.read_exact(&mut data)?;
866 /// # assert_eq!(b"Hello world!", &data);
867 /// Ok(())
868 /// }
869 /// ```
870 /// [`tokio::net::UnixStream`]: UnixStream
871 /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
872 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
873 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
874 self.io
875 .into_inner()
876 .map(IntoRawFd::into_raw_fd)
877 .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
878 }
879
880 /// Creates an unnamed pair of connected sockets.
881 ///
882 /// This function will create a pair of interconnected Unix sockets for
883 /// communicating back and forth between one another. Each socket will
884 /// be associated with the default event loop's handle.
885 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
886 let (a, b) = mio::net::UnixStream::pair()?;
887 let a = UnixStream::new(a)?;
888 let b = UnixStream::new(b)?;
889
890 Ok((a, b))
891 }
892
893 pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
894 let io = PollEvented::new(stream)?;
895 Ok(UnixStream { io })
896 }
897
898 /// Returns the socket address of the local half of this connection.
899 ///
900 /// # Examples
901 ///
902 /// ```no_run
903 /// use tokio::net::UnixStream;
904 ///
905 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
906 /// let dir = tempfile::tempdir().unwrap();
907 /// let bind_path = dir.path().join("bind_path");
908 /// let stream = UnixStream::connect(bind_path).await?;
909 ///
910 /// println!("{:?}", stream.local_addr()?);
911 /// # Ok(())
912 /// # }
913 /// ```
914 pub fn local_addr(&self) -> io::Result<SocketAddr> {
915 self.io.local_addr().map(SocketAddr)
916 }
917
918 /// Returns the socket address of the remote half of this connection.
919 ///
920 /// # Examples
921 ///
922 /// ```no_run
923 /// use tokio::net::UnixStream;
924 ///
925 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
926 /// let dir = tempfile::tempdir().unwrap();
927 /// let bind_path = dir.path().join("bind_path");
928 /// let stream = UnixStream::connect(bind_path).await?;
929 ///
930 /// println!("{:?}", stream.peer_addr()?);
931 /// # Ok(())
932 /// # }
933 /// ```
934 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
935 self.io.peer_addr().map(SocketAddr)
936 }
937
938 /// Returns effective credentials of the process which called `connect` or `pair`.
939 pub fn peer_cred(&self) -> io::Result<UCred> {
940 ucred::get_peer_cred(self)
941 }
942
943 /// Returns the value of the `SO_ERROR` option.
944 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
945 self.io.take_error()
946 }
947
948 /// Shuts down the read, write, or both halves of this connection.
949 ///
950 /// This function will cause all pending and future I/O calls on the
951 /// specified portions to immediately return with an appropriate value
952 /// (see the documentation of `Shutdown`).
953 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
954 self.io.shutdown(how)
955 }
956
957 // These lifetime markers also appear in the generated documentation, and make
958 // it more clear that this is a *borrowed* split.
959 #[allow(clippy::needless_lifetimes)]
960 /// Splits a `UnixStream` into a read half and a write half, which can be used
961 /// to read and write the stream concurrently.
962 ///
963 /// This method is more efficient than [`into_split`], but the halves cannot be
964 /// moved into independently spawned tasks.
965 ///
966 /// [`into_split`]: Self::into_split()
967 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
968 split(self)
969 }
970
971 /// Splits a `UnixStream` into a read half and a write half, which can be used
972 /// to read and write the stream concurrently.
973 ///
974 /// Unlike [`split`], the owned halves can be moved to separate tasks, however
975 /// this comes at the cost of a heap allocation.
976 ///
977 /// **Note:** Dropping the write half will shut down the write half of the
978 /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
979 ///
980 /// [`split`]: Self::split()
981 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
982 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
983 split_owned(self)
984 }
985}
986
987impl TryFrom<net::UnixStream> for UnixStream {
988 type Error = io::Error;
989
990 /// Consumes stream, returning the tokio I/O object.
991 ///
992 /// This is equivalent to
993 /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
994 fn try_from(stream: net::UnixStream) -> io::Result<Self> {
995 Self::from_std(stream)
996 }
997}
998
999impl AsyncRead for UnixStream {
1000 fn poll_read(
1001 self: Pin<&mut Self>,
1002 cx: &mut Context<'_>,
1003 buf: &mut ReadBuf<'_>,
1004 ) -> Poll<io::Result<()>> {
1005 self.poll_read_priv(cx, buf)
1006 }
1007}
1008
1009impl AsyncWrite for UnixStream {
1010 fn poll_write(
1011 self: Pin<&mut Self>,
1012 cx: &mut Context<'_>,
1013 buf: &[u8],
1014 ) -> Poll<io::Result<usize>> {
1015 self.poll_write_priv(cx, buf)
1016 }
1017
1018 fn poll_write_vectored(
1019 self: Pin<&mut Self>,
1020 cx: &mut Context<'_>,
1021 bufs: &[io::IoSlice<'_>],
1022 ) -> Poll<io::Result<usize>> {
1023 self.poll_write_vectored_priv(cx, bufs)
1024 }
1025
1026 fn is_write_vectored(&self) -> bool {
1027 true
1028 }
1029
1030 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1031 Poll::Ready(Ok(()))
1032 }
1033
1034 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1035 self.shutdown_std(std::net::Shutdown::Write)?;
1036 Poll::Ready(Ok(()))
1037 }
1038}
1039
1040impl UnixStream {
1041 // == Poll IO functions that takes `&self` ==
1042 //
1043 // To read or write without mutable access to the `UnixStream`, combine the
1044 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1045 // `try_write` methods.
1046
1047 pub(crate) fn poll_read_priv(
1048 &self,
1049 cx: &mut Context<'_>,
1050 buf: &mut ReadBuf<'_>,
1051 ) -> Poll<io::Result<()>> {
1052 // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
1053 unsafe { self.io.poll_read(cx, buf) }
1054 }
1055
1056 pub(crate) fn poll_write_priv(
1057 &self,
1058 cx: &mut Context<'_>,
1059 buf: &[u8],
1060 ) -> Poll<io::Result<usize>> {
1061 self.io.poll_write(cx, buf)
1062 }
1063
1064 pub(super) fn poll_write_vectored_priv(
1065 &self,
1066 cx: &mut Context<'_>,
1067 bufs: &[io::IoSlice<'_>],
1068 ) -> Poll<io::Result<usize>> {
1069 self.io.poll_write_vectored(cx, bufs)
1070 }
1071}
1072
1073impl fmt::Debug for UnixStream {
1074 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1075 self.io.fmt(f)
1076 }
1077}
1078
1079impl AsRawFd for UnixStream {
1080 fn as_raw_fd(&self) -> RawFd {
1081 self.io.as_raw_fd()
1082 }
1083}
1084
1085impl AsFd for UnixStream {
1086 fn as_fd(&self) -> BorrowedFd<'_> {
1087 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1088 }
1089}