asynchronous_codec/
framed_read.rs

1use super::fuse::Fuse;
2use super::Decoder;
3
4use bytes::BytesMut;
5use futures_sink::Sink;
6use futures_util::io::AsyncRead;
7use futures_util::ready;
8use futures_util::stream::{Stream, TryStreamExt};
9use pin_project_lite::pin_project;
10use std::io;
11use std::marker::Unpin;
12use std::ops::{Deref, DerefMut};
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16/// A `Stream` of messages decoded from an `AsyncRead`.
17///
18/// # Example
19/// ```
20/// use asynchronous_codec::{BytesCodec, FramedRead};
21/// use futures::TryStreamExt;
22/// use bytes::{Bytes};
23///
24/// let buf = [3u8; 3];
25/// let mut framed = FramedRead::new(&buf[..], BytesCodec);
26///
27/// # futures::executor::block_on(async move {
28/// if let Some(bytes) = framed.try_next().await? {
29///     assert_eq!(bytes, Bytes::copy_from_slice(&buf[..]));
30/// }
31/// # Ok::<_, std::io::Error>(())
32/// # }).unwrap();
33/// ```
34#[derive(Debug)]
35pub struct FramedRead<T, D> {
36    inner: FramedRead2<Fuse<T, D>>,
37}
38
39impl<T, D> Deref for FramedRead<T, D> {
40    type Target = T;
41
42    fn deref(&self) -> &T {
43        &self.inner
44    }
45}
46
47impl<T, D> DerefMut for FramedRead<T, D> {
48    fn deref_mut(&mut self) -> &mut T {
49        &mut self.inner
50    }
51}
52
53impl<T, D> FramedRead<T, D>
54where
55    T: AsyncRead,
56    D: Decoder,
57{
58    /// Creates a new `FramedRead` transport with the given `Decoder`.
59    pub fn new(inner: T, decoder: D) -> Self {
60        Self {
61            inner: framed_read_2(Fuse::new(inner, decoder), None),
62        }
63    }
64
65    /// Creates a new `FramedRead` from [`FramedReadParts`].
66    ///
67    /// See also [`FramedRead::into_parts`].
68    pub fn from_parts(
69        FramedReadParts {
70            io,
71            decoder,
72            buffer,
73            ..
74        }: FramedReadParts<T, D>,
75    ) -> Self {
76        Self {
77            inner: framed_read_2(Fuse::new(io, decoder), Some(buffer)),
78        }
79    }
80
81    /// Consumes the `FramedRead`, returning its parts such that a
82    /// new `FramedRead` may be constructed, possibly with a different decoder.
83    ///
84    /// See also [`FramedRead::from_parts`].
85    pub fn into_parts(self) -> FramedReadParts<T, D> {
86        let (fuse, buffer) = self.inner.into_parts();
87        FramedReadParts {
88            io: fuse.t,
89            decoder: fuse.u,
90            buffer,
91            _priv: (),
92        }
93    }
94
95    /// Consumes the `FramedRead`, returning its underlying I/O stream.
96    ///
97    /// Note that data that has already been read but not yet consumed
98    /// by the decoder is dropped. To retain any such potentially
99    /// buffered data, use [`FramedRead::into_parts()`].
100    pub fn into_inner(self) -> T {
101        self.into_parts().io
102    }
103
104    /// Returns a reference to the underlying decoder.
105    ///
106    /// Note that care should be taken to not tamper with the underlying decoder
107    /// as it may corrupt the stream of frames otherwise being worked with.
108    pub fn decoder(&self) -> &D {
109        &self.inner.u
110    }
111
112    /// Returns a mutable reference to the underlying decoder.
113    ///
114    /// Note that care should be taken to not tamper with the underlying decoder
115    /// as it may corrupt the stream of frames otherwise being worked with.
116    pub fn decoder_mut(&mut self) -> &mut D {
117        &mut self.inner.u
118    }
119
120    /// Returns a reference to the read buffer.
121    pub fn read_buffer(&self) -> &BytesMut {
122        &self.inner.buffer
123    }
124}
125
126impl<T, D> Stream for FramedRead<T, D>
127where
128    T: AsyncRead + Unpin,
129    D: Decoder,
130{
131    type Item = Result<D::Item, D::Error>;
132
133    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134        self.inner.try_poll_next_unpin(cx)
135    }
136}
137
138pin_project! {
139    #[derive(Debug)]
140    pub struct FramedRead2<T> {
141        #[pin]
142        inner: T,
143        buffer: BytesMut,
144    }
145}
146
147impl<T> Deref for FramedRead2<T> {
148    type Target = T;
149
150    fn deref(&self) -> &T {
151        &self.inner
152    }
153}
154
155impl<T> DerefMut for FramedRead2<T> {
156    fn deref_mut(&mut self) -> &mut T {
157        &mut self.inner
158    }
159}
160
161const INITIAL_CAPACITY: usize = 8 * 1024;
162
163pub fn framed_read_2<T>(inner: T, buffer: Option<BytesMut>) -> FramedRead2<T> {
164    FramedRead2 {
165        inner,
166        buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(INITIAL_CAPACITY)),
167    }
168}
169
170impl<T> Stream for FramedRead2<T>
171where
172    T: AsyncRead + Decoder + Unpin,
173{
174    type Item = Result<T::Item, T::Error>;
175
176    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177        let this = &mut *self;
178
179        if let Some(item) = this.inner.decode(&mut this.buffer)? {
180            return Poll::Ready(Some(Ok(item)));
181        }
182
183        let mut buf = [0u8; INITIAL_CAPACITY];
184
185        loop {
186            let n = ready!(Pin::new(&mut this.inner).poll_read(cx, &mut buf))?;
187            this.buffer.extend_from_slice(&buf[..n]);
188
189            let ended = n == 0;
190
191            match this.inner.decode(&mut this.buffer)? {
192                Some(item) => return Poll::Ready(Some(Ok(item))),
193                None if ended => {
194                    if this.buffer.is_empty() {
195                        return Poll::Ready(None);
196                    } else {
197                        match this.inner.decode_eof(&mut this.buffer)? {
198                            Some(item) => return Poll::Ready(Some(Ok(item))),
199                            None if this.buffer.is_empty() => return Poll::Ready(None),
200                            None => {
201                                return Poll::Ready(Some(Err(io::Error::new(
202                                    io::ErrorKind::UnexpectedEof,
203                                    "bytes remaining in stream",
204                                )
205                                .into())));
206                            }
207                        }
208                    }
209                }
210                _ => continue,
211            }
212        }
213    }
214}
215
216impl<T, I> Sink<I> for FramedRead2<T>
217where
218    T: Sink<I> + Unpin,
219{
220    type Error = T::Error;
221
222    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
223        self.project().inner.poll_ready(cx)
224    }
225    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
226        self.project().inner.start_send(item)
227    }
228    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
229        self.project().inner.poll_flush(cx)
230    }
231    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
232        self.project().inner.poll_close(cx)
233    }
234}
235
236impl<T> FramedRead2<T> {
237    pub fn into_parts(self) -> (T, BytesMut) {
238        (self.inner, self.buffer)
239    }
240
241    pub fn buffer(&self) -> &BytesMut {
242        &self.buffer
243    }
244}
245
246/// The parts obtained from (FramedRead::into_parts).
247pub struct FramedReadParts<T, D> {
248    /// The underlying I/O stream.
249    pub io: T,
250    /// The frame decoder.
251    pub decoder: D,
252    /// The buffer of data that has been read from `io` but not
253    /// yet consumed by `decoder`.
254    pub buffer: BytesMut,
255    /// Keep the constructor private.
256    _priv: (),
257}
258
259impl<T, D> FramedReadParts<T, D> {
260    /// Changes the decoder in `FramedReadParts`.
261    pub fn map_decoder<E, F>(self, f: F) -> FramedReadParts<T, E>
262    where
263        E: Decoder,
264        F: FnOnce(D) -> E,
265    {
266        FramedReadParts {
267            io: self.io,
268            decoder: f(self.decoder),
269            buffer: self.buffer,
270            _priv: (),
271        }
272    }
273}