tokio_util/
compat.rs

1//! Compatibility between the `tokio::io` and `futures-io` versions of the
2//! `AsyncRead` and `AsyncWrite` traits.
3//!
4//! ## Bridging Tokio and Futures I/O with `compat()`
5//!
6//! The [`compat()`] function provides a compatibility layer that allows types implementing
7//! [`tokio::io::AsyncRead`] or [`tokio::io::AsyncWrite`] to be used as their
8//! [`futures::io::AsyncRead`] or [`futures::io::AsyncWrite`] counterparts — and vice versa.
9//!
10//! This is especially useful when working with libraries that expect I/O types from one ecosystem
11//! (usually `futures`) but you are using types from the other (usually `tokio`).
12//!
13//! ## Compatibility Overview
14//!
15//! | Inner Type Implements...    | `Compat<T>` Implements...   |
16//! |-----------------------------|-----------------------------|
17//! | [`tokio::io::AsyncRead`]    | [`futures::io::AsyncRead`]  |
18//! | [`futures::io::AsyncRead`]  | [`tokio::io::AsyncRead`]    |
19//! | [`tokio::io::AsyncWrite`]   | [`futures::io::AsyncWrite`] |
20//! | [`futures::io::AsyncWrite`] | [`tokio::io::AsyncWrite`]   |
21//!
22//! ## Feature Flag
23//!
24//! This functionality is available through the `compat` feature flag:
25//!
26//! ```toml
27//! tokio-util = { version = "...", features = ["compat"] }
28//! ```
29//!
30//! ## Example 1: Tokio -> Futures (`AsyncRead`)
31//!
32//! This example demonstrates sending data over a [`tokio::net::TcpStream`] and using
33//! [`futures::io::AsyncReadExt::read`] from the `futures` crate to read it after adapting the
34//! stream via [`compat()`].
35//!
36//! ```no_run
37//! # #[cfg(not(target_family = "wasm"))]
38//! # {
39//! use tokio::net::{TcpListener, TcpStream};
40//! use tokio::io::AsyncWriteExt;
41//! use tokio_util::compat::TokioAsyncReadCompatExt;
42//! use futures::io::AsyncReadExt;
43//!
44//! #[tokio::main]
45//! async fn main() -> std::io::Result<()> {
46//!     let listener = TcpListener::bind("127.0.0.1:8081").await?;
47//!
48//!     tokio::spawn(async {
49//!         let mut client = TcpStream::connect("127.0.0.1:8081").await.unwrap();
50//!         client.write_all(b"Hello World").await.unwrap();
51//!     });
52//!
53//!     let (stream, _) = listener.accept().await?;
54//!
55//!     // Adapt `tokio::TcpStream` to be used with `futures::io::AsyncReadExt`
56//!     let mut compat_stream = stream.compat();
57//!     let mut buffer = [0; 20];
58//!     let n = compat_stream.read(&mut buffer).await?;
59//!     println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));
60//!
61//!     Ok(())
62//! }
63//! # }
64//! ```
65//!
66//! ## Example 2: Futures -> Tokio (`AsyncRead`)
67//!
68//! The reverse is also possible: you can take a [`futures::io::AsyncRead`] (e.g. a cursor) and
69//! adapt it to be used with [`tokio::io::AsyncReadExt::read_to_end`].
70//!
71//! ```
72//! # #[cfg(not(target_family = "wasm"))]
73//! # {
74//! use futures::io::Cursor;
75//! use tokio_util::compat::FuturesAsyncReadCompatExt;
76//! use tokio::io::AsyncReadExt;
77//!
78//! fn main() {
79//!     let future = async {
80//!         let reader = Cursor::new(b"Hello from futures");
81//!         let mut compat_reader = reader.compat();
82//!         let mut buf = Vec::new();
83//!         compat_reader.read_to_end(&mut buf).await.unwrap();
84//!         assert_eq!(&buf, b"Hello from futures");
85//!     };
86//!
87//!     // Run the future inside a Tokio runtime
88//!     tokio::runtime::Runtime::new().unwrap().block_on(future);
89//! }
90//! # }
91//! ```
92//!
93//! ## Common Use Cases
94//!
95//! - Using `tokio` sockets with `async-tungstenite`, `async-compression`, or `futures-rs`-based
96//!   libraries.
97//! - Bridging I/O interfaces between mixed-ecosystem libraries.
98//! - Avoiding rewrites or duplication of I/O code in async environments.
99//!
100//! ## See Also
101//!
102//! - [`Compat`] type
103//! - [`TokioAsyncReadCompatExt`]
104//! - [`FuturesAsyncReadCompatExt`]
105//! - [`tokio::io`]
106//! - [`futures::io`]
107//!
108//! [`futures::io`]: https://docs.rs/futures/latest/futures/io/
109//! [`futures::io::AsyncRead`]: https://docs.rs/futures/latest/futures/io/trait.AsyncRead.html
110//! [`futures::io::AsyncWrite`]: https://docs.rs/futures/latest/futures/io/trait.AsyncWrite.html
111//! [`futures::io::AsyncReadExt::read`]: https://docs.rs/futures/latest/futures/io/trait.AsyncReadExt.html#method.read
112//! [`compat()`]: TokioAsyncReadCompatExt::compat
113
114use pin_project_lite::pin_project;
115use std::io;
116use std::pin::Pin;
117use std::task::{ready, Context, Poll};
118
119pin_project! {
120    /// A compatibility layer that allows conversion between the
121    /// `tokio::io` and `futures-io` `AsyncRead` and `AsyncWrite` traits.
122    #[derive(Copy, Clone, Debug)]
123    pub struct Compat<T> {
124        #[pin]
125        inner: T,
126        seek_pos: Option<io::SeekFrom>,
127    }
128}
129
130/// Extension trait that allows converting a type implementing
131/// `futures_io::AsyncRead` to implement `tokio::io::AsyncRead`.
132pub trait FuturesAsyncReadCompatExt: futures_io::AsyncRead {
133    /// Wraps `self` with a compatibility layer that implements
134    /// `tokio_io::AsyncRead`.
135    fn compat(self) -> Compat<Self>
136    where
137        Self: Sized,
138    {
139        Compat::new(self)
140    }
141}
142
143impl<T: futures_io::AsyncRead> FuturesAsyncReadCompatExt for T {}
144
145/// Extension trait that allows converting a type implementing
146/// `futures_io::AsyncWrite` to implement `tokio::io::AsyncWrite`.
147pub trait FuturesAsyncWriteCompatExt: futures_io::AsyncWrite {
148    /// Wraps `self` with a compatibility layer that implements
149    /// `tokio::io::AsyncWrite`.
150    fn compat_write(self) -> Compat<Self>
151    where
152        Self: Sized,
153    {
154        Compat::new(self)
155    }
156}
157
158impl<T: futures_io::AsyncWrite> FuturesAsyncWriteCompatExt for T {}
159
160/// Extension trait that allows converting a type implementing
161/// `tokio::io::AsyncRead` to implement `futures_io::AsyncRead`.
162pub trait TokioAsyncReadCompatExt: tokio::io::AsyncRead {
163    /// Wraps `self` with a compatibility layer that implements
164    /// `futures_io::AsyncRead`.
165    fn compat(self) -> Compat<Self>
166    where
167        Self: Sized,
168    {
169        Compat::new(self)
170    }
171}
172
173impl<T: tokio::io::AsyncRead> TokioAsyncReadCompatExt for T {}
174
175/// Extension trait that allows converting a type implementing
176/// `tokio::io::AsyncWrite` to implement `futures_io::AsyncWrite`.
177pub trait TokioAsyncWriteCompatExt: tokio::io::AsyncWrite {
178    /// Wraps `self` with a compatibility layer that implements
179    /// `futures_io::AsyncWrite`.
180    fn compat_write(self) -> Compat<Self>
181    where
182        Self: Sized,
183    {
184        Compat::new(self)
185    }
186}
187
188impl<T: tokio::io::AsyncWrite> TokioAsyncWriteCompatExt for T {}
189
190// === impl Compat ===
191
192impl<T> Compat<T> {
193    fn new(inner: T) -> Self {
194        Self {
195            inner,
196            seek_pos: None,
197        }
198    }
199
200    /// Get a reference to the `Future`, `Stream`, `AsyncRead`, or `AsyncWrite` object
201    /// contained within.
202    pub fn get_ref(&self) -> &T {
203        &self.inner
204    }
205
206    /// Get a mutable reference to the `Future`, `Stream`, `AsyncRead`, or `AsyncWrite` object
207    /// contained within.
208    pub fn get_mut(&mut self) -> &mut T {
209        &mut self.inner
210    }
211
212    /// Returns the wrapped item.
213    pub fn into_inner(self) -> T {
214        self.inner
215    }
216}
217
218impl<T> tokio::io::AsyncRead for Compat<T>
219where
220    T: futures_io::AsyncRead,
221{
222    fn poll_read(
223        self: Pin<&mut Self>,
224        cx: &mut Context<'_>,
225        buf: &mut tokio::io::ReadBuf<'_>,
226    ) -> Poll<io::Result<()>> {
227        // We can't trust the inner type to not peak at the bytes,
228        // so we must defensively initialize the buffer.
229        let slice = buf.initialize_unfilled();
230        let n = ready!(futures_io::AsyncRead::poll_read(
231            self.project().inner,
232            cx,
233            slice
234        ))?;
235        buf.advance(n);
236        Poll::Ready(Ok(()))
237    }
238}
239
240impl<T> futures_io::AsyncRead for Compat<T>
241where
242    T: tokio::io::AsyncRead,
243{
244    fn poll_read(
245        self: Pin<&mut Self>,
246        cx: &mut Context<'_>,
247        slice: &mut [u8],
248    ) -> Poll<io::Result<usize>> {
249        let mut buf = tokio::io::ReadBuf::new(slice);
250        ready!(tokio::io::AsyncRead::poll_read(
251            self.project().inner,
252            cx,
253            &mut buf
254        ))?;
255        Poll::Ready(Ok(buf.filled().len()))
256    }
257}
258
259impl<T> tokio::io::AsyncBufRead for Compat<T>
260where
261    T: futures_io::AsyncBufRead,
262{
263    fn poll_fill_buf<'a>(
264        self: Pin<&'a mut Self>,
265        cx: &mut Context<'_>,
266    ) -> Poll<io::Result<&'a [u8]>> {
267        futures_io::AsyncBufRead::poll_fill_buf(self.project().inner, cx)
268    }
269
270    fn consume(self: Pin<&mut Self>, amt: usize) {
271        futures_io::AsyncBufRead::consume(self.project().inner, amt)
272    }
273}
274
275impl<T> futures_io::AsyncBufRead for Compat<T>
276where
277    T: tokio::io::AsyncBufRead,
278{
279    fn poll_fill_buf<'a>(
280        self: Pin<&'a mut Self>,
281        cx: &mut Context<'_>,
282    ) -> Poll<io::Result<&'a [u8]>> {
283        tokio::io::AsyncBufRead::poll_fill_buf(self.project().inner, cx)
284    }
285
286    fn consume(self: Pin<&mut Self>, amt: usize) {
287        tokio::io::AsyncBufRead::consume(self.project().inner, amt)
288    }
289}
290
291impl<T> tokio::io::AsyncWrite for Compat<T>
292where
293    T: futures_io::AsyncWrite,
294{
295    fn poll_write(
296        self: Pin<&mut Self>,
297        cx: &mut Context<'_>,
298        buf: &[u8],
299    ) -> Poll<io::Result<usize>> {
300        futures_io::AsyncWrite::poll_write(self.project().inner, cx, buf)
301    }
302
303    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
304        futures_io::AsyncWrite::poll_flush(self.project().inner, cx)
305    }
306
307    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
308        futures_io::AsyncWrite::poll_close(self.project().inner, cx)
309    }
310}
311
312impl<T> futures_io::AsyncWrite for Compat<T>
313where
314    T: tokio::io::AsyncWrite,
315{
316    fn poll_write(
317        self: Pin<&mut Self>,
318        cx: &mut Context<'_>,
319        buf: &[u8],
320    ) -> Poll<io::Result<usize>> {
321        tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
322    }
323
324    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
325        tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
326    }
327
328    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
329        tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
330    }
331}
332
333impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
334    fn poll_seek(
335        mut self: Pin<&mut Self>,
336        cx: &mut Context<'_>,
337        pos: io::SeekFrom,
338    ) -> Poll<io::Result<u64>> {
339        if self.seek_pos != Some(pos) {
340            // Ensure previous seeks have finished before starting a new one
341            ready!(self.as_mut().project().inner.poll_complete(cx))?;
342            self.as_mut().project().inner.start_seek(pos)?;
343            *self.as_mut().project().seek_pos = Some(pos);
344        }
345        let res = ready!(self.as_mut().project().inner.poll_complete(cx));
346        *self.as_mut().project().seek_pos = None;
347        Poll::Ready(res)
348    }
349}
350
351impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
352    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
353        *self.as_mut().project().seek_pos = Some(pos);
354        Ok(())
355    }
356
357    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
358        let pos = match self.seek_pos {
359            None => {
360                // tokio 1.x AsyncSeek recommends calling poll_complete before start_seek.
361                // We don't have to guarantee that the value returned by
362                // poll_complete called without start_seek is correct,
363                // so we'll return 0.
364                return Poll::Ready(Ok(0));
365            }
366            Some(pos) => pos,
367        };
368        let res = ready!(self.as_mut().project().inner.poll_seek(cx, pos));
369        *self.as_mut().project().seek_pos = None;
370        Poll::Ready(res)
371    }
372}
373
374#[cfg(unix)]
375impl<T: std::os::unix::io::AsRawFd> std::os::unix::io::AsRawFd for Compat<T> {
376    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
377        self.inner.as_raw_fd()
378    }
379}
380
#[cfg(windows)]
impl<T> std::os::windows::io::AsRawHandle for Compat<T>
where
    T: std::os::windows::io::AsRawHandle,
{
    /// Returns the raw handle reported by the wrapped object.
    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
        T::as_raw_handle(&self.inner)
    }
}