tokio_io_utility/async_read_utility/
inner.rs1use std::{
2 cmp,
3 future::Future,
4 io::{ErrorKind, Result},
5 marker::Unpin,
6 mem::MaybeUninit,
7 ops::Bound::*,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12use tokio::io::{AsyncRead, ReadBuf};
13
14pub trait Container {
15 fn reserve(&mut self, n: usize);
18
19 fn len(&self) -> usize;
21
22 fn is_empty(&self) -> bool {
24 self.len() == 0
25 }
26
27 fn capacity(&self) -> usize;
29
30 unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>];
46
47 unsafe fn advance(&mut self, n: usize);
52}
53
54impl<T: Container> Container for &mut T {
55 fn reserve(&mut self, n: usize) {
56 (**self).reserve(n)
57 }
58
59 fn len(&self) -> usize {
60 (**self).len()
61 }
62
63 fn capacity(&self) -> usize {
64 (**self).capacity()
65 }
66
67 unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>] {
68 (**self).spare_mut()
69 }
70
71 unsafe fn advance(&mut self, n: usize) {
72 (**self).advance(n)
73 }
74}
75
76impl Container for Vec<u8> {
77 fn reserve(&mut self, n: usize) {
78 Vec::reserve_exact(self, n)
79 }
80
81 fn len(&self) -> usize {
82 Vec::len(self)
83 }
84
85 fn capacity(&self) -> usize {
86 Vec::capacity(self)
87 }
88
89 unsafe fn spare_mut(&mut self) -> &mut [MaybeUninit<u8>] {
90 self.spare_capacity_mut()
91 }
92
93 unsafe fn advance(&mut self, n: usize) {
94 let len = self.len();
95 self.set_len(len + n)
96 }
97}
98
99#[derive(Debug)]
100pub struct ReadToContainerRngFuture<'a, C: ?Sized, Reader: ?Sized> {
101 reader: &'a mut Reader,
102 container: &'a mut C,
103 min: usize,
104 max: usize,
105}
106
107impl<C, Reader> Future for ReadToContainerRngFuture<'_, C, Reader>
108where
109 C: Container + ?Sized,
110 Reader: AsyncRead + ?Sized + Unpin,
111{
112 type Output = Result<()>;
113
114 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115 let this = &mut *self;
116
117 let reader = &mut *this.reader;
118 let container = &mut *this.container;
119 let min = &mut this.min;
120 let max = &mut this.max;
121
122 if *max == 0 {
123 return Poll::Ready(Ok(()));
124 }
125
126 loop {
134 let uninit_slice = unsafe { container.spare_mut() };
139 let len = cmp::min(uninit_slice.len(), *max);
140 let uninit_slice = &mut uninit_slice[..len];
141
142 debug_assert_ne!(uninit_slice.len(), 0);
143
144 let mut read_buf = ReadBuf::uninit(uninit_slice);
145 ready!(Pin::new(&mut *reader).poll_read(cx, &mut read_buf))?;
146
147 let filled = read_buf.filled().len();
148 if filled == 0 {
149 return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
150 }
151
152 unsafe { container.advance(filled) };
156
157 *min = min.saturating_sub(filled);
158 *max -= filled;
159
160 if *min == 0 {
161 break;
162 }
163 }
164
165 Poll::Ready(Ok(()))
166 }
167}
168
169pub fn read_to_container_rng<'a, C, Reader>(
186 reader: &'a mut Reader,
187 container: &'a mut C,
188 rng: impl std::ops::RangeBounds<usize>,
189) -> ReadToContainerRngFuture<'a, C, Reader>
190where
191 C: Container + ?Sized,
192 Reader: AsyncRead + ?Sized + Unpin,
193{
194 let min = match rng.start_bound().cloned() {
195 Included(val) => val,
196 Excluded(val) => val + 1,
197 Unbounded => 0,
198 };
199 let max = match rng.end_bound().cloned() {
200 Included(val) => val,
201 Excluded(val) => val - 1,
202 Unbounded => min.max(container.capacity() - container.len()),
203 };
204 container.reserve(max);
205
206 assert!(min <= max, "min {min} should be no larger than max {max}");
207
208 ReadToContainerRngFuture {
209 reader,
210 container,
211 min,
212 max,
213 }
214}