tokio_util/io/reader_stream.rs
1use bytes::{Bytes, BytesMut};
2use futures_core::stream::Stream;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::AsyncRead;
7
8const DEFAULT_CAPACITY: usize = 4096;
9
10pin_project! {
11 /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
12 ///
13 /// This stream is fused. It performs the inverse operation of
14 /// [`StreamReader`].
15 ///
16 /// # Example
17 ///
18 /// ```
19 /// # #[tokio::main(flavor = "current_thread")]
20 /// # async fn main() -> std::io::Result<()> {
21 /// use tokio_stream::StreamExt;
22 /// use tokio_util::io::ReaderStream;
23 ///
24 /// // Create a stream of data.
25 /// let data = b"hello, world!";
26 /// let mut stream = ReaderStream::new(&data[..]);
27 ///
28 /// // Read all of the chunks into a vector.
29 /// let mut stream_contents = Vec::new();
30 /// while let Some(chunk) = stream.next().await {
31 /// stream_contents.extend_from_slice(&chunk?);
32 /// }
33 ///
34 /// // Once the chunks are concatenated, we should have the
35 /// // original data.
36 /// assert_eq!(stream_contents, data);
37 /// # Ok(())
38 /// # }
39 /// ```
40 ///
41 /// [`AsyncRead`]: tokio::io::AsyncRead
42 /// [`StreamReader`]: crate::io::StreamReader
43 /// [`Stream`]: futures_core::Stream
44 #[derive(Debug)]
45 pub struct ReaderStream<R> {
46 // Reader itself.
47 //
48 // This value is `None` if the stream has terminated.
49 #[pin]
50 reader: Option<R>,
51 // Working buffer, used to optimize allocations.
52 buf: BytesMut,
53 capacity: usize,
54 }
55}
56
57impl<R: AsyncRead> ReaderStream<R> {
58 /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59 /// `Result<Bytes, std::io::Error>`.
60 ///
61 /// Currently, the default capacity 4096 bytes (4 KiB).
62 /// This capacity is not part of the semver contract
63 /// and may be tweaked in future releases without
64 /// requiring a major version bump.
65 ///
66 /// [`AsyncRead`]: tokio::io::AsyncRead
67 /// [`Stream`]: futures_core::Stream
68 pub fn new(reader: R) -> Self {
69 ReaderStream {
70 reader: Some(reader),
71 buf: BytesMut::new(),
72 capacity: DEFAULT_CAPACITY,
73 }
74 }
75
76 /// Convert an [`AsyncRead`] into a [`Stream`] with item type
77 /// `Result<Bytes, std::io::Error>`,
78 /// with a specific read buffer initial capacity.
79 ///
80 /// [`AsyncRead`]: tokio::io::AsyncRead
81 /// [`Stream`]: futures_core::Stream
82 pub fn with_capacity(reader: R, capacity: usize) -> Self {
83 ReaderStream {
84 reader: Some(reader),
85 buf: BytesMut::with_capacity(capacity),
86 capacity,
87 }
88 }
89}
90
91impl<R: AsyncRead> Stream for ReaderStream<R> {
92 type Item = std::io::Result<Bytes>;
93 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
94 use crate::util::poll_read_buf;
95
96 let mut this = self.as_mut().project();
97
98 let reader = match this.reader.as_pin_mut() {
99 Some(r) => r,
100 None => return Poll::Ready(None),
101 };
102
103 if this.buf.capacity() == 0 {
104 this.buf.reserve(*this.capacity);
105 }
106
107 match poll_read_buf(reader, cx, &mut this.buf) {
108 Poll::Pending => Poll::Pending,
109 Poll::Ready(Err(err)) => {
110 self.project().reader.set(None);
111 Poll::Ready(Some(Err(err)))
112 }
113 Poll::Ready(Ok(0)) => {
114 self.project().reader.set(None);
115 Poll::Ready(None)
116 }
117 Poll::Ready(Ok(_)) => {
118 let chunk = this.buf.split();
119 Poll::Ready(Some(Ok(chunk.freeze())))
120 }
121 }
122 }
123}