tokio_io_utility/async_read_utility/
inner.rs

1use std::{
2    cmp,
3    future::Future,
4    io::{ErrorKind, Result},
5    marker::Unpin,
6    mem::MaybeUninit,
7    ops::Bound::*,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12use tokio::io::{AsyncRead, ReadBuf};
13
14pub trait Container {
15    /// Reserve at least `n` bytes that can be used in
16    /// [`Container::spare_mut`].
17    fn reserve(&mut self, n: usize);
18
19    /// Number of initialized bytes.
20    fn len(&self) -> usize;
21
22    /// If there is no initialized bytes in the container, return `true`.
23    fn is_empty(&self) -> bool {
24        self.len() == 0
25    }
26
27    /// Return the capacity reserved.
28    fn capacity(&self) -> usize;
29
30    /// The returned uninit slice must not be empty.
31    ///
32    /// NOTE that the returned uninit slice might be smaller
33    /// than bytes reserved in [`Container::reserve`] or
34    /// ([`Container::capacity`] - [`Container::len`]).
35    ///
36    /// This is because that the container might be a ring buffer.
37    /// If you consume all uninit slices, then the sum of their lengths
38    /// must be equal to the spare capacity ([`Container::capacity`] -
39    /// [`Container::len`]).
40    ///
41    /// # Safety
42    ///
43    /// The slice returned must not be read from and users should
44    /// never write uninitialized bytes to it.
45    unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>];
46
47    /// # Safety
48    ///
49    /// The users must have actually initialized at least `n` bytes
50    /// in the uninit slice returned by [`Container::spare_mut`].
51    unsafe fn advance(&mut self, n: usize);
52}
53
54impl<T: Container> Container for &mut T {
55    fn reserve(&mut self, n: usize) {
56        (**self).reserve(n)
57    }
58
59    fn len(&self) -> usize {
60        (**self).len()
61    }
62
63    fn capacity(&self) -> usize {
64        (**self).capacity()
65    }
66
67    unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>] {
68        (**self).spare_mut()
69    }
70
71    unsafe fn advance(&mut self, n: usize) {
72        (**self).advance(n)
73    }
74}
75
76impl Container for Vec<u8> {
77    fn reserve(&mut self, n: usize) {
78        Vec::reserve_exact(self, n)
79    }
80
81    fn len(&self) -> usize {
82        Vec::len(self)
83    }
84
85    fn capacity(&self) -> usize {
86        Vec::capacity(self)
87    }
88
89    unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>] {
90        self.spare_capacity_mut()
91    }
92
93    unsafe fn advance(&mut self, n: usize) {
94        let len = self.len();
95        self.set_len(len + n)
96    }
97}
98
99#[derive(Debug)]
100pub struct ReadToContainerRngFuture<'a, C: ?Sized, Reader: ?Sized> {
101    reader: &'a mut Reader,
102    container: &'a mut C,
103    min: usize,
104    max: usize,
105}
106
107impl<C, Reader> Future for ReadToContainerRngFuture<'_, C, Reader>
108where
109    C: Container + ?Sized,
110    Reader: AsyncRead + ?Sized + Unpin,
111{
112    type Output = Result<()>;
113
114    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115        let this = &mut *self;
116
117        let reader = &mut *this.reader;
118        let container = &mut *this.container;
119        let min = &mut this.min;
120        let max = &mut this.max;
121
122        if *max == 0 {
123            return Poll::Ready(Ok(()));
124        }
125
126        // Do not test *min here so that if:
127        //
128        // ```rust
129        // read_to_container_rng(r, c, 0..10).await
130        // ```
131        //
132        // is called, then we would at least try toread in some bytes.
133        loop {
134            // safety:
135            //
136            // We will never read from it and never write uninitialized bytes
137            // to it.
138            let uninit_slice = unsafe { container.spare_mut() };
139            let len = cmp::min(uninit_slice.len(), *max);
140            let uninit_slice = &mut uninit_slice[..len];
141
142            debug_assert_ne!(uninit_slice.len(), 0);
143
144            let mut read_buf = ReadBuf::uninit(uninit_slice);
145            ready!(Pin::new(&mut *reader).poll_read(cx, &mut read_buf))?;
146
147            let filled = read_buf.filled().len();
148            if filled == 0 {
149                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
150            }
151
152            // safety:
153            //
154            // `read_buf.filled().len()` return number of bytes read in.
155            unsafe { container.advance(filled) };
156
157            *min = min.saturating_sub(filled);
158            *max -= filled;
159
160            if *min == 0 {
161                break;
162            }
163        }
164
165        Poll::Ready(Ok(()))
166    }
167}
168
169/// * `rng` - The start of the range specify the minimum of bytes to read in,
170///           while the end of the range specify the maximum of bytes that
171///           can be read in.
172///           If the lower bound is not specified, it is default to 0.
173///           If the upper bound is not specified, it is default to the
174///           capacity of `bytes`.
175///           The lower bound must not be larger than the upper bound.
176///
177/// Return [`ErrorKind::UnexpectedEof`] on Eof.
178///
179/// NOTE that this function does not modify any existing data.
180///
181/// # Cancel safety
182///
183/// It is cancel safe and dropping the returned future will not stop the
184/// wakeup from happening.
185pub fn read_to_container_rng<'a, C, Reader>(
186    reader: &'a mut Reader,
187    container: &'a mut C,
188    rng: impl std::ops::RangeBounds<usize>,
189) -> ReadToContainerRngFuture<'a, C, Reader>
190where
191    C: Container + ?Sized,
192    Reader: AsyncRead + ?Sized + Unpin,
193{
194    let min = match rng.start_bound().cloned() {
195        Included(val) => val,
196        Excluded(val) => val + 1,
197        Unbounded => 0,
198    };
199    let max = match rng.end_bound().cloned() {
200        Included(val) => val,
201        Excluded(val) => val - 1,
202        Unbounded => min.max(container.capacity() - container.len()),
203    };
204    container.reserve(max);
205
206    assert!(min <= max, "min {min} should be no larger than max {max}");
207
208    ReadToContainerRngFuture {
209        reader,
210        container,
211        min,
212        max,
213    }
214}