tokio/net/tcp/
stream.rs

1cfg_not_wasi! {
2    use crate::net::{to_socket_addrs, ToSocketAddrs};
3    use std::future::poll_fn;
4    use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10use crate::util::check_socket_for_blocking;
11
12use std::fmt;
13use std::io;
14use std::net::{Shutdown, SocketAddr};
15use std::pin::Pin;
16use std::task::{ready, Context, Poll};
17
18cfg_io_util! {
19    use bytes::BufMut;
20}
21
22cfg_net! {
23    /// A TCP stream between a local and a remote socket.
24    ///
25    /// A TCP stream can either be created by connecting to an endpoint, via the
26    /// [`connect`] method, or by [accepting] a connection from a [listener]. A
27    /// TCP stream can also be created via the [`TcpSocket`] type.
28    ///
29    /// Reading and writing to a `TcpStream` is usually done using the
30    /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
31    /// traits.
32    ///
33    /// [`connect`]: method@TcpStream::connect
34    /// [accepting]: method@crate::net::TcpListener::accept
35    /// [listener]: struct@crate::net::TcpListener
36    /// [`TcpSocket`]: struct@crate::net::TcpSocket
37    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
38    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
39    ///
40    /// # Examples
41    ///
42    /// ```no_run
43    /// use tokio::net::TcpStream;
44    /// use tokio::io::AsyncWriteExt;
45    /// use std::error::Error;
46    ///
47    /// #[tokio::main]
48    /// async fn main() -> Result<(), Box<dyn Error>> {
49    ///     // Connect to a peer
50    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
51    ///
52    ///     // Write some data.
53    ///     stream.write_all(b"hello world!").await?;
54    ///
55    ///     Ok(())
56    /// }
57    /// ```
58    ///
59    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
60    ///
61    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
62    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
63    ///
64    /// To shut down the stream in the write direction, you can call the
65    /// [`shutdown()`] method. This will cause the other peer to receive a read of
66    /// length 0, indicating that no more data will be sent. This only closes
67    /// the stream in one direction.
68    ///
69    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
70    pub struct TcpStream {
71        io: PollEvented<mio::net::TcpStream>,
72    }
73}
74
75impl TcpStream {
76    cfg_not_wasi! {
77        /// Opens a TCP connection to a remote host.
78        ///
79        /// `addr` is an address of the remote host. Anything which implements the
80        /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
81        /// yields multiple addresses, connect will be attempted with each of the
82        /// addresses until a connection is successful. If none of the addresses
83        /// result in a successful connection, the error returned from the last
84        /// connection attempt (the last address) is returned.
85        ///
86        /// To configure the socket before connecting, you can use the [`TcpSocket`]
87        /// type.
88        ///
89        /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
90        /// [`TcpSocket`]: struct@crate::net::TcpSocket
91        ///
92        /// # Examples
93        ///
94        /// ```no_run
95        /// use tokio::net::TcpStream;
96        /// use tokio::io::AsyncWriteExt;
97        /// use std::error::Error;
98        ///
99        /// #[tokio::main]
100        /// async fn main() -> Result<(), Box<dyn Error>> {
101        ///     // Connect to a peer
102        ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
103        ///
104        ///     // Write some data.
105        ///     stream.write_all(b"hello world!").await?;
106        ///
107        ///     Ok(())
108        /// }
109        /// ```
110        ///
111        /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
112        ///
113        /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
114        /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
115        pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
116            let addrs = to_socket_addrs(addr).await?;
117
118            let mut last_err = None;
119
120            for addr in addrs {
121                match TcpStream::connect_addr(addr).await {
122                    Ok(stream) => return Ok(stream),
123                    Err(e) => last_err = Some(e),
124                }
125            }
126
127            Err(last_err.unwrap_or_else(|| {
128                io::Error::new(
129                    io::ErrorKind::InvalidInput,
130                    "could not resolve to any address",
131                )
132            }))
133        }
134
135        /// Establishes a connection to the specified `addr`.
136        async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
137            let sys = mio::net::TcpStream::connect(addr)?;
138            TcpStream::connect_mio(sys).await
139        }
140
141        pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
142            let stream = TcpStream::new(sys)?;
143
144            // Once we've connected, wait for the stream to be writable as
145            // that's when the actual connection has been initiated. Once we're
146            // writable we check for `take_socket_error` to see if the connect
147            // actually hit an error or not.
148            //
149            // If all that succeeded then we ship everything on up.
150            poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
151
152            if let Some(e) = stream.io.take_error()? {
153                return Err(e);
154            }
155
156            Ok(stream)
157        }
158    }
159
160    pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
161        let io = PollEvented::new(connected)?;
162        Ok(TcpStream { io })
163    }
164
165    /// Creates new `TcpStream` from a `std::net::TcpStream`.
166    ///
167    /// This function is intended to be used to wrap a TCP stream from the
168    /// standard library in the Tokio equivalent.
169    ///
170    /// # Notes
171    ///
172    /// The caller is responsible for ensuring that the stream is in
173    /// non-blocking mode. Otherwise all I/O operations on the stream
174    /// will block the thread, which will cause unexpected behavior.
175    /// Non-blocking mode can be set using [`set_nonblocking`].
176    ///
177    /// Passing a listener in blocking mode is always erroneous,
178    /// and the behavior in that case may change in the future.
179    /// For example, it could panic.
180    ///
181    /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
182    ///
183    /// # Examples
184    ///
185    /// ```rust,no_run
186    /// use std::error::Error;
187    /// use tokio::net::TcpStream;
188    ///
189    /// #[tokio::main]
190    /// async fn main() -> Result<(), Box<dyn Error>> {
191    ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
192    ///     std_stream.set_nonblocking(true)?;
193    ///     let stream = TcpStream::from_std(std_stream)?;
194    ///     Ok(())
195    /// }
196    /// ```
197    ///
198    /// # Panics
199    ///
200    /// This function panics if it is not called from within a runtime with
201    /// IO enabled.
202    ///
203    /// The runtime is usually set implicitly when this function is called
204    /// from a future driven by a tokio runtime, otherwise runtime can be set
205    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
206    #[track_caller]
207    pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
208        check_socket_for_blocking(&stream)?;
209
210        let io = mio::net::TcpStream::from_std(stream);
211        let io = PollEvented::new(io)?;
212        Ok(TcpStream { io })
213    }
214
215    /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
216    ///
217    /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
218    /// Use [`set_nonblocking`] to change the blocking mode if needed.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// # if cfg!(miri) { return } // No `socket` in miri.
224    /// use std::error::Error;
225    /// use std::io::Read;
226    /// use tokio::net::TcpListener;
227    /// # use tokio::net::TcpStream;
228    /// # use tokio::io::AsyncWriteExt;
229    ///
230    /// #[tokio::main]
231    /// async fn main() -> Result<(), Box<dyn Error>> {
232    ///     let mut data = [0u8; 12];
233    /// #   if false {
234    ///     let listener = TcpListener::bind("127.0.0.1:34254").await?;
235    /// #   }
236    /// #   let listener = TcpListener::bind("127.0.0.1:0").await?;
237    /// #   let addr = listener.local_addr().unwrap();
238    /// #   let handle = tokio::spawn(async move {
239    /// #       let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
240    /// #       stream.write_all(b"Hello world!").await.unwrap();
241    /// #   });
242    ///     let (tokio_tcp_stream, _) = listener.accept().await?;
243    ///     let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
244    /// #   handle.await.expect("The task being joined has panicked");
245    ///     std_tcp_stream.set_nonblocking(false)?;
246    ///     std_tcp_stream.read_exact(&mut data)?;
247    /// #   assert_eq!(b"Hello world!", &data);
248    ///     Ok(())
249    /// }
250    /// ```
251    /// [`tokio::net::TcpStream`]: TcpStream
252    /// [`std::net::TcpStream`]: std::net::TcpStream
253    /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
254    pub fn into_std(self) -> io::Result<std::net::TcpStream> {
255        #[cfg(unix)]
256        {
257            use std::os::unix::io::{FromRawFd, IntoRawFd};
258            self.io
259                .into_inner()
260                .map(IntoRawFd::into_raw_fd)
261                .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
262        }
263
264        #[cfg(windows)]
265        {
266            use std::os::windows::io::{FromRawSocket, IntoRawSocket};
267            self.io
268                .into_inner()
269                .map(|io| io.into_raw_socket())
270                .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
271        }
272
273        #[cfg(target_os = "wasi")]
274        {
275            use std::os::wasi::io::{FromRawFd, IntoRawFd};
276            self.io
277                .into_inner()
278                .map(|io| io.into_raw_fd())
279                .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
280        }
281    }
282
283    /// Returns the local address that this stream is bound to.
284    ///
285    /// # Examples
286    ///
287    /// ```no_run
288    /// use tokio::net::TcpStream;
289    ///
290    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
291    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
292    ///
293    /// println!("{:?}", stream.local_addr()?);
294    /// # Ok(())
295    /// # }
296    /// ```
297    pub fn local_addr(&self) -> io::Result<SocketAddr> {
298        self.io.local_addr()
299    }
300
301    /// Returns the value of the `SO_ERROR` option.
302    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
303        self.io.take_error()
304    }
305
306    /// Returns the remote address that this stream is connected to.
307    ///
308    /// # Examples
309    ///
310    /// ```no_run
311    /// use tokio::net::TcpStream;
312    ///
313    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
314    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
315    ///
316    /// println!("{:?}", stream.peer_addr()?);
317    /// # Ok(())
318    /// # }
319    /// ```
320    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
321        self.io.peer_addr()
322    }
323
324    /// Attempts to receive data on the socket, without removing that data from
325    /// the queue, registering the current task for wakeup if data is not yet
326    /// available.
327    ///
328    /// Note that on multiple calls to `poll_peek`, `poll_read` or
329    /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
330    /// most recent call is scheduled to receive a wakeup. (However,
331    /// `poll_write` retains a second, independent waker.)
332    ///
333    /// # Return value
334    ///
335    /// The function returns:
336    ///
337    /// * `Poll::Pending` if data is not yet available.
338    /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
339    /// * `Poll::Ready(Err(e))` if an error is encountered.
340    ///
341    /// # Errors
342    ///
343    /// This function may encounter any standard I/O error except `WouldBlock`.
344    ///
345    /// # Examples
346    ///
347    /// ```no_run
348    /// use tokio::io::{self, ReadBuf};
349    /// use tokio::net::TcpStream;
350    ///
351    /// use std::future::poll_fn;
352    ///
353    /// #[tokio::main]
354    /// async fn main() -> io::Result<()> {
355    ///     let stream = TcpStream::connect("127.0.0.1:8000").await?;
356    ///     let mut buf = [0; 10];
357    ///     let mut buf = ReadBuf::new(&mut buf);
358    ///
359    ///     poll_fn(|cx| {
360    ///         stream.poll_peek(cx, &mut buf)
361    ///     }).await?;
362    ///
363    ///     Ok(())
364    /// }
365    /// ```
366    pub fn poll_peek(
367        &self,
368        cx: &mut Context<'_>,
369        buf: &mut ReadBuf<'_>,
370    ) -> Poll<io::Result<usize>> {
371        loop {
372            let ev = ready!(self.io.registration().poll_read_ready(cx))?;
373
374            let b = unsafe {
375                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
376            };
377
378            match self.io.peek(b) {
379                Ok(ret) => {
380                    unsafe { buf.assume_init(ret) };
381                    buf.advance(ret);
382                    return Poll::Ready(Ok(ret));
383                }
384                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
385                    self.io.registration().clear_readiness(ev);
386                }
387                Err(e) => return Poll::Ready(Err(e)),
388            }
389        }
390    }
391
392    /// Waits for any of the requested ready states.
393    ///
394    /// This function is usually paired with `try_read()` or `try_write()`. It
395    /// can be used to concurrently read / write to the same socket on a single
396    /// task without splitting the socket.
397    ///
398    /// The function may complete without the socket being ready. This is a
399    /// false-positive and attempting an operation will return with
400    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
401    /// [`Ready`] set, so you should always check the returned value and possibly
402    /// wait again if the requested states are not set.
403    ///
404    /// # Cancel safety
405    ///
406    /// This method is cancel safe. Once a readiness event occurs, the method
407    /// will continue to return immediately until the readiness event is
408    /// consumed by an attempt to read or write that fails with `WouldBlock` or
409    /// `Poll::Pending`.
410    ///
411    /// # Examples
412    ///
413    /// Concurrently read and write to the stream on the same task without
414    /// splitting.
415    ///
416    /// ```no_run
417    /// use tokio::io::Interest;
418    /// use tokio::net::TcpStream;
419    /// use std::error::Error;
420    /// use std::io;
421    ///
422    /// #[tokio::main]
423    /// async fn main() -> Result<(), Box<dyn Error>> {
424    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
425    ///
426    ///     loop {
427    ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
428    ///
429    ///         if ready.is_readable() {
430    ///             let mut data = vec![0; 1024];
431    ///             // Try to read data, this may still fail with `WouldBlock`
432    ///             // if the readiness event is a false positive.
433    ///             match stream.try_read(&mut data) {
434    ///                 Ok(n) => {
435    ///                     println!("read {} bytes", n);
436    ///                 }
437    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
438    ///                     continue;
439    ///                 }
440    ///                 Err(e) => {
441    ///                     return Err(e.into());
442    ///                 }
443    ///             }
444    ///
445    ///         }
446    ///
447    ///         if ready.is_writable() {
448    ///             // Try to write data, this may still fail with `WouldBlock`
449    ///             // if the readiness event is a false positive.
450    ///             match stream.try_write(b"hello world") {
451    ///                 Ok(n) => {
452    ///                     println!("write {} bytes", n);
453    ///                 }
454    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
455    ///                     continue
456    ///                 }
457    ///                 Err(e) => {
458    ///                     return Err(e.into());
459    ///                 }
460    ///             }
461    ///         }
462    ///     }
463    /// }
464    /// ```
465    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
466        let event = self.io.registration().readiness(interest).await?;
467        Ok(event.ready)
468    }
469
470    /// Waits for the socket to become readable.
471    ///
472    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
473    /// paired with `try_read()`.
474    ///
475    /// # Cancel safety
476    ///
477    /// This method is cancel safe. Once a readiness event occurs, the method
478    /// will continue to return immediately until the readiness event is
479    /// consumed by an attempt to read that fails with `WouldBlock` or
480    /// `Poll::Pending`.
481    ///
482    /// # Examples
483    ///
484    /// ```no_run
485    /// use tokio::net::TcpStream;
486    /// use std::error::Error;
487    /// use std::io;
488    ///
489    /// #[tokio::main]
490    /// async fn main() -> Result<(), Box<dyn Error>> {
491    ///     // Connect to a peer
492    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
493    ///
494    ///     let mut msg = vec![0; 1024];
495    ///
496    ///     loop {
497    ///         // Wait for the socket to be readable
498    ///         stream.readable().await?;
499    ///
500    ///         // Try to read data, this may still fail with `WouldBlock`
501    ///         // if the readiness event is a false positive.
502    ///         match stream.try_read(&mut msg) {
503    ///             Ok(n) => {
504    ///                 msg.truncate(n);
505    ///                 break;
506    ///             }
507    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
508    ///                 continue;
509    ///             }
510    ///             Err(e) => {
511    ///                 return Err(e.into());
512    ///             }
513    ///         }
514    ///     }
515    ///
516    ///     println!("GOT = {:?}", msg);
517    ///     Ok(())
518    /// }
519    /// ```
520    pub async fn readable(&self) -> io::Result<()> {
521        self.ready(Interest::READABLE).await?;
522        Ok(())
523    }
524
525    /// Polls for read readiness.
526    ///
527    /// If the tcp stream is not currently ready for reading, this method will
528    /// store a clone of the `Waker` from the provided `Context`. When the tcp
529    /// stream becomes ready for reading, `Waker::wake` will be called on the
530    /// waker.
531    ///
532    /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
533    /// `poll_peek`, only the `Waker` from the `Context` passed to the most
534    /// recent call is scheduled to receive a wakeup. (However,
535    /// `poll_write_ready` retains a second, independent waker.)
536    ///
537    /// This function is intended for cases where creating and pinning a future
538    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
539    /// preferred, as this supports polling from multiple tasks at once.
540    ///
541    /// # Return value
542    ///
543    /// The function returns:
544    ///
545    /// * `Poll::Pending` if the tcp stream is not ready for reading.
546    /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
547    /// * `Poll::Ready(Err(e))` if an error is encountered.
548    ///
549    /// # Errors
550    ///
551    /// This function may encounter any standard I/O error except `WouldBlock`.
552    ///
553    /// [`readable`]: method@Self::readable
554    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
555        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
556    }
557
558    /// Tries to read data from the stream into the provided buffer, returning how
559    /// many bytes were read.
560    ///
561    /// Receives any pending data from the socket but does not wait for new data
562    /// to arrive. On success, returns the number of bytes read. Because
563    /// `try_read()` is non-blocking, the buffer does not have to be stored by
564    /// the async task and can exist entirely on the stack.
565    ///
566    /// Usually, [`readable()`] or [`ready()`] is used with this function.
567    ///
568    /// [`readable()`]: TcpStream::readable()
569    /// [`ready()`]: TcpStream::ready()
570    ///
571    /// # Return
572    ///
573    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
574    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
575    ///
576    /// 1. The stream's read half is closed and will no longer yield data.
577    /// 2. The specified buffer was 0 bytes in length.
578    ///
579    /// If the stream is not ready to read data,
580    /// `Err(io::ErrorKind::WouldBlock)` is returned.
581    ///
582    /// # Examples
583    ///
584    /// ```no_run
585    /// use tokio::net::TcpStream;
586    /// use std::error::Error;
587    /// use std::io;
588    ///
589    /// #[tokio::main]
590    /// async fn main() -> Result<(), Box<dyn Error>> {
591    ///     // Connect to a peer
592    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
593    ///
594    ///     loop {
595    ///         // Wait for the socket to be readable
596    ///         stream.readable().await?;
597    ///
598    ///         // Creating the buffer **after** the `await` prevents it from
599    ///         // being stored in the async task.
600    ///         let mut buf = [0; 4096];
601    ///
602    ///         // Try to read data, this may still fail with `WouldBlock`
603    ///         // if the readiness event is a false positive.
604    ///         match stream.try_read(&mut buf) {
605    ///             Ok(0) => break,
606    ///             Ok(n) => {
607    ///                 println!("read {} bytes", n);
608    ///             }
609    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
610    ///                 continue;
611    ///             }
612    ///             Err(e) => {
613    ///                 return Err(e.into());
614    ///             }
615    ///         }
616    ///     }
617    ///
618    ///     Ok(())
619    /// }
620    /// ```
621    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
622        use std::io::Read;
623
624        self.io
625            .registration()
626            .try_io(Interest::READABLE, || (&*self.io).read(buf))
627    }
628
629    /// Tries to read data from the stream into the provided buffers, returning
630    /// how many bytes were read.
631    ///
632    /// Data is copied to fill each buffer in order, with the final buffer
633    /// written to possibly being only partially filled. This method behaves
634    /// equivalently to a single call to [`try_read()`] with concatenated
635    /// buffers.
636    ///
637    /// Receives any pending data from the socket but does not wait for new data
638    /// to arrive. On success, returns the number of bytes read. Because
639    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
640    /// stored by the async task and can exist entirely on the stack.
641    ///
642    /// Usually, [`readable()`] or [`ready()`] is used with this function.
643    ///
644    /// [`try_read()`]: TcpStream::try_read()
645    /// [`readable()`]: TcpStream::readable()
646    /// [`ready()`]: TcpStream::ready()
647    ///
648    /// # Return
649    ///
650    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
651    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
652    /// and will no longer yield data. If the stream is not ready to read data
653    /// `Err(io::ErrorKind::WouldBlock)` is returned.
654    ///
655    /// # Examples
656    ///
657    /// ```no_run
658    /// use tokio::net::TcpStream;
659    /// use std::error::Error;
660    /// use std::io::{self, IoSliceMut};
661    ///
662    /// #[tokio::main]
663    /// async fn main() -> Result<(), Box<dyn Error>> {
664    ///     // Connect to a peer
665    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
666    ///
667    ///     loop {
668    ///         // Wait for the socket to be readable
669    ///         stream.readable().await?;
670    ///
671    ///         // Creating the buffer **after** the `await` prevents it from
672    ///         // being stored in the async task.
673    ///         let mut buf_a = [0; 512];
674    ///         let mut buf_b = [0; 1024];
675    ///         let mut bufs = [
676    ///             IoSliceMut::new(&mut buf_a),
677    ///             IoSliceMut::new(&mut buf_b),
678    ///         ];
679    ///
680    ///         // Try to read data, this may still fail with `WouldBlock`
681    ///         // if the readiness event is a false positive.
682    ///         match stream.try_read_vectored(&mut bufs) {
683    ///             Ok(0) => break,
684    ///             Ok(n) => {
685    ///                 println!("read {} bytes", n);
686    ///             }
687    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
688    ///                 continue;
689    ///             }
690    ///             Err(e) => {
691    ///                 return Err(e.into());
692    ///             }
693    ///         }
694    ///     }
695    ///
696    ///     Ok(())
697    /// }
698    /// ```
699    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
700        use std::io::Read;
701
702        self.io
703            .registration()
704            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
705    }
706
707    cfg_io_util! {
708        /// Tries to read data from the stream into the provided buffer, advancing the
709        /// buffer's internal cursor, returning how many bytes were read.
710        ///
711        /// Receives any pending data from the socket but does not wait for new data
712        /// to arrive. On success, returns the number of bytes read. Because
713        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
714        /// the async task and can exist entirely on the stack.
715        ///
716        /// Usually, [`readable()`] or [`ready()`] is used with this function.
717        ///
718        /// [`readable()`]: TcpStream::readable()
719        /// [`ready()`]: TcpStream::ready()
720        ///
721        /// # Return
722        ///
723        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
724        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
725        /// and will no longer yield data. If the stream is not ready to read data
726        /// `Err(io::ErrorKind::WouldBlock)` is returned.
727        ///
728        /// # Examples
729        ///
730        /// ```no_run
731        /// use tokio::net::TcpStream;
732        /// use std::error::Error;
733        /// use std::io;
734        ///
735        /// #[tokio::main]
736        /// async fn main() -> Result<(), Box<dyn Error>> {
737        ///     // Connect to a peer
738        ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
739        ///
740        ///     loop {
741        ///         // Wait for the socket to be readable
742        ///         stream.readable().await?;
743        ///
744        ///         let mut buf = Vec::with_capacity(4096);
745        ///
746        ///         // Try to read data, this may still fail with `WouldBlock`
747        ///         // if the readiness event is a false positive.
748        ///         match stream.try_read_buf(&mut buf) {
749        ///             Ok(0) => break,
750        ///             Ok(n) => {
751        ///                 println!("read {} bytes", n);
752        ///             }
753        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
754        ///                 continue;
755        ///             }
756        ///             Err(e) => {
757        ///                 return Err(e.into());
758        ///             }
759        ///         }
760        ///     }
761        ///
762        ///     Ok(())
763        /// }
764        /// ```
765        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
766            self.io.registration().try_io(Interest::READABLE, || {
767                use std::io::Read;
768
769                let dst = buf.chunk_mut();
770                let dst =
771                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
772
773                // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
774                // buffer.
775                let n = (&*self.io).read(dst)?;
776
777                unsafe {
778                    buf.advance_mut(n);
779                }
780
781                Ok(n)
782            })
783        }
784    }
785
786    /// Waits for the socket to become writable.
787    ///
788    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
789    /// paired with `try_write()`.
790    ///
791    /// # Cancel safety
792    ///
793    /// This method is cancel safe. Once a readiness event occurs, the method
794    /// will continue to return immediately until the readiness event is
795    /// consumed by an attempt to write that fails with `WouldBlock` or
796    /// `Poll::Pending`.
797    ///
798    /// # Examples
799    ///
800    /// ```no_run
801    /// use tokio::net::TcpStream;
802    /// use std::error::Error;
803    /// use std::io;
804    ///
805    /// #[tokio::main]
806    /// async fn main() -> Result<(), Box<dyn Error>> {
807    ///     // Connect to a peer
808    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
809    ///
810    ///     loop {
811    ///         // Wait for the socket to be writable
812    ///         stream.writable().await?;
813    ///
814    ///         // Try to write data, this may still fail with `WouldBlock`
815    ///         // if the readiness event is a false positive.
816    ///         match stream.try_write(b"hello world") {
817    ///             Ok(n) => {
818    ///                 break;
819    ///             }
820    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
821    ///                 continue;
822    ///             }
823    ///             Err(e) => {
824    ///                 return Err(e.into());
825    ///             }
826    ///         }
827    ///     }
828    ///
829    ///     Ok(())
830    /// }
831    /// ```
832    pub async fn writable(&self) -> io::Result<()> {
833        self.ready(Interest::WRITABLE).await?;
834        Ok(())
835    }
836
837    /// Polls for write readiness.
838    ///
839    /// If the tcp stream is not currently ready for writing, this method will
840    /// store a clone of the `Waker` from the provided `Context`. When the tcp
841    /// stream becomes ready for writing, `Waker::wake` will be called on the
842    /// waker.
843    ///
844    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
845    /// the `Waker` from the `Context` passed to the most recent call is
846    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
847    /// second, independent waker.)
848    ///
849    /// This function is intended for cases where creating and pinning a future
850    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
851    /// preferred, as this supports polling from multiple tasks at once.
852    ///
853    /// # Return value
854    ///
855    /// The function returns:
856    ///
857    /// * `Poll::Pending` if the tcp stream is not ready for writing.
858    /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
859    /// * `Poll::Ready(Err(e))` if an error is encountered.
860    ///
861    /// # Errors
862    ///
863    /// This function may encounter any standard I/O error except `WouldBlock`.
864    ///
865    /// [`writable`]: method@Self::writable
866    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
867        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
868    }
869
870    /// Try to write a buffer to the stream, returning how many bytes were
871    /// written.
872    ///
873    /// The function will attempt to write the entire contents of `buf`, but
874    /// only part of the buffer may be written.
875    ///
876    /// This function is usually paired with `writable()`.
877    ///
878    /// # Return
879    ///
880    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
881    /// number of bytes written. If the stream is not ready to write data,
882    /// `Err(io::ErrorKind::WouldBlock)` is returned.
883    ///
884    /// # Examples
885    ///
886    /// ```no_run
887    /// use tokio::net::TcpStream;
888    /// use std::error::Error;
889    /// use std::io;
890    ///
891    /// #[tokio::main]
892    /// async fn main() -> Result<(), Box<dyn Error>> {
893    ///     // Connect to a peer
894    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
895    ///
896    ///     loop {
897    ///         // Wait for the socket to be writable
898    ///         stream.writable().await?;
899    ///
900    ///         // Try to write data, this may still fail with `WouldBlock`
901    ///         // if the readiness event is a false positive.
902    ///         match stream.try_write(b"hello world") {
903    ///             Ok(n) => {
904    ///                 break;
905    ///             }
906    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
907    ///                 continue;
908    ///             }
909    ///             Err(e) => {
910    ///                 return Err(e.into());
911    ///             }
912    ///         }
913    ///     }
914    ///
915    ///     Ok(())
916    /// }
917    /// ```
918    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
919        use std::io::Write;
920
921        self.io
922            .registration()
923            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
924    }
925
926    /// Tries to write several buffers to the stream, returning how many bytes
927    /// were written.
928    ///
929    /// Data is written from each buffer in order, with the final buffer read
930    /// from possibly being only partially consumed. This method behaves
931    /// equivalently to a single call to [`try_write()`] with concatenated
932    /// buffers.
933    ///
934    /// This function is usually paired with `writable()`.
935    ///
936    /// [`try_write()`]: TcpStream::try_write()
937    ///
938    /// # Return
939    ///
940    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
941    /// number of bytes written. If the stream is not ready to write data,
942    /// `Err(io::ErrorKind::WouldBlock)` is returned.
943    ///
944    /// # Examples
945    ///
946    /// ```no_run
947    /// use tokio::net::TcpStream;
948    /// use std::error::Error;
949    /// use std::io;
950    ///
951    /// #[tokio::main]
952    /// async fn main() -> Result<(), Box<dyn Error>> {
953    ///     // Connect to a peer
954    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
955    ///
956    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
957    ///
958    ///     loop {
959    ///         // Wait for the socket to be writable
960    ///         stream.writable().await?;
961    ///
962    ///         // Try to write data, this may still fail with `WouldBlock`
963    ///         // if the readiness event is a false positive.
964    ///         match stream.try_write_vectored(&bufs) {
965    ///             Ok(n) => {
966    ///                 break;
967    ///             }
968    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
969    ///                 continue;
970    ///             }
971    ///             Err(e) => {
972    ///                 return Err(e.into());
973    ///             }
974    ///         }
975    ///     }
976    ///
977    ///     Ok(())
978    /// }
979    /// ```
980    pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
981        use std::io::Write;
982
983        self.io
984            .registration()
985            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
986    }
987
988    /// Tries to read or write from the socket using a user-provided IO operation.
989    ///
990    /// If the socket is ready, the provided closure is called. The closure
991    /// should attempt to perform IO operation on the socket by manually
992    /// calling the appropriate syscall. If the operation fails because the
993    /// socket is not actually ready, then the closure should return a
994    /// `WouldBlock` error and the readiness flag is cleared. The return value
995    /// of the closure is then returned by `try_io`.
996    ///
997    /// If the socket is not ready, then the closure is not called
998    /// and a `WouldBlock` error is returned.
999    ///
1000    /// The closure should only return a `WouldBlock` error if it has performed
1001    /// an IO operation on the socket that failed due to the socket not being
1002    /// ready. Returning a `WouldBlock` error in any other situation will
1003    /// incorrectly clear the readiness flag, which can cause the socket to
1004    /// behave incorrectly.
1005    ///
1006    /// The closure should not perform the IO operation using any of the methods
1007    /// defined on the Tokio `TcpStream` type, as this will mess with the
1008    /// readiness flag and can cause the socket to behave incorrectly.
1009    ///
1010    /// This method is not intended to be used with combined interests.
1011    /// The closure should perform only one type of IO operation, so it should not
1012    /// require more than one ready state. This method may panic or sleep forever
1013    /// if it is called with a combined interest.
1014    ///
1015    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1016    ///
1017    /// [`readable()`]: TcpStream::readable()
1018    /// [`writable()`]: TcpStream::writable()
1019    /// [`ready()`]: TcpStream::ready()
1020    pub fn try_io<R>(
1021        &self,
1022        interest: Interest,
1023        f: impl FnOnce() -> io::Result<R>,
1024    ) -> io::Result<R> {
1025        self.io
1026            .registration()
1027            .try_io(interest, || self.io.try_io(f))
1028    }
1029
1030    /// Reads or writes from the socket using a user-provided IO operation.
1031    ///
1032    /// The readiness of the socket is awaited and when the socket is ready,
1033    /// the provided closure is called. The closure should attempt to perform
1034    /// IO operation on the socket by manually calling the appropriate syscall.
1035    /// If the operation fails because the socket is not actually ready,
1036    /// then the closure should return a `WouldBlock` error. In such case the
1037    /// readiness flag is cleared and the socket readiness is awaited again.
1038    /// This loop is repeated until the closure returns an `Ok` or an error
1039    /// other than `WouldBlock`.
1040    ///
1041    /// The closure should only return a `WouldBlock` error if it has performed
1042    /// an IO operation on the socket that failed due to the socket not being
1043    /// ready. Returning a `WouldBlock` error in any other situation will
1044    /// incorrectly clear the readiness flag, which can cause the socket to
1045    /// behave incorrectly.
1046    ///
1047    /// The closure should not perform the IO operation using any of the methods
1048    /// defined on the Tokio `TcpStream` type, as this will mess with the
1049    /// readiness flag and can cause the socket to behave incorrectly.
1050    ///
1051    /// This method is not intended to be used with combined interests.
1052    /// The closure should perform only one type of IO operation, so it should not
1053    /// require more than one ready state. This method may panic or sleep forever
1054    /// if it is called with a combined interest.
1055    pub async fn async_io<R>(
1056        &self,
1057        interest: Interest,
1058        mut f: impl FnMut() -> io::Result<R>,
1059    ) -> io::Result<R> {
1060        self.io
1061            .registration()
1062            .async_io(interest, || self.io.try_io(&mut f))
1063            .await
1064    }
1065
1066    /// Receives data on the socket from the remote address to which it is
1067    /// connected, without removing that data from the queue. On success,
1068    /// returns the number of bytes peeked.
1069    ///
1070    /// Successive calls return the same data. This is accomplished by passing
1071    /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1072    ///
1073    /// # Examples
1074    ///
1075    /// ```no_run
1076    /// use tokio::net::TcpStream;
1077    /// use tokio::io::AsyncReadExt;
1078    /// use std::error::Error;
1079    ///
1080    /// #[tokio::main]
1081    /// async fn main() -> Result<(), Box<dyn Error>> {
1082    ///     // Connect to a peer
1083    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1084    ///
1085    ///     let mut b1 = [0; 10];
1086    ///     let mut b2 = [0; 10];
1087    ///
1088    ///     // Peek at the data
1089    ///     let n = stream.peek(&mut b1).await?;
1090    ///
1091    ///     // Read the data
1092    ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
1093    ///     assert_eq!(&b1[..n], &b2[..n]);
1094    ///
1095    ///     Ok(())
1096    /// }
1097    /// ```
1098    ///
1099    /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1100    ///
1101    /// [`read`]: fn@crate::io::AsyncReadExt::read
1102    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1103    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1104        self.io
1105            .registration()
1106            .async_io(Interest::READABLE, || self.io.peek(buf))
1107            .await
1108    }
1109
1110    /// Shuts down the read, write, or both halves of this connection.
1111    ///
1112    /// This function will cause all pending and future I/O on the specified
1113    /// portions to return immediately with an appropriate value (see the
1114    /// documentation of `Shutdown`).
1115    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1116        self.io.shutdown(how)
1117    }
1118
1119    /// Gets the value of the `TCP_NODELAY` option on this socket.
1120    ///
1121    /// For more information about this option, see [`set_nodelay`].
1122    ///
1123    /// [`set_nodelay`]: TcpStream::set_nodelay
1124    ///
1125    /// # Examples
1126    ///
1127    /// ```no_run
1128    /// use tokio::net::TcpStream;
1129    ///
1130    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1131    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1132    ///
1133    /// println!("{:?}", stream.nodelay()?);
1134    /// # Ok(())
1135    /// # }
1136    /// ```
1137    pub fn nodelay(&self) -> io::Result<bool> {
1138        self.io.nodelay()
1139    }
1140
1141    /// Sets the value of the `TCP_NODELAY` option on this socket.
1142    ///
1143    /// If set, this option disables the Nagle algorithm. This means that
1144    /// segments are always sent as soon as possible, even if there is only a
1145    /// small amount of data. When not set, data is buffered until there is a
1146    /// sufficient amount to send out, thereby avoiding the frequent sending of
1147    /// small packets.
1148    ///
1149    /// # Examples
1150    ///
1151    /// ```no_run
1152    /// use tokio::net::TcpStream;
1153    ///
1154    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1155    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1156    ///
1157    /// stream.set_nodelay(true)?;
1158    /// # Ok(())
1159    /// # }
1160    /// ```
1161    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1162        self.io.set_nodelay(nodelay)
1163    }
1164
1165    cfg_not_wasi! {
1166        /// Reads the linger duration for this socket by getting the `SO_LINGER`
1167        /// option.
1168        ///
1169        /// For more information about this option, see [`set_linger`].
1170        ///
1171        /// [`set_linger`]: TcpStream::set_linger
1172        ///
1173        /// # Examples
1174        ///
1175        /// ```no_run
1176        /// use tokio::net::TcpStream;
1177        ///
1178        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1179        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1180        ///
1181        /// println!("{:?}", stream.linger()?);
1182        /// # Ok(())
1183        /// # }
1184        /// ```
1185        pub fn linger(&self) -> io::Result<Option<Duration>> {
1186            socket2::SockRef::from(self).linger()
1187        }
1188
1189        /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1190        ///
1191        /// This option controls the action taken when a stream has unsent messages and the stream is
1192        /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1193        /// data or until the time expires.
1194        ///
1195        /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1196        /// way that allows the process to continue as quickly as possible.
1197        ///
1198        /// # Examples
1199        ///
1200        /// ```no_run
1201        /// use tokio::net::TcpStream;
1202        ///
1203        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1204        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1205        ///
1206        /// stream.set_linger(None)?;
1207        /// # Ok(())
1208        /// # }
1209        /// ```
1210        pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1211            socket2::SockRef::from(self).set_linger(dur)
1212        }
1213    }
1214
1215    /// Gets the value of the `IP_TTL` option for this socket.
1216    ///
1217    /// For more information about this option, see [`set_ttl`].
1218    ///
1219    /// [`set_ttl`]: TcpStream::set_ttl
1220    ///
1221    /// # Examples
1222    ///
1223    /// ```no_run
1224    /// use tokio::net::TcpStream;
1225    ///
1226    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1227    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1228    ///
1229    /// println!("{:?}", stream.ttl()?);
1230    /// # Ok(())
1231    /// # }
1232    /// ```
1233    pub fn ttl(&self) -> io::Result<u32> {
1234        self.io.ttl()
1235    }
1236
1237    /// Sets the value for the `IP_TTL` option on this socket.
1238    ///
1239    /// This value sets the time-to-live field that is used in every packet sent
1240    /// from this socket.
1241    ///
1242    /// # Examples
1243    ///
1244    /// ```no_run
1245    /// use tokio::net::TcpStream;
1246    ///
1247    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1248    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1249    ///
1250    /// stream.set_ttl(123)?;
1251    /// # Ok(())
1252    /// # }
1253    /// ```
1254    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1255        self.io.set_ttl(ttl)
1256    }
1257
1258    // These lifetime markers also appear in the generated documentation, and make
1259    // it more clear that this is a *borrowed* split.
1260    #[allow(clippy::needless_lifetimes)]
1261    /// Splits a `TcpStream` into a read half and a write half, which can be used
1262    /// to read and write the stream concurrently.
1263    ///
1264    /// This method is more efficient than [`into_split`], but the halves cannot be
1265    /// moved into independently spawned tasks.
1266    ///
1267    /// [`into_split`]: TcpStream::into_split()
1268    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1269        split(self)
1270    }
1271
1272    /// Splits a `TcpStream` into a read half and a write half, which can be used
1273    /// to read and write the stream concurrently.
1274    ///
1275    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1276    /// this comes at the cost of a heap allocation.
1277    ///
1278    /// **Note:** Dropping the write half will shut down the write half of the TCP
1279    /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1280    ///
1281    /// [`split`]: TcpStream::split()
1282    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1283    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1284        split_owned(self)
1285    }
1286
1287    // == Poll IO functions that takes `&self` ==
1288    //
1289    // To read or write without mutable access to the `TcpStream`, combine the
1290    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1291    // `try_write` methods.
1292
1293    pub(crate) fn poll_read_priv(
1294        &self,
1295        cx: &mut Context<'_>,
1296        buf: &mut ReadBuf<'_>,
1297    ) -> Poll<io::Result<()>> {
1298        // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1299        unsafe { self.io.poll_read(cx, buf) }
1300    }
1301
1302    pub(super) fn poll_write_priv(
1303        &self,
1304        cx: &mut Context<'_>,
1305        buf: &[u8],
1306    ) -> Poll<io::Result<usize>> {
1307        self.io.poll_write(cx, buf)
1308    }
1309
1310    pub(super) fn poll_write_vectored_priv(
1311        &self,
1312        cx: &mut Context<'_>,
1313        bufs: &[io::IoSlice<'_>],
1314    ) -> Poll<io::Result<usize>> {
1315        self.io.poll_write_vectored(cx, bufs)
1316    }
1317}
1318
1319impl TryFrom<std::net::TcpStream> for TcpStream {
1320    type Error = io::Error;
1321
1322    /// Consumes stream, returning the tokio I/O object.
1323    ///
1324    /// This is equivalent to
1325    /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1326    fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1327        Self::from_std(stream)
1328    }
1329}
1330
1331// ===== impl Read / Write =====
1332
1333impl AsyncRead for TcpStream {
1334    fn poll_read(
1335        self: Pin<&mut Self>,
1336        cx: &mut Context<'_>,
1337        buf: &mut ReadBuf<'_>,
1338    ) -> Poll<io::Result<()>> {
1339        self.poll_read_priv(cx, buf)
1340    }
1341}
1342
1343impl AsyncWrite for TcpStream {
1344    fn poll_write(
1345        self: Pin<&mut Self>,
1346        cx: &mut Context<'_>,
1347        buf: &[u8],
1348    ) -> Poll<io::Result<usize>> {
1349        self.poll_write_priv(cx, buf)
1350    }
1351
1352    fn poll_write_vectored(
1353        self: Pin<&mut Self>,
1354        cx: &mut Context<'_>,
1355        bufs: &[io::IoSlice<'_>],
1356    ) -> Poll<io::Result<usize>> {
1357        self.poll_write_vectored_priv(cx, bufs)
1358    }
1359
1360    fn is_write_vectored(&self) -> bool {
1361        true
1362    }
1363
1364    #[inline]
1365    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1366        // tcp flush is a no-op
1367        Poll::Ready(Ok(()))
1368    }
1369
1370    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1371        self.shutdown_std(std::net::Shutdown::Write)?;
1372        Poll::Ready(Ok(()))
1373    }
1374}
1375
1376impl fmt::Debug for TcpStream {
1377    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1378        self.io.fmt(f)
1379    }
1380}
1381
1382#[cfg(unix)]
1383mod sys {
1384    use super::TcpStream;
1385    use std::os::unix::prelude::*;
1386
1387    impl AsRawFd for TcpStream {
1388        fn as_raw_fd(&self) -> RawFd {
1389            self.io.as_raw_fd()
1390        }
1391    }
1392
1393    impl AsFd for TcpStream {
1394        fn as_fd(&self) -> BorrowedFd<'_> {
1395            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1396        }
1397    }
1398}
1399
1400cfg_windows! {
1401    use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1402
1403    impl AsRawSocket for TcpStream {
1404        fn as_raw_socket(&self) -> RawSocket {
1405            self.io.as_raw_socket()
1406        }
1407    }
1408
1409    impl AsSocket for TcpStream {
1410        fn as_socket(&self) -> BorrowedSocket<'_> {
1411            unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1412        }
1413    }
1414}
1415
1416#[cfg(all(tokio_unstable, target_os = "wasi"))]
1417mod sys {
1418    use super::TcpStream;
1419    use std::os::wasi::prelude::*;
1420
1421    impl AsRawFd for TcpStream {
1422        fn as_raw_fd(&self) -> RawFd {
1423            self.io.as_raw_fd()
1424        }
1425    }
1426
1427    impl AsFd for TcpStream {
1428        fn as_fd(&self) -> BorrowedFd<'_> {
1429            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1430        }
1431    }
1432}