tokio_io_utility/
async_read_utility.rs

1use std::{
2    future::Future,
3    io::Result,
4    marker::Unpin,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use tokio::io::AsyncRead;
10
11mod inner;
12pub use inner::*;
13
14#[cfg(feature = "bytes")]
15mod bytes_impl;
16#[cfg(feature = "bytes")]
17pub use bytes_impl::*;
18
19/// Returned future of [`read_to_vec`].
20#[derive(Debug)]
21pub struct ReadToVecFuture<'a, T: ?Sized>(ReadToVecRngFuture<'a, T>);
22impl<T: AsyncRead + ?Sized + Unpin> Future for ReadToVecFuture<'_, T> {
23    type Output = Result<()>;
24
25    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
26        Pin::new(&mut self.0).poll(cx)
27    }
28}
29
30/// Try to fill data from `reader` into the spare capacity of `vec`.
31///
32/// It can be used to implement buffering.
33///
34/// Return [`std::io::ErrorKind::UnexpectedEof`] on Eof.
35///
36/// NOTE that this function does not modify any existing data.
37///
38/// # Cancel safety
39///
40/// It is cancel safe and dropping the returned future will not stop the
41/// wakeup from happening.
42pub fn read_to_vec<'a, T: AsyncRead + ?Sized + Unpin>(
43    reader: &'a mut T,
44    vec: &'a mut Vec<u8>,
45) -> ReadToVecFuture<'a, T> {
46    ReadToVecFuture(read_to_vec_rng(reader, vec, ..))
47}
48
49/// Returned future of [`read_exact_to_vec`].
50#[derive(Debug)]
51pub struct ReadExactToVecFuture<'a, T: ?Sized>(ReadToVecRngFuture<'a, T>);
52impl<T: AsyncRead + ?Sized + Unpin> Future for ReadExactToVecFuture<'_, T> {
53    type Output = Result<()>;
54
55    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56        Pin::new(&mut self.0).poll(cx)
57    }
58}
59
60/// * `nread` - bytes to read in
61///
62/// Return [`std::io::ErrorKind::UnexpectedEof`] on Eof.
63///
64/// NOTE that this function does not modify any existing data.
65///
66/// # Cancel safety
67///
68/// It is cancel safe and dropping the returned future will not stop the
69/// wakeup from happening.
70pub fn read_exact_to_vec<'a, T: AsyncRead + ?Sized + Unpin>(
71    reader: &'a mut T,
72    vec: &'a mut Vec<u8>,
73    nread: usize,
74) -> ReadExactToVecFuture<'a, T> {
75    ReadExactToVecFuture(read_to_vec_rng(reader, vec, nread..=nread))
76}
77
78/// Returned future of [`read_to_vec_rng`].
79#[derive(Debug)]
80pub struct ReadToVecRngFuture<'a, Reader: ?Sized>(ReadToContainerRngFuture<'a, Vec<u8>, Reader>);
81
82impl<Reader: AsyncRead + ?Sized + Unpin> Future for ReadToVecRngFuture<'_, Reader> {
83    type Output = Result<()>;
84
85    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86        Pin::new(&mut self.0).poll(cx)
87    }
88}
89
90/// * `rng` - The start of the range specify the minimum of bytes to read in,
91///           while the end of the range specify the maximum of bytes that
92///           can be read in.
93///           If the lower bound is not specified, it is default to 0.
94///           If the upper bound is not specified, it is default to the
95///           capacity of `bytes`.
96///           The lower bound must not be larger than the upper bound.
97///
98/// Return [`std::io::ErrorKind::UnexpectedEof`] on Eof.
99///
100/// NOTE that this function does not modify any existing data.
101///
102/// # Cancel safety
103///
104/// It is cancel safe and dropping the returned future will not stop the
105/// wakeup from happening.
106pub fn read_to_vec_rng<'a, T: AsyncRead + ?Sized + Unpin>(
107    reader: &'a mut T,
108    vec: &'a mut Vec<u8>,
109    rng: impl std::ops::RangeBounds<usize>,
110) -> ReadToVecRngFuture<'a, T> {
111    ReadToVecRngFuture(read_to_container_rng(reader, vec, rng))
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::io::AsyncWriteExt;

    /// Run `fut` to completion on a freshly built current-thread tokio
    /// runtime with all drivers enabled.
    fn block_on<F: std::future::Future>(fut: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(fut)
    }

    /// Writes the bytes 1..=255 into a pipe up front, then checks that
    /// `read_to_vec` fills an empty vec of capacity 255 with them in order.
    #[test]
    fn test_read_to_vec() {
        block_on(async {
            let (mut r, mut w) = tokio_pipe::pipe().unwrap();

            for n in 1..=255 {
                w.write_u8(n).await.unwrap();
            }

            let mut buffer = Vec::with_capacity(255);

            read_to_vec(&mut r, &mut buffer).await.unwrap();

            assert_eq!(buffer.len(), 255);
            for (i, each) in buffer.iter().enumerate() {
                assert_eq!(*each as usize, i + 1);
            }
        });
    }

    /// Writes the bytes 1..=255 from a concurrent task and checks that
    /// `read_exact_to_vec` places all 255 of them after the existing byte.
    #[test]
    fn test_read_exact_to_vec() {
        block_on(async {
            let (mut r, mut w) = tokio_pipe::pipe().unwrap();

            let w_task = tokio::spawn(async move {
                for n in 1..=255 {
                    w.write_u8(n).await.unwrap();
                }
            });

            let r_task = tokio::spawn(async move {
                // The pre-existing `0` must be preserved, so that afterwards
                // the value of every byte equals its index.
                let mut buffer = vec![0];

                read_exact_to_vec(&mut r, &mut buffer, 255).await.unwrap();

                // Without this check the loop below would pass vacuously if
                // fewer than 255 bytes had been read in.
                assert_eq!(buffer.len(), 256);
                for (i, each) in buffer.iter().enumerate() {
                    assert_eq!(*each as usize, i);
                }
            });
            r_task.await.unwrap();
            w_task.await.unwrap();
        });
    }

    /// Writes the bytes 1..=255, closes the write end, and checks that
    /// `read_to_vec_rng` with the range `1..255` leaves 255 bytes in the vec,
    /// each equal to its index.
    #[test]
    fn test_read_to_vec_rng() {
        block_on(async {
            let (mut r, mut w) = tokio_pipe::pipe().unwrap();

            for n in 1..=255 {
                w.write_u8(n).await.unwrap();
            }
            drop(w);

            // The pre-existing `0` must be preserved, so that afterwards the
            // value of every byte equals its index.
            let mut buffer = vec![0];

            read_to_vec_rng(&mut r, &mut buffer, 1..255).await.unwrap();

            assert_eq!(buffer.len(), 255);
            for (i, each) in buffer.iter().enumerate() {
                assert_eq!(*each as usize, i);
            }
        });
    }
}