asynchronous_codec/
framed_read.rs
1use super::fuse::Fuse;
2use super::Decoder;
3
4use bytes::BytesMut;
5use futures_sink::Sink;
6use futures_util::io::AsyncRead;
7use futures_util::ready;
8use futures_util::stream::{Stream, TryStreamExt};
9use pin_project_lite::pin_project;
10use std::io;
11use std::marker::Unpin;
12use std::ops::{Deref, DerefMut};
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16#[derive(Debug)]
35pub struct FramedRead<T, D> {
36 inner: FramedRead2<Fuse<T, D>>,
37}
38
39impl<T, D> Deref for FramedRead<T, D> {
40 type Target = T;
41
42 fn deref(&self) -> &T {
43 &self.inner
44 }
45}
46
47impl<T, D> DerefMut for FramedRead<T, D> {
48 fn deref_mut(&mut self) -> &mut T {
49 &mut self.inner
50 }
51}
52
53impl<T, D> FramedRead<T, D>
54where
55 T: AsyncRead,
56 D: Decoder,
57{
58 pub fn new(inner: T, decoder: D) -> Self {
60 Self {
61 inner: framed_read_2(Fuse::new(inner, decoder), None),
62 }
63 }
64
65 pub fn from_parts(
69 FramedReadParts {
70 io,
71 decoder,
72 buffer,
73 ..
74 }: FramedReadParts<T, D>,
75 ) -> Self {
76 Self {
77 inner: framed_read_2(Fuse::new(io, decoder), Some(buffer)),
78 }
79 }
80
81 pub fn into_parts(self) -> FramedReadParts<T, D> {
86 let (fuse, buffer) = self.inner.into_parts();
87 FramedReadParts {
88 io: fuse.t,
89 decoder: fuse.u,
90 buffer,
91 _priv: (),
92 }
93 }
94
95 pub fn into_inner(self) -> T {
101 self.into_parts().io
102 }
103
104 pub fn decoder(&self) -> &D {
109 &self.inner.u
110 }
111
112 pub fn decoder_mut(&mut self) -> &mut D {
117 &mut self.inner.u
118 }
119
120 pub fn read_buffer(&self) -> &BytesMut {
122 &self.inner.buffer
123 }
124}
125
126impl<T, D> Stream for FramedRead<T, D>
127where
128 T: AsyncRead + Unpin,
129 D: Decoder,
130{
131 type Item = Result<D::Item, D::Error>;
132
133 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134 self.inner.try_poll_next_unpin(cx)
135 }
136}
137
138pin_project! {
139 #[derive(Debug)]
140 pub struct FramedRead2<T> {
141 #[pin]
142 inner: T,
143 buffer: BytesMut,
144 }
145}
146
147impl<T> Deref for FramedRead2<T> {
148 type Target = T;
149
150 fn deref(&self) -> &T {
151 &self.inner
152 }
153}
154
155impl<T> DerefMut for FramedRead2<T> {
156 fn deref_mut(&mut self) -> &mut T {
157 &mut self.inner
158 }
159}
160
161const INITIAL_CAPACITY: usize = 8 * 1024;
162
163pub fn framed_read_2<T>(inner: T, buffer: Option<BytesMut>) -> FramedRead2<T> {
164 FramedRead2 {
165 inner,
166 buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(INITIAL_CAPACITY)),
167 }
168}
169
170impl<T> Stream for FramedRead2<T>
171where
172 T: AsyncRead + Decoder + Unpin,
173{
174 type Item = Result<T::Item, T::Error>;
175
176 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177 let this = &mut *self;
178
179 if let Some(item) = this.inner.decode(&mut this.buffer)? {
180 return Poll::Ready(Some(Ok(item)));
181 }
182
183 let mut buf = [0u8; INITIAL_CAPACITY];
184
185 loop {
186 let n = ready!(Pin::new(&mut this.inner).poll_read(cx, &mut buf))?;
187 this.buffer.extend_from_slice(&buf[..n]);
188
189 let ended = n == 0;
190
191 match this.inner.decode(&mut this.buffer)? {
192 Some(item) => return Poll::Ready(Some(Ok(item))),
193 None if ended => {
194 if this.buffer.is_empty() {
195 return Poll::Ready(None);
196 } else {
197 match this.inner.decode_eof(&mut this.buffer)? {
198 Some(item) => return Poll::Ready(Some(Ok(item))),
199 None if this.buffer.is_empty() => return Poll::Ready(None),
200 None => {
201 return Poll::Ready(Some(Err(io::Error::new(
202 io::ErrorKind::UnexpectedEof,
203 "bytes remaining in stream",
204 )
205 .into())));
206 }
207 }
208 }
209 }
210 _ => continue,
211 }
212 }
213 }
214}
215
216impl<T, I> Sink<I> for FramedRead2<T>
217where
218 T: Sink<I> + Unpin,
219{
220 type Error = T::Error;
221
222 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
223 self.project().inner.poll_ready(cx)
224 }
225 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
226 self.project().inner.start_send(item)
227 }
228 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
229 self.project().inner.poll_flush(cx)
230 }
231 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
232 self.project().inner.poll_close(cx)
233 }
234}
235
236impl<T> FramedRead2<T> {
237 pub fn into_parts(self) -> (T, BytesMut) {
238 (self.inner, self.buffer)
239 }
240
241 pub fn buffer(&self) -> &BytesMut {
242 &self.buffer
243 }
244}
245
246pub struct FramedReadParts<T, D> {
248 pub io: T,
250 pub decoder: D,
252 pub buffer: BytesMut,
255 _priv: (),
257}
258
259impl<T, D> FramedReadParts<T, D> {
260 pub fn map_decoder<E, F>(self, f: F) -> FramedReadParts<T, E>
262 where
263 E: Decoder,
264 F: FnOnce(D) -> E,
265 {
266 FramedReadParts {
267 io: self.io,
268 decoder: f(self.decoder),
269 buffer: self.buffer,
270 _priv: (),
271 }
272 }
273}