tokio/net/tcp/stream.rs
1cfg_not_wasi! {
2 use crate::net::{to_socket_addrs, ToSocketAddrs};
3 use std::future::poll_fn;
4 use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10use crate::util::check_socket_for_blocking;
11
12use std::fmt;
13use std::io;
14use std::net::{Shutdown, SocketAddr};
15use std::pin::Pin;
16use std::task::{ready, Context, Poll};
17
18cfg_io_util! {
19 use bytes::BufMut;
20}
21
22cfg_net! {
23 /// A TCP stream between a local and a remote socket.
24 ///
25 /// A TCP stream can either be created by connecting to an endpoint, via the
26 /// [`connect`] method, or by [accepting] a connection from a [listener]. A
27 /// TCP stream can also be created via the [`TcpSocket`] type.
28 ///
29 /// Reading and writing to a `TcpStream` is usually done using the
30 /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
31 /// traits.
32 ///
33 /// [`connect`]: method@TcpStream::connect
34 /// [accepting]: method@crate::net::TcpListener::accept
35 /// [listener]: struct@crate::net::TcpListener
36 /// [`TcpSocket`]: struct@crate::net::TcpSocket
37 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
38 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
39 ///
40 /// # Examples
41 ///
42 /// ```no_run
43 /// use tokio::net::TcpStream;
44 /// use tokio::io::AsyncWriteExt;
45 /// use std::error::Error;
46 ///
47 /// #[tokio::main]
48 /// async fn main() -> Result<(), Box<dyn Error>> {
49 /// // Connect to a peer
50 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
51 ///
52 /// // Write some data.
53 /// stream.write_all(b"hello world!").await?;
54 ///
55 /// Ok(())
56 /// }
57 /// ```
58 ///
59 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
60 ///
61 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
62 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
63 ///
64 /// To shut down the stream in the write direction, you can call the
65 /// [`shutdown()`] method. This will cause the other peer to receive a read of
66 /// length 0, indicating that no more data will be sent. This only closes
67 /// the stream in one direction.
68 ///
69 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
pub struct TcpStream {
    // The mio TCP socket wrapped in `PollEvented`. Every method of this type
    // reaches the socket through this field, and uses `io.registration()` to
    // wait for, poll, and clear readiness.
    io: PollEvented<mio::net::TcpStream>,
}
73}
74
75impl TcpStream {
76 cfg_not_wasi! {
77 /// Opens a TCP connection to a remote host.
78 ///
79 /// `addr` is an address of the remote host. Anything which implements the
80 /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
81 /// yields multiple addresses, connect will be attempted with each of the
82 /// addresses until a connection is successful. If none of the addresses
83 /// result in a successful connection, the error returned from the last
84 /// connection attempt (the last address) is returned.
85 ///
86 /// To configure the socket before connecting, you can use the [`TcpSocket`]
87 /// type.
88 ///
89 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
90 /// [`TcpSocket`]: struct@crate::net::TcpSocket
91 ///
92 /// # Examples
93 ///
94 /// ```no_run
95 /// use tokio::net::TcpStream;
96 /// use tokio::io::AsyncWriteExt;
97 /// use std::error::Error;
98 ///
99 /// #[tokio::main]
100 /// async fn main() -> Result<(), Box<dyn Error>> {
101 /// // Connect to a peer
102 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
103 ///
104 /// // Write some data.
105 /// stream.write_all(b"hello world!").await?;
106 ///
107 /// Ok(())
108 /// }
109 /// ```
110 ///
111 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
112 ///
113 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
114 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
115 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
116 let addrs = to_socket_addrs(addr).await?;
117
118 let mut last_err = None;
119
120 for addr in addrs {
121 match TcpStream::connect_addr(addr).await {
122 Ok(stream) => return Ok(stream),
123 Err(e) => last_err = Some(e),
124 }
125 }
126
127 Err(last_err.unwrap_or_else(|| {
128 io::Error::new(
129 io::ErrorKind::InvalidInput,
130 "could not resolve to any address",
131 )
132 }))
133 }
134
135 /// Establishes a connection to the specified `addr`.
136 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
137 let sys = mio::net::TcpStream::connect(addr)?;
138 TcpStream::connect_mio(sys).await
139 }
140
141 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
142 let stream = TcpStream::new(sys)?;
143
144 // Once we've connected, wait for the stream to be writable as
145 // that's when the actual connection has been initiated. Once we're
146 // writable we check for `take_socket_error` to see if the connect
147 // actually hit an error or not.
148 //
149 // If all that succeeded then we ship everything on up.
150 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
151
152 if let Some(e) = stream.io.take_error()? {
153 return Err(e);
154 }
155
156 Ok(stream)
157 }
158 }
159
160 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
161 let io = PollEvented::new(connected)?;
162 Ok(TcpStream { io })
163 }
164
165 /// Creates new `TcpStream` from a `std::net::TcpStream`.
166 ///
167 /// This function is intended to be used to wrap a TCP stream from the
168 /// standard library in the Tokio equivalent.
169 ///
170 /// # Notes
171 ///
172 /// The caller is responsible for ensuring that the stream is in
173 /// non-blocking mode. Otherwise all I/O operations on the stream
174 /// will block the thread, which will cause unexpected behavior.
175 /// Non-blocking mode can be set using [`set_nonblocking`].
176 ///
177 /// Passing a stream in blocking mode is always erroneous,
178 /// and the behavior in that case may change in the future.
179 /// For example, it could panic.
180 ///
181 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
182 ///
183 /// # Examples
184 ///
185 /// ```rust,no_run
186 /// use std::error::Error;
187 /// use tokio::net::TcpStream;
188 ///
189 /// #[tokio::main]
190 /// async fn main() -> Result<(), Box<dyn Error>> {
191 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
192 /// std_stream.set_nonblocking(true)?;
193 /// let stream = TcpStream::from_std(std_stream)?;
194 /// Ok(())
195 /// }
196 /// ```
197 ///
198 /// # Panics
199 ///
200 /// This function panics if it is not called from within a runtime with
201 /// IO enabled.
202 ///
203 /// The runtime is usually set implicitly when this function is called
204 /// from a future driven by a tokio runtime, otherwise runtime can be set
205 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
206 #[track_caller]
207 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
208 check_socket_for_blocking(&stream)?;
209
210 let io = mio::net::TcpStream::from_std(stream);
211 let io = PollEvented::new(io)?;
212 Ok(TcpStream { io })
213 }
214
215 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
216 ///
217 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
218 /// Use [`set_nonblocking`] to change the blocking mode if needed.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// # if cfg!(miri) { return } // No `socket` in miri.
224 /// use std::error::Error;
225 /// use std::io::Read;
226 /// use tokio::net::TcpListener;
227 /// # use tokio::net::TcpStream;
228 /// # use tokio::io::AsyncWriteExt;
229 ///
230 /// #[tokio::main]
231 /// async fn main() -> Result<(), Box<dyn Error>> {
232 /// let mut data = [0u8; 12];
233 /// # if false {
234 /// let listener = TcpListener::bind("127.0.0.1:34254").await?;
235 /// # }
236 /// # let listener = TcpListener::bind("127.0.0.1:0").await?;
237 /// # let addr = listener.local_addr().unwrap();
238 /// # let handle = tokio::spawn(async move {
239 /// # let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
240 /// # stream.write_all(b"Hello world!").await.unwrap();
241 /// # });
242 /// let (tokio_tcp_stream, _) = listener.accept().await?;
243 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
244 /// # handle.await.expect("The task being joined has panicked");
245 /// std_tcp_stream.set_nonblocking(false)?;
246 /// std_tcp_stream.read_exact(&mut data)?;
247 /// # assert_eq!(b"Hello world!", &data);
248 /// Ok(())
249 /// }
250 /// ```
251 /// [`tokio::net::TcpStream`]: TcpStream
252 /// [`std::net::TcpStream`]: std::net::TcpStream
253 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
254 pub fn into_std(self) -> io::Result<std::net::TcpStream> {
255 #[cfg(unix)]
256 {
257 use std::os::unix::io::{FromRawFd, IntoRawFd};
258 self.io
259 .into_inner()
260 .map(IntoRawFd::into_raw_fd)
261 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
262 }
263
264 #[cfg(windows)]
265 {
266 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
267 self.io
268 .into_inner()
269 .map(|io| io.into_raw_socket())
270 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
271 }
272
273 #[cfg(target_os = "wasi")]
274 {
275 use std::os::wasi::io::{FromRawFd, IntoRawFd};
276 self.io
277 .into_inner()
278 .map(|io| io.into_raw_fd())
279 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
280 }
281 }
282
283 /// Returns the local address that this stream is bound to.
284 ///
285 /// # Examples
286 ///
287 /// ```no_run
288 /// use tokio::net::TcpStream;
289 ///
290 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
291 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
292 ///
293 /// println!("{:?}", stream.local_addr()?);
294 /// # Ok(())
295 /// # }
296 /// ```
297 pub fn local_addr(&self) -> io::Result<SocketAddr> {
298 self.io.local_addr()
299 }
300
301 /// Returns the value of the `SO_ERROR` option.
302 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
303 self.io.take_error()
304 }
305
306 /// Returns the remote address that this stream is connected to.
307 ///
308 /// # Examples
309 ///
310 /// ```no_run
311 /// use tokio::net::TcpStream;
312 ///
313 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
314 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
315 ///
316 /// println!("{:?}", stream.peer_addr()?);
317 /// # Ok(())
318 /// # }
319 /// ```
320 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
321 self.io.peer_addr()
322 }
323
324 /// Attempts to receive data on the socket, without removing that data from
325 /// the queue, registering the current task for wakeup if data is not yet
326 /// available.
327 ///
328 /// Note that on multiple calls to `poll_peek`, `poll_read` or
329 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
330 /// most recent call is scheduled to receive a wakeup. (However,
331 /// `poll_write` retains a second, independent waker.)
332 ///
333 /// # Return value
334 ///
335 /// The function returns:
336 ///
337 /// * `Poll::Pending` if data is not yet available.
338 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
339 /// * `Poll::Ready(Err(e))` if an error is encountered.
340 ///
341 /// # Errors
342 ///
343 /// This function may encounter any standard I/O error except `WouldBlock`.
344 ///
345 /// # Examples
346 ///
347 /// ```no_run
348 /// use tokio::io::{self, ReadBuf};
349 /// use tokio::net::TcpStream;
350 ///
351 /// use std::future::poll_fn;
352 ///
353 /// #[tokio::main]
354 /// async fn main() -> io::Result<()> {
355 /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
356 /// let mut buf = [0; 10];
357 /// let mut buf = ReadBuf::new(&mut buf);
358 ///
359 /// poll_fn(|cx| {
360 /// stream.poll_peek(cx, &mut buf)
361 /// }).await?;
362 ///
363 /// Ok(())
364 /// }
365 /// ```
366 pub fn poll_peek(
367 &self,
368 cx: &mut Context<'_>,
369 buf: &mut ReadBuf<'_>,
370 ) -> Poll<io::Result<usize>> {
371 loop {
372 let ev = ready!(self.io.registration().poll_read_ready(cx))?;
373
374 let b = unsafe {
375 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
376 };
377
378 match self.io.peek(b) {
379 Ok(ret) => {
380 unsafe { buf.assume_init(ret) };
381 buf.advance(ret);
382 return Poll::Ready(Ok(ret));
383 }
384 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
385 self.io.registration().clear_readiness(ev);
386 }
387 Err(e) => return Poll::Ready(Err(e)),
388 }
389 }
390 }
391
392 /// Waits for any of the requested ready states.
393 ///
394 /// This function is usually paired with `try_read()` or `try_write()`. It
395 /// can be used to concurrently read / write to the same socket on a single
396 /// task without splitting the socket.
397 ///
398 /// The function may complete without the socket being ready. This is a
399 /// false-positive and attempting an operation will return with
400 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
401 /// [`Ready`] set, so you should always check the returned value and possibly
402 /// wait again if the requested states are not set.
403 ///
404 /// # Cancel safety
405 ///
406 /// This method is cancel safe. Once a readiness event occurs, the method
407 /// will continue to return immediately until the readiness event is
408 /// consumed by an attempt to read or write that fails with `WouldBlock` or
409 /// `Poll::Pending`.
410 ///
411 /// # Examples
412 ///
413 /// Concurrently read and write to the stream on the same task without
414 /// splitting.
415 ///
416 /// ```no_run
417 /// use tokio::io::Interest;
418 /// use tokio::net::TcpStream;
419 /// use std::error::Error;
420 /// use std::io;
421 ///
422 /// #[tokio::main]
423 /// async fn main() -> Result<(), Box<dyn Error>> {
424 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
425 ///
426 /// loop {
427 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
428 ///
429 /// if ready.is_readable() {
430 /// let mut data = vec![0; 1024];
431 /// // Try to read data, this may still fail with `WouldBlock`
432 /// // if the readiness event is a false positive.
433 /// match stream.try_read(&mut data) {
434 /// Ok(n) => {
435 /// println!("read {} bytes", n);
436 /// }
437 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
438 /// continue;
439 /// }
440 /// Err(e) => {
441 /// return Err(e.into());
442 /// }
443 /// }
444 ///
445 /// }
446 ///
447 /// if ready.is_writable() {
448 /// // Try to write data, this may still fail with `WouldBlock`
449 /// // if the readiness event is a false positive.
450 /// match stream.try_write(b"hello world") {
451 /// Ok(n) => {
452 /// println!("write {} bytes", n);
453 /// }
454 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
455 /// continue
456 /// }
457 /// Err(e) => {
458 /// return Err(e.into());
459 /// }
460 /// }
461 /// }
462 /// }
463 /// }
464 /// ```
465 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
466 let event = self.io.registration().readiness(interest).await?;
467 Ok(event.ready)
468 }
469
470 /// Waits for the socket to become readable.
471 ///
472 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
473 /// paired with `try_read()`.
474 ///
475 /// # Cancel safety
476 ///
477 /// This method is cancel safe. Once a readiness event occurs, the method
478 /// will continue to return immediately until the readiness event is
479 /// consumed by an attempt to read that fails with `WouldBlock` or
480 /// `Poll::Pending`.
481 ///
482 /// # Examples
483 ///
484 /// ```no_run
485 /// use tokio::net::TcpStream;
486 /// use std::error::Error;
487 /// use std::io;
488 ///
489 /// #[tokio::main]
490 /// async fn main() -> Result<(), Box<dyn Error>> {
491 /// // Connect to a peer
492 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
493 ///
494 /// let mut msg = vec![0; 1024];
495 ///
496 /// loop {
497 /// // Wait for the socket to be readable
498 /// stream.readable().await?;
499 ///
500 /// // Try to read data, this may still fail with `WouldBlock`
501 /// // if the readiness event is a false positive.
502 /// match stream.try_read(&mut msg) {
503 /// Ok(n) => {
504 /// msg.truncate(n);
505 /// break;
506 /// }
507 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
508 /// continue;
509 /// }
510 /// Err(e) => {
511 /// return Err(e.into());
512 /// }
513 /// }
514 /// }
515 ///
516 /// println!("GOT = {:?}", msg);
517 /// Ok(())
518 /// }
519 /// ```
520 pub async fn readable(&self) -> io::Result<()> {
521 self.ready(Interest::READABLE).await?;
522 Ok(())
523 }
524
525 /// Polls for read readiness.
526 ///
527 /// If the tcp stream is not currently ready for reading, this method will
528 /// store a clone of the `Waker` from the provided `Context`. When the tcp
529 /// stream becomes ready for reading, `Waker::wake` will be called on the
530 /// waker.
531 ///
532 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
533 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
534 /// recent call is scheduled to receive a wakeup. (However,
535 /// `poll_write_ready` retains a second, independent waker.)
536 ///
537 /// This function is intended for cases where creating and pinning a future
538 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
539 /// preferred, as this supports polling from multiple tasks at once.
540 ///
541 /// # Return value
542 ///
543 /// The function returns:
544 ///
545 /// * `Poll::Pending` if the tcp stream is not ready for reading.
546 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
547 /// * `Poll::Ready(Err(e))` if an error is encountered.
548 ///
549 /// # Errors
550 ///
551 /// This function may encounter any standard I/O error except `WouldBlock`.
552 ///
553 /// [`readable`]: method@Self::readable
554 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
555 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
556 }
557
558 /// Tries to read data from the stream into the provided buffer, returning how
559 /// many bytes were read.
560 ///
561 /// Receives any pending data from the socket but does not wait for new data
562 /// to arrive. On success, returns the number of bytes read. Because
563 /// `try_read()` is non-blocking, the buffer does not have to be stored by
564 /// the async task and can exist entirely on the stack.
565 ///
566 /// Usually, [`readable()`] or [`ready()`] is used with this function.
567 ///
568 /// [`readable()`]: TcpStream::readable()
569 /// [`ready()`]: TcpStream::ready()
570 ///
571 /// # Return
572 ///
573 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
574 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
575 ///
576 /// 1. The stream's read half is closed and will no longer yield data.
577 /// 2. The specified buffer was 0 bytes in length.
578 ///
579 /// If the stream is not ready to read data,
580 /// `Err(io::ErrorKind::WouldBlock)` is returned.
581 ///
582 /// # Examples
583 ///
584 /// ```no_run
585 /// use tokio::net::TcpStream;
586 /// use std::error::Error;
587 /// use std::io;
588 ///
589 /// #[tokio::main]
590 /// async fn main() -> Result<(), Box<dyn Error>> {
591 /// // Connect to a peer
592 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
593 ///
594 /// loop {
595 /// // Wait for the socket to be readable
596 /// stream.readable().await?;
597 ///
598 /// // Creating the buffer **after** the `await` prevents it from
599 /// // being stored in the async task.
600 /// let mut buf = [0; 4096];
601 ///
602 /// // Try to read data, this may still fail with `WouldBlock`
603 /// // if the readiness event is a false positive.
604 /// match stream.try_read(&mut buf) {
605 /// Ok(0) => break,
606 /// Ok(n) => {
607 /// println!("read {} bytes", n);
608 /// }
609 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
610 /// continue;
611 /// }
612 /// Err(e) => {
613 /// return Err(e.into());
614 /// }
615 /// }
616 /// }
617 ///
618 /// Ok(())
619 /// }
620 /// ```
621 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
622 use std::io::Read;
623
624 self.io
625 .registration()
626 .try_io(Interest::READABLE, || (&*self.io).read(buf))
627 }
628
629 /// Tries to read data from the stream into the provided buffers, returning
630 /// how many bytes were read.
631 ///
632 /// Data is copied to fill each buffer in order, with the final buffer
633 /// written to possibly being only partially filled. This method behaves
634 /// equivalently to a single call to [`try_read()`] with concatenated
635 /// buffers.
636 ///
637 /// Receives any pending data from the socket but does not wait for new data
638 /// to arrive. On success, returns the number of bytes read. Because
639 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
640 /// stored by the async task and can exist entirely on the stack.
641 ///
642 /// Usually, [`readable()`] or [`ready()`] is used with this function.
643 ///
644 /// [`try_read()`]: TcpStream::try_read()
645 /// [`readable()`]: TcpStream::readable()
646 /// [`ready()`]: TcpStream::ready()
647 ///
648 /// # Return
649 ///
650 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
651 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
652 /// and will no longer yield data. If the stream is not ready to read data
653 /// `Err(io::ErrorKind::WouldBlock)` is returned.
654 ///
655 /// # Examples
656 ///
657 /// ```no_run
658 /// use tokio::net::TcpStream;
659 /// use std::error::Error;
660 /// use std::io::{self, IoSliceMut};
661 ///
662 /// #[tokio::main]
663 /// async fn main() -> Result<(), Box<dyn Error>> {
664 /// // Connect to a peer
665 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
666 ///
667 /// loop {
668 /// // Wait for the socket to be readable
669 /// stream.readable().await?;
670 ///
671 /// // Creating the buffer **after** the `await` prevents it from
672 /// // being stored in the async task.
673 /// let mut buf_a = [0; 512];
674 /// let mut buf_b = [0; 1024];
675 /// let mut bufs = [
676 /// IoSliceMut::new(&mut buf_a),
677 /// IoSliceMut::new(&mut buf_b),
678 /// ];
679 ///
680 /// // Try to read data, this may still fail with `WouldBlock`
681 /// // if the readiness event is a false positive.
682 /// match stream.try_read_vectored(&mut bufs) {
683 /// Ok(0) => break,
684 /// Ok(n) => {
685 /// println!("read {} bytes", n);
686 /// }
687 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
688 /// continue;
689 /// }
690 /// Err(e) => {
691 /// return Err(e.into());
692 /// }
693 /// }
694 /// }
695 ///
696 /// Ok(())
697 /// }
698 /// ```
699 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
700 use std::io::Read;
701
702 self.io
703 .registration()
704 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
705 }
706
707 cfg_io_util! {
708 /// Tries to read data from the stream into the provided buffer, advancing the
709 /// buffer's internal cursor, returning how many bytes were read.
710 ///
711 /// Receives any pending data from the socket but does not wait for new data
712 /// to arrive. On success, returns the number of bytes read. Because
713 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
714 /// the async task and can exist entirely on the stack.
715 ///
716 /// Usually, [`readable()`] or [`ready()`] is used with this function.
717 ///
718 /// [`readable()`]: TcpStream::readable()
719 /// [`ready()`]: TcpStream::ready()
720 ///
721 /// # Return
722 ///
723 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
724 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
725 /// and will no longer yield data. If the stream is not ready to read data
726 /// `Err(io::ErrorKind::WouldBlock)` is returned.
727 ///
728 /// # Examples
729 ///
730 /// ```no_run
731 /// use tokio::net::TcpStream;
732 /// use std::error::Error;
733 /// use std::io;
734 ///
735 /// #[tokio::main]
736 /// async fn main() -> Result<(), Box<dyn Error>> {
737 /// // Connect to a peer
738 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
739 ///
740 /// loop {
741 /// // Wait for the socket to be readable
742 /// stream.readable().await?;
743 ///
744 /// let mut buf = Vec::with_capacity(4096);
745 ///
746 /// // Try to read data, this may still fail with `WouldBlock`
747 /// // if the readiness event is a false positive.
748 /// match stream.try_read_buf(&mut buf) {
749 /// Ok(0) => break,
750 /// Ok(n) => {
751 /// println!("read {} bytes", n);
752 /// }
753 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
754 /// continue;
755 /// }
756 /// Err(e) => {
757 /// return Err(e.into());
758 /// }
759 /// }
760 /// }
761 ///
762 /// Ok(())
763 /// }
764 /// ```
765 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
766 self.io.registration().try_io(Interest::READABLE, || {
767 use std::io::Read;
768
769 let dst = buf.chunk_mut();
770 let dst =
771 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
772
773 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
774 // buffer.
775 let n = (&*self.io).read(dst)?;
776
777 unsafe {
778 buf.advance_mut(n);
779 }
780
781 Ok(n)
782 })
783 }
784 }
785
786 /// Waits for the socket to become writable.
787 ///
788 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
789 /// paired with `try_write()`.
790 ///
791 /// # Cancel safety
792 ///
793 /// This method is cancel safe. Once a readiness event occurs, the method
794 /// will continue to return immediately until the readiness event is
795 /// consumed by an attempt to write that fails with `WouldBlock` or
796 /// `Poll::Pending`.
797 ///
798 /// # Examples
799 ///
800 /// ```no_run
801 /// use tokio::net::TcpStream;
802 /// use std::error::Error;
803 /// use std::io;
804 ///
805 /// #[tokio::main]
806 /// async fn main() -> Result<(), Box<dyn Error>> {
807 /// // Connect to a peer
808 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
809 ///
810 /// loop {
811 /// // Wait for the socket to be writable
812 /// stream.writable().await?;
813 ///
814 /// // Try to write data, this may still fail with `WouldBlock`
815 /// // if the readiness event is a false positive.
816 /// match stream.try_write(b"hello world") {
817 /// Ok(n) => {
818 /// break;
819 /// }
820 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
821 /// continue;
822 /// }
823 /// Err(e) => {
824 /// return Err(e.into());
825 /// }
826 /// }
827 /// }
828 ///
829 /// Ok(())
830 /// }
831 /// ```
832 pub async fn writable(&self) -> io::Result<()> {
833 self.ready(Interest::WRITABLE).await?;
834 Ok(())
835 }
836
837 /// Polls for write readiness.
838 ///
839 /// If the tcp stream is not currently ready for writing, this method will
840 /// store a clone of the `Waker` from the provided `Context`. When the tcp
841 /// stream becomes ready for writing, `Waker::wake` will be called on the
842 /// waker.
843 ///
844 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
845 /// the `Waker` from the `Context` passed to the most recent call is
846 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
847 /// second, independent waker.)
848 ///
849 /// This function is intended for cases where creating and pinning a future
850 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
851 /// preferred, as this supports polling from multiple tasks at once.
852 ///
853 /// # Return value
854 ///
855 /// The function returns:
856 ///
857 /// * `Poll::Pending` if the tcp stream is not ready for writing.
858 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
859 /// * `Poll::Ready(Err(e))` if an error is encountered.
860 ///
861 /// # Errors
862 ///
863 /// This function may encounter any standard I/O error except `WouldBlock`.
864 ///
865 /// [`writable`]: method@Self::writable
866 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
867 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
868 }
869
870 /// Tries to write a buffer to the stream, returning how many bytes were
871 /// written.
872 ///
873 /// The function will attempt to write the entire contents of `buf`, but
874 /// only part of the buffer may be written.
875 ///
876 /// This function is usually paired with `writable()`.
877 ///
878 /// # Return
879 ///
880 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
881 /// number of bytes written. If the stream is not ready to write data,
882 /// `Err(io::ErrorKind::WouldBlock)` is returned.
883 ///
884 /// # Examples
885 ///
886 /// ```no_run
887 /// use tokio::net::TcpStream;
888 /// use std::error::Error;
889 /// use std::io;
890 ///
891 /// #[tokio::main]
892 /// async fn main() -> Result<(), Box<dyn Error>> {
893 /// // Connect to a peer
894 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
895 ///
896 /// loop {
897 /// // Wait for the socket to be writable
898 /// stream.writable().await?;
899 ///
900 /// // Try to write data, this may still fail with `WouldBlock`
901 /// // if the readiness event is a false positive.
902 /// match stream.try_write(b"hello world") {
903 /// Ok(n) => {
904 /// break;
905 /// }
906 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
907 /// continue;
908 /// }
909 /// Err(e) => {
910 /// return Err(e.into());
911 /// }
912 /// }
913 /// }
914 ///
915 /// Ok(())
916 /// }
917 /// ```
918 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
919 use std::io::Write;
920
921 self.io
922 .registration()
923 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
924 }
925
926 /// Tries to write several buffers to the stream, returning how many bytes
927 /// were written.
928 ///
929 /// Data is written from each buffer in order, with the final buffer read
930 /// from possibly being only partially consumed. This method behaves
931 /// equivalently to a single call to [`try_write()`] with concatenated
932 /// buffers.
933 ///
934 /// This function is usually paired with `writable()`.
935 ///
936 /// [`try_write()`]: TcpStream::try_write()
937 ///
938 /// # Return
939 ///
940 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
941 /// number of bytes written. If the stream is not ready to write data,
942 /// `Err(io::ErrorKind::WouldBlock)` is returned.
943 ///
944 /// # Examples
945 ///
946 /// ```no_run
947 /// use tokio::net::TcpStream;
948 /// use std::error::Error;
949 /// use std::io;
950 ///
951 /// #[tokio::main]
952 /// async fn main() -> Result<(), Box<dyn Error>> {
953 /// // Connect to a peer
954 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
955 ///
956 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
957 ///
958 /// loop {
959 /// // Wait for the socket to be writable
960 /// stream.writable().await?;
961 ///
962 /// // Try to write data, this may still fail with `WouldBlock`
963 /// // if the readiness event is a false positive.
964 /// match stream.try_write_vectored(&bufs) {
965 /// Ok(n) => {
966 /// break;
967 /// }
968 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
969 /// continue;
970 /// }
971 /// Err(e) => {
972 /// return Err(e.into());
973 /// }
974 /// }
975 /// }
976 ///
977 /// Ok(())
978 /// }
979 /// ```
980 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
981 use std::io::Write;
982
983 self.io
984 .registration()
985 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
986 }
987
988 /// Tries to read or write from the socket using a user-provided IO operation.
989 ///
990 /// If the socket is ready, the provided closure is called. The closure
991 /// should attempt to perform IO operation on the socket by manually
992 /// calling the appropriate syscall. If the operation fails because the
993 /// socket is not actually ready, then the closure should return a
994 /// `WouldBlock` error and the readiness flag is cleared. The return value
995 /// of the closure is then returned by `try_io`.
996 ///
997 /// If the socket is not ready, then the closure is not called
998 /// and a `WouldBlock` error is returned.
999 ///
1000 /// The closure should only return a `WouldBlock` error if it has performed
1001 /// an IO operation on the socket that failed due to the socket not being
1002 /// ready. Returning a `WouldBlock` error in any other situation will
1003 /// incorrectly clear the readiness flag, which can cause the socket to
1004 /// behave incorrectly.
1005 ///
1006 /// The closure should not perform the IO operation using any of the methods
1007 /// defined on the Tokio `TcpStream` type, as this will mess with the
1008 /// readiness flag and can cause the socket to behave incorrectly.
1009 ///
1010 /// This method is not intended to be used with combined interests.
1011 /// The closure should perform only one type of IO operation, so it should not
1012 /// require more than one ready state. This method may panic or sleep forever
1013 /// if it is called with a combined interest.
1014 ///
1015 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1016 ///
1017 /// [`readable()`]: TcpStream::readable()
1018 /// [`writable()`]: TcpStream::writable()
1019 /// [`ready()`]: TcpStream::ready()
1020 pub fn try_io<R>(
1021 &self,
1022 interest: Interest,
1023 f: impl FnOnce() -> io::Result<R>,
1024 ) -> io::Result<R> {
1025 self.io
1026 .registration()
1027 .try_io(interest, || self.io.try_io(f))
1028 }
1029
1030 /// Reads or writes from the socket using a user-provided IO operation.
1031 ///
1032 /// The readiness of the socket is awaited and when the socket is ready,
1033 /// the provided closure is called. The closure should attempt to perform
1034 /// IO operation on the socket by manually calling the appropriate syscall.
1035 /// If the operation fails because the socket is not actually ready,
1036 /// then the closure should return a `WouldBlock` error. In such case the
1037 /// readiness flag is cleared and the socket readiness is awaited again.
1038 /// This loop is repeated until the closure returns an `Ok` or an error
1039 /// other than `WouldBlock`.
1040 ///
1041 /// The closure should only return a `WouldBlock` error if it has performed
1042 /// an IO operation on the socket that failed due to the socket not being
1043 /// ready. Returning a `WouldBlock` error in any other situation will
1044 /// incorrectly clear the readiness flag, which can cause the socket to
1045 /// behave incorrectly.
1046 ///
1047 /// The closure should not perform the IO operation using any of the methods
1048 /// defined on the Tokio `TcpStream` type, as this will mess with the
1049 /// readiness flag and can cause the socket to behave incorrectly.
1050 ///
1051 /// This method is not intended to be used with combined interests.
1052 /// The closure should perform only one type of IO operation, so it should not
1053 /// require more than one ready state. This method may panic or sleep forever
1054 /// if it is called with a combined interest.
1055 pub async fn async_io<R>(
1056 &self,
1057 interest: Interest,
1058 mut f: impl FnMut() -> io::Result<R>,
1059 ) -> io::Result<R> {
1060 self.io
1061 .registration()
1062 .async_io(interest, || self.io.try_io(&mut f))
1063 .await
1064 }
1065
1066 /// Receives data on the socket from the remote address to which it is
1067 /// connected, without removing that data from the queue. On success,
1068 /// returns the number of bytes peeked.
1069 ///
1070 /// Successive calls return the same data. This is accomplished by passing
1071 /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1072 ///
1073 /// # Examples
1074 ///
1075 /// ```no_run
1076 /// use tokio::net::TcpStream;
1077 /// use tokio::io::AsyncReadExt;
1078 /// use std::error::Error;
1079 ///
1080 /// #[tokio::main]
1081 /// async fn main() -> Result<(), Box<dyn Error>> {
1082 /// // Connect to a peer
1083 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1084 ///
1085 /// let mut b1 = [0; 10];
1086 /// let mut b2 = [0; 10];
1087 ///
1088 /// // Peek at the data
1089 /// let n = stream.peek(&mut b1).await?;
1090 ///
1091 /// // Read the data
1092 /// assert_eq!(n, stream.read(&mut b2[..n]).await?);
1093 /// assert_eq!(&b1[..n], &b2[..n]);
1094 ///
1095 /// Ok(())
1096 /// }
1097 /// ```
1098 ///
1099 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1100 ///
1101 /// [`read`]: fn@crate::io::AsyncReadExt::read
1102 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1103 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1104 self.io
1105 .registration()
1106 .async_io(Interest::READABLE, || self.io.peek(buf))
1107 .await
1108 }
1109
1110 /// Shuts down the read, write, or both halves of this connection.
1111 ///
1112 /// This function will cause all pending and future I/O on the specified
1113 /// portions to return immediately with an appropriate value (see the
1114 /// documentation of `Shutdown`).
1115 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1116 self.io.shutdown(how)
1117 }
1118
1119 /// Gets the value of the `TCP_NODELAY` option on this socket.
1120 ///
1121 /// For more information about this option, see [`set_nodelay`].
1122 ///
1123 /// [`set_nodelay`]: TcpStream::set_nodelay
1124 ///
1125 /// # Examples
1126 ///
1127 /// ```no_run
1128 /// use tokio::net::TcpStream;
1129 ///
1130 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1131 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1132 ///
1133 /// println!("{:?}", stream.nodelay()?);
1134 /// # Ok(())
1135 /// # }
1136 /// ```
1137 pub fn nodelay(&self) -> io::Result<bool> {
1138 self.io.nodelay()
1139 }
1140
1141 /// Sets the value of the `TCP_NODELAY` option on this socket.
1142 ///
1143 /// If set, this option disables the Nagle algorithm. This means that
1144 /// segments are always sent as soon as possible, even if there is only a
1145 /// small amount of data. When not set, data is buffered until there is a
1146 /// sufficient amount to send out, thereby avoiding the frequent sending of
1147 /// small packets.
1148 ///
1149 /// # Examples
1150 ///
1151 /// ```no_run
1152 /// use tokio::net::TcpStream;
1153 ///
1154 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1155 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1156 ///
1157 /// stream.set_nodelay(true)?;
1158 /// # Ok(())
1159 /// # }
1160 /// ```
1161 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1162 self.io.set_nodelay(nodelay)
1163 }
1164
1165 cfg_not_wasi! {
1166 /// Reads the linger duration for this socket by getting the `SO_LINGER`
1167 /// option.
1168 ///
1169 /// For more information about this option, see [`set_linger`].
1170 ///
1171 /// [`set_linger`]: TcpStream::set_linger
1172 ///
1173 /// # Examples
1174 ///
1175 /// ```no_run
1176 /// use tokio::net::TcpStream;
1177 ///
1178 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1179 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1180 ///
1181 /// println!("{:?}", stream.linger()?);
1182 /// # Ok(())
1183 /// # }
1184 /// ```
1185 pub fn linger(&self) -> io::Result<Option<Duration>> {
1186 socket2::SockRef::from(self).linger()
1187 }
1188
1189 /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1190 ///
1191 /// This option controls the action taken when a stream has unsent messages and the stream is
1192 /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1193 /// data or until the time expires.
1194 ///
1195 /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1196 /// way that allows the process to continue as quickly as possible.
1197 ///
1198 /// # Examples
1199 ///
1200 /// ```no_run
1201 /// use tokio::net::TcpStream;
1202 ///
1203 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1204 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1205 ///
1206 /// stream.set_linger(None)?;
1207 /// # Ok(())
1208 /// # }
1209 /// ```
1210 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1211 socket2::SockRef::from(self).set_linger(dur)
1212 }
1213 }
1214
1215 /// Gets the value of the `IP_TTL` option for this socket.
1216 ///
1217 /// For more information about this option, see [`set_ttl`].
1218 ///
1219 /// [`set_ttl`]: TcpStream::set_ttl
1220 ///
1221 /// # Examples
1222 ///
1223 /// ```no_run
1224 /// use tokio::net::TcpStream;
1225 ///
1226 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1227 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1228 ///
1229 /// println!("{:?}", stream.ttl()?);
1230 /// # Ok(())
1231 /// # }
1232 /// ```
1233 pub fn ttl(&self) -> io::Result<u32> {
1234 self.io.ttl()
1235 }
1236
1237 /// Sets the value for the `IP_TTL` option on this socket.
1238 ///
1239 /// This value sets the time-to-live field that is used in every packet sent
1240 /// from this socket.
1241 ///
1242 /// # Examples
1243 ///
1244 /// ```no_run
1245 /// use tokio::net::TcpStream;
1246 ///
1247 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1248 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1249 ///
1250 /// stream.set_ttl(123)?;
1251 /// # Ok(())
1252 /// # }
1253 /// ```
1254 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1255 self.io.set_ttl(ttl)
1256 }
1257
1258 // These lifetime markers also appear in the generated documentation, and make
1259 // it more clear that this is a *borrowed* split.
1260 #[allow(clippy::needless_lifetimes)]
1261 /// Splits a `TcpStream` into a read half and a write half, which can be used
1262 /// to read and write the stream concurrently.
1263 ///
1264 /// This method is more efficient than [`into_split`], but the halves cannot be
1265 /// moved into independently spawned tasks.
1266 ///
1267 /// [`into_split`]: TcpStream::into_split()
1268 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1269 split(self)
1270 }
1271
1272 /// Splits a `TcpStream` into a read half and a write half, which can be used
1273 /// to read and write the stream concurrently.
1274 ///
1275 /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1276 /// this comes at the cost of a heap allocation.
1277 ///
1278 /// **Note:** Dropping the write half will shut down the write half of the TCP
1279 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1280 ///
1281 /// [`split`]: TcpStream::split()
1282 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1283 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1284 split_owned(self)
1285 }
1286
// == Poll IO functions that take `&self` ==
//
// To read or write without mutable access to the `TcpStream`, combine the
// `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
// `try_write` methods.
1292
1293 pub(crate) fn poll_read_priv(
1294 &self,
1295 cx: &mut Context<'_>,
1296 buf: &mut ReadBuf<'_>,
1297 ) -> Poll<io::Result<()>> {
1298 // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1299 unsafe { self.io.poll_read(cx, buf) }
1300 }
1301
1302 pub(super) fn poll_write_priv(
1303 &self,
1304 cx: &mut Context<'_>,
1305 buf: &[u8],
1306 ) -> Poll<io::Result<usize>> {
1307 self.io.poll_write(cx, buf)
1308 }
1309
1310 pub(super) fn poll_write_vectored_priv(
1311 &self,
1312 cx: &mut Context<'_>,
1313 bufs: &[io::IoSlice<'_>],
1314 ) -> Poll<io::Result<usize>> {
1315 self.io.poll_write_vectored(cx, bufs)
1316 }
1317}
1318
1319impl TryFrom<std::net::TcpStream> for TcpStream {
1320 type Error = io::Error;
1321
1322 /// Consumes stream, returning the tokio I/O object.
1323 ///
1324 /// This is equivalent to
1325 /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1326 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1327 Self::from_std(stream)
1328 }
1329}
1330
1331// ===== impl Read / Write =====
1332
1333impl AsyncRead for TcpStream {
1334 fn poll_read(
1335 self: Pin<&mut Self>,
1336 cx: &mut Context<'_>,
1337 buf: &mut ReadBuf<'_>,
1338 ) -> Poll<io::Result<()>> {
1339 self.poll_read_priv(cx, buf)
1340 }
1341}
1342
1343impl AsyncWrite for TcpStream {
1344 fn poll_write(
1345 self: Pin<&mut Self>,
1346 cx: &mut Context<'_>,
1347 buf: &[u8],
1348 ) -> Poll<io::Result<usize>> {
1349 self.poll_write_priv(cx, buf)
1350 }
1351
1352 fn poll_write_vectored(
1353 self: Pin<&mut Self>,
1354 cx: &mut Context<'_>,
1355 bufs: &[io::IoSlice<'_>],
1356 ) -> Poll<io::Result<usize>> {
1357 self.poll_write_vectored_priv(cx, bufs)
1358 }
1359
1360 fn is_write_vectored(&self) -> bool {
1361 true
1362 }
1363
1364 #[inline]
1365 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1366 // tcp flush is a no-op
1367 Poll::Ready(Ok(()))
1368 }
1369
1370 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1371 self.shutdown_std(std::net::Shutdown::Write)?;
1372 Poll::Ready(Ok(()))
1373 }
1374}
1375
1376impl fmt::Debug for TcpStream {
1377 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1378 self.io.fmt(f)
1379 }
1380}
1381
1382#[cfg(unix)]
1383mod sys {
1384 use super::TcpStream;
1385 use std::os::unix::prelude::*;
1386
1387 impl AsRawFd for TcpStream {
1388 fn as_raw_fd(&self) -> RawFd {
1389 self.io.as_raw_fd()
1390 }
1391 }
1392
1393 impl AsFd for TcpStream {
1394 fn as_fd(&self) -> BorrowedFd<'_> {
1395 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1396 }
1397 }
1398}
1399
1400cfg_windows! {
1401 use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1402
1403 impl AsRawSocket for TcpStream {
1404 fn as_raw_socket(&self) -> RawSocket {
1405 self.io.as_raw_socket()
1406 }
1407 }
1408
1409 impl AsSocket for TcpStream {
1410 fn as_socket(&self) -> BorrowedSocket<'_> {
1411 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1412 }
1413 }
1414}
1415
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use super::TcpStream;
    use std::os::wasi::prelude::*;

    impl AsRawFd for TcpStream {
        /// Returns the raw file descriptor of the inner I/O object.
        fn as_raw_fd(&self) -> RawFd {
            self.io.as_raw_fd()
        }
    }

    impl AsFd for TcpStream {
        /// Borrows the stream's file descriptor for the duration of the
        /// `&self` borrow.
        fn as_fd(&self) -> BorrowedFd<'_> {
            let raw = self.as_raw_fd();

            // SAFETY: the raw descriptor comes from `self`, and the returned
            // `BorrowedFd` cannot outlive the `&self` borrow. This relies on
            // the stream keeping its descriptor open while it is alive.
            unsafe { BorrowedFd::borrow_raw(raw) }
        }
    }
}