tokio_util/io/
reader_stream.rs

1use bytes::{Bytes, BytesMut};
2use futures_core::stream::Stream;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::AsyncRead;
7
8const DEFAULT_CAPACITY: usize = 4096;
9
10pin_project! {
11    /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
12    ///
13    /// This stream is fused. It performs the inverse operation of
14    /// [`StreamReader`].
15    ///
16    /// # Example
17    ///
18    /// ```
19    /// # #[tokio::main(flavor = "current_thread")]
20    /// # async fn main() -> std::io::Result<()> {
21    /// use tokio_stream::StreamExt;
22    /// use tokio_util::io::ReaderStream;
23    ///
24    /// // Create a stream of data.
25    /// let data = b"hello, world!";
26    /// let mut stream = ReaderStream::new(&data[..]);
27    ///
28    /// // Read all of the chunks into a vector.
29    /// let mut stream_contents = Vec::new();
30    /// while let Some(chunk) = stream.next().await {
31    ///    stream_contents.extend_from_slice(&chunk?);
32    /// }
33    ///
34    /// // Once the chunks are concatenated, we should have the
35    /// // original data.
36    /// assert_eq!(stream_contents, data);
37    /// # Ok(())
38    /// # }
39    /// ```
40    ///
41    /// [`AsyncRead`]: tokio::io::AsyncRead
42    /// [`StreamReader`]: crate::io::StreamReader
43    /// [`Stream`]: futures_core::Stream
44    #[derive(Debug)]
45    pub struct ReaderStream<R> {
46        // Reader itself.
47        //
48        // This value is `None` if the stream has terminated.
49        #[pin]
50        reader: Option<R>,
51        // Working buffer, used to optimize allocations.
52        buf: BytesMut,
53        capacity: usize,
54    }
55}
56
57impl<R: AsyncRead> ReaderStream<R> {
58    /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59    /// `Result<Bytes, std::io::Error>`.
60    ///
61    /// Currently, the default capacity 4096 bytes (4 KiB).
62    /// This capacity is not part of the semver contract
63    /// and may be tweaked in future releases without
64    /// requiring a major version bump.
65    ///
66    /// [`AsyncRead`]: tokio::io::AsyncRead
67    /// [`Stream`]: futures_core::Stream
68    pub fn new(reader: R) -> Self {
69        ReaderStream {
70            reader: Some(reader),
71            buf: BytesMut::new(),
72            capacity: DEFAULT_CAPACITY,
73        }
74    }
75
76    /// Convert an [`AsyncRead`] into a [`Stream`] with item type
77    /// `Result<Bytes, std::io::Error>`,
78    /// with a specific read buffer initial capacity.
79    ///
80    /// [`AsyncRead`]: tokio::io::AsyncRead
81    /// [`Stream`]: futures_core::Stream
82    pub fn with_capacity(reader: R, capacity: usize) -> Self {
83        ReaderStream {
84            reader: Some(reader),
85            buf: BytesMut::with_capacity(capacity),
86            capacity,
87        }
88    }
89}
90
91impl<R: AsyncRead> Stream for ReaderStream<R> {
92    type Item = std::io::Result<Bytes>;
93    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
94        use crate::util::poll_read_buf;
95
96        let mut this = self.as_mut().project();
97
98        let reader = match this.reader.as_pin_mut() {
99            Some(r) => r,
100            None => return Poll::Ready(None),
101        };
102
103        if this.buf.capacity() == 0 {
104            this.buf.reserve(*this.capacity);
105        }
106
107        match poll_read_buf(reader, cx, &mut this.buf) {
108            Poll::Pending => Poll::Pending,
109            Poll::Ready(Err(err)) => {
110                self.project().reader.set(None);
111                Poll::Ready(Some(Err(err)))
112            }
113            Poll::Ready(Ok(0)) => {
114                self.project().reader.set(None);
115                Poll::Ready(None)
116            }
117            Poll::Ready(Ok(_)) => {
118                let chunk = this.buf.split();
119                Poll::Ready(Some(Ok(chunk.freeze())))
120            }
121        }
122    }
123}