hyper_util/rt/tokio/
with_tokio_io.rs

1use pin_project_lite::pin_project;
2use std::{
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7pin_project! {
8    /// Extends an underlying [`hyper`] I/O with [`tokio`] I/O implementations.
9    ///
10    /// This implements [`AsyncRead`] and [`AsyncWrite`] given an inner type that implements
11    /// [`Read`] and [`Write`], respectively.
12    #[derive(Debug)]
13    pub struct WithTokioIo<I> {
14        #[pin]
15        inner: I,
16    }
17}
18
19// ==== impl WithTokioIo =====
20
21/// [`WithTokioIo<I>`] is [`AsyncRead`] if `I` is [`Read`].
22///
23/// [`AsyncRead`]: tokio::io::AsyncRead
24/// [`Read`]: hyper::rt::Read
25impl<I> tokio::io::AsyncRead for WithTokioIo<I>
26where
27    I: hyper::rt::Read,
28{
29    fn poll_read(
30        self: Pin<&mut Self>,
31        cx: &mut Context<'_>,
32        tbuf: &mut tokio::io::ReadBuf<'_>,
33    ) -> Poll<Result<(), std::io::Error>> {
34        //let init = tbuf.initialized().len();
35        let filled = tbuf.filled().len();
36        let sub_filled = unsafe {
37            let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
38
39            match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
40                Poll::Ready(Ok(())) => buf.filled().len(),
41                other => return other,
42            }
43        };
44
45        let n_filled = filled + sub_filled;
46        // At least sub_filled bytes had to have been initialized.
47        let n_init = sub_filled;
48        unsafe {
49            tbuf.assume_init(n_init);
50            tbuf.set_filled(n_filled);
51        }
52
53        Poll::Ready(Ok(()))
54    }
55}
56
57/// [`WithTokioIo<I>`] is [`AsyncWrite`] if `I` is [`Write`].
58///
59/// [`AsyncWrite`]: tokio::io::AsyncWrite
60/// [`Write`]: hyper::rt::Write
61impl<I> tokio::io::AsyncWrite for WithTokioIo<I>
62where
63    I: hyper::rt::Write,
64{
65    fn poll_write(
66        self: Pin<&mut Self>,
67        cx: &mut Context<'_>,
68        buf: &[u8],
69    ) -> Poll<Result<usize, std::io::Error>> {
70        hyper::rt::Write::poll_write(self.project().inner, cx, buf)
71    }
72
73    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
74        hyper::rt::Write::poll_flush(self.project().inner, cx)
75    }
76
77    fn poll_shutdown(
78        self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80    ) -> Poll<Result<(), std::io::Error>> {
81        hyper::rt::Write::poll_shutdown(self.project().inner, cx)
82    }
83
84    fn is_write_vectored(&self) -> bool {
85        hyper::rt::Write::is_write_vectored(&self.inner)
86    }
87
88    fn poll_write_vectored(
89        self: Pin<&mut Self>,
90        cx: &mut Context<'_>,
91        bufs: &[std::io::IoSlice<'_>],
92    ) -> Poll<Result<usize, std::io::Error>> {
93        hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
94    }
95}
96
97/// [`WithTokioIo<I>`] exposes its inner `I`'s [`Write`] implementation.
98///
99/// [`Write`]: hyper::rt::Write
100impl<I> hyper::rt::Write for WithTokioIo<I>
101where
102    I: hyper::rt::Write,
103{
104    #[inline]
105    fn poll_write(
106        self: Pin<&mut Self>,
107        cx: &mut Context<'_>,
108        buf: &[u8],
109    ) -> Poll<Result<usize, std::io::Error>> {
110        self.project().inner.poll_write(cx, buf)
111    }
112
113    #[inline]
114    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
115        self.project().inner.poll_flush(cx)
116    }
117
118    #[inline]
119    fn poll_shutdown(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122    ) -> Poll<Result<(), std::io::Error>> {
123        self.project().inner.poll_shutdown(cx)
124    }
125
126    #[inline]
127    fn is_write_vectored(&self) -> bool {
128        self.inner.is_write_vectored()
129    }
130
131    #[inline]
132    fn poll_write_vectored(
133        self: Pin<&mut Self>,
134        cx: &mut Context<'_>,
135        bufs: &[std::io::IoSlice<'_>],
136    ) -> Poll<Result<usize, std::io::Error>> {
137        self.project().inner.poll_write_vectored(cx, bufs)
138    }
139}
140
141impl<I> WithTokioIo<I> {
142    /// Wraps the inner I/O in an [`WithTokioIo<I>`]
143    pub fn new(inner: I) -> Self {
144        Self { inner }
145    }
146
147    /// Returns a reference to the inner type.
148    pub fn inner(&self) -> &I {
149        &self.inner
150    }
151
152    /// Returns a mutable reference to the inner type.
153    pub fn inner_mut(&mut self) -> &mut I {
154        &mut self.inner
155    }
156
157    /// Consumes this wrapper and returns the inner type.
158    pub fn into_inner(self) -> I {
159        self.inner
160    }
161}
162
163/// [`WithTokioIo<I>`] exposes its inner `I`'s [`Read`] implementation.
164///
165/// [`Read`]: hyper::rt::Read
166impl<I> hyper::rt::Read for WithTokioIo<I>
167where
168    I: hyper::rt::Read,
169{
170    #[inline]
171    fn poll_read(
172        self: Pin<&mut Self>,
173        cx: &mut Context<'_>,
174        buf: hyper::rt::ReadBufCursor<'_>,
175    ) -> Poll<Result<(), std::io::Error>> {
176        self.project().inner.poll_read(cx, buf)
177    }
178}