1use std::{
2 cmp,
3 future::Future,
4 io::{ErrorKind, Result},
5 marker::Unpin,
6 mem::MaybeUninit,
7 ops::Bound::*,
8 pin::Pin,
9 task::{Context, Poll},
10};
1112use tokio::io::{AsyncRead, ReadBuf};
1314pub trait Container {
15/// Reserve at least `n` bytes that can be used in
16 /// [`Container::spare_mut`].
17fn reserve(&mut self, n: usize);
1819/// Number of initialized bytes.
20fn len(&self) -> usize;
2122/// If there is no initialized bytes in the container, return `true`.
23fn is_empty(&self) -> bool {
24self.len() == 0
25}
2627/// Return the capacity reserved.
28fn capacity(&self) -> usize;
2930/// The returned uninit slice must not be empty.
31 ///
32 /// NOTE that the returned uninit slice might be smaller
33 /// than bytes reserved in [`Container::reserve`] or
34 /// ([`Container::capacity`] - [`Container::len`]).
35 ///
36 /// This is because that the container might be a ring buffer.
37 /// If you consume all uninit slices, then the sum of their lengths
38 /// must be equal to the spare capacity ([`Container::capacity`] -
39 /// [`Container::len`]).
40 ///
41 /// # Safety
42 ///
43 /// The slice returned must not be read from and users should
44 /// never write uninitialized bytes to it.
45unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>];
4647/// # Safety
48 ///
49 /// The users must have actually initialized at least `n` bytes
50 /// in the uninit slice returned by [`Container::spare_mut`].
51unsafe fn advance(&mut self, n: usize);
52}
5354impl<T: Container> Container for &mut T {
55fn reserve(&mut self, n: usize) {
56 (**self).reserve(n)
57 }
5859fn len(&self) -> usize {
60 (**self).len()
61 }
6263fn capacity(&self) -> usize {
64 (**self).capacity()
65 }
6667unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>] {
68 (**self).spare_mut()
69 }
7071unsafe fn advance(&mut self, n: usize) {
72 (**self).advance(n)
73 }
74}
7576impl Container for Vec<u8> {
77fn reserve(&mut self, n: usize) {
78 Vec::reserve_exact(self, n)
79 }
8081fn len(&self) -> usize {
82 Vec::len(self)
83 }
8485fn capacity(&self) -> usize {
86 Vec::capacity(self)
87 }
8889unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>] {
90self.spare_capacity_mut()
91 }
9293unsafe fn advance(&mut self, n: usize) {
94let len = self.len();
95self.set_len(len + n)
96 }
97}
9899#[derive(Debug)]
100pub struct ReadToContainerRngFuture<'a, C: ?Sized, Reader: ?Sized> {
101 reader: &'a mut Reader,
102 container: &'a mut C,
103 min: usize,
104 max: usize,
105}
106107impl<C, Reader> Future for ReadToContainerRngFuture<'_, C, Reader>
108where
109C: Container + ?Sized,
110 Reader: AsyncRead + ?Sized + Unpin,
111{
112type Output = Result<()>;
113114fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115let this = &mut *self;
116117let reader = &mut *this.reader;
118let container = &mut *this.container;
119let min = &mut this.min;
120let max = &mut this.max;
121122if *max == 0 {
123return Poll::Ready(Ok(()));
124 }
125126// Do not test *min here so that if:
127 //
128 // ```rust
129 // read_to_container_rng(r, c, 0..10).await
130 // ```
131 //
132 // is called, then we would at least try toread in some bytes.
133loop {
134// safety:
135 //
136 // We will never read from it and never write uninitialized bytes
137 // to it.
138let uninit_slice = unsafe { container.spare_mut() };
139let len = cmp::min(uninit_slice.len(), *max);
140let uninit_slice = &mut uninit_slice[..len];
141142debug_assert_ne!(uninit_slice.len(), 0);
143144let mut read_buf = ReadBuf::uninit(uninit_slice);
145ready!(Pin::new(&mut *reader).poll_read(cx, &mut read_buf))?;
146147let filled = read_buf.filled().len();
148if filled == 0 {
149return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
150 }
151152// safety:
153 //
154 // `read_buf.filled().len()` return number of bytes read in.
155unsafe { container.advance(filled) };
156157*min = min.saturating_sub(filled);
158*max -= filled;
159160if *min == 0 {
161break;
162 }
163 }
164165 Poll::Ready(Ok(()))
166 }
167}
168169/// * `rng` - The start of the range specify the minimum of bytes to read in,
170/// while the end of the range specify the maximum of bytes that
171/// can be read in.
172/// If the lower bound is not specified, it is default to 0.
173/// If the upper bound is not specified, it is default to the
174/// capacity of `bytes`.
175/// The lower bound must not be larger than the upper bound.
176///
177/// Return [`ErrorKind::UnexpectedEof`] on Eof.
178///
179/// NOTE that this function does not modify any existing data.
180///
181/// # Cancel safety
182///
183/// It is cancel safe and dropping the returned future will not stop the
184/// wakeup from happening.
185pub fn read_to_container_rng<'a, C, Reader>(
186 reader: &'a mut Reader,
187 container: &'a mut C,
188 rng: impl std::ops::RangeBounds<usize>,
189) -> ReadToContainerRngFuture<'a, C, Reader>
190where
191C: Container + ?Sized,
192 Reader: AsyncRead + ?Sized + Unpin,
193{
194let min = match rng.start_bound().cloned() {
195 Included(val) => val,
196 Excluded(val) => val + 1,
197 Unbounded => 0,
198 };
199let max = match rng.end_bound().cloned() {
200 Included(val) => val,
201 Excluded(val) => val - 1,
202 Unbounded => min.max(container.capacity() - container.len()),
203 };
204 container.reserve(max);
205206assert!(min <= max, "min {min} should be no larger than max {max}");
207208 ReadToContainerRngFuture {
209 reader,
210 container,
211 min,
212 max,
213 }
214}