tokio_util/codec/
framed_read.rs

1use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2use crate::codec::Decoder;
3
4use futures_core::Stream;
5use tokio::io::AsyncRead;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use super::FramedParts;
15
16pin_project! {
17    /// A [`Stream`] of messages decoded from an [`AsyncRead`].
18    ///
19    /// For examples of how to use `FramedRead` with a codec, see the
20    /// examples on the [`codec`] module.
21    ///
22    /// # Cancellation safety
23    /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
24    /// future only holds onto a reference to the underlying stream, so dropping it will
25    /// never lose a value.
26    ///
27    /// [`Stream`]: futures_core::Stream
28    /// [`AsyncRead`]: tokio::io::AsyncRead
29    /// [`codec`]: crate::codec
30    /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
31    pub struct FramedRead<T, D> {
32        #[pin]
33        inner: FramedImpl<T, D, ReadFrame>,
34    }
35}
36
37// ===== impl FramedRead =====
38
39impl<T, D> FramedRead<T, D> {
40    /// Creates a new `FramedRead` with the given `decoder`.
41    pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
42        FramedRead {
43            inner: FramedImpl {
44                inner,
45                codec: decoder,
46                state: Default::default(),
47            },
48        }
49    }
50
51    /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity`
52    /// initial size.
53    pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
54        FramedRead {
55            inner: FramedImpl {
56                inner,
57                codec: decoder,
58                state: ReadFrame {
59                    eof: false,
60                    is_readable: false,
61                    buffer: BytesMut::with_capacity(capacity),
62                    has_errored: false,
63                },
64            },
65        }
66    }
67
68    /// Returns a reference to the underlying I/O stream wrapped by
69    /// `FramedRead`.
70    ///
71    /// Note that care should be taken to not tamper with the underlying stream
72    /// of data coming in as it may corrupt the stream of frames otherwise
73    /// being worked with.
74    pub fn get_ref(&self) -> &T {
75        &self.inner.inner
76    }
77
78    /// Returns a mutable reference to the underlying I/O stream wrapped by
79    /// `FramedRead`.
80    ///
81    /// Note that care should be taken to not tamper with the underlying stream
82    /// of data coming in as it may corrupt the stream of frames otherwise
83    /// being worked with.
84    pub fn get_mut(&mut self) -> &mut T {
85        &mut self.inner.inner
86    }
87
88    /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
89    /// `FramedRead`.
90    ///
91    /// Note that care should be taken to not tamper with the underlying stream
92    /// of data coming in as it may corrupt the stream of frames otherwise
93    /// being worked with.
94    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
95        self.project().inner.project().inner
96    }
97
98    /// Consumes the `FramedRead`, returning its underlying I/O stream.
99    ///
100    /// Note that care should be taken to not tamper with the underlying stream
101    /// of data coming in as it may corrupt the stream of frames otherwise
102    /// being worked with.
103    pub fn into_inner(self) -> T {
104        self.inner.inner
105    }
106
107    /// Returns a reference to the underlying decoder.
108    pub fn decoder(&self) -> &D {
109        &self.inner.codec
110    }
111
112    /// Returns a mutable reference to the underlying decoder.
113    pub fn decoder_mut(&mut self) -> &mut D {
114        &mut self.inner.codec
115    }
116
117    /// Maps the decoder `D` to `C`, preserving the read buffer
118    /// wrapped by `Framed`.
119    pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
120    where
121        F: FnOnce(D) -> C,
122    {
123        // This could be potentially simplified once rust-lang/rust#86555 hits stable
124        let FramedImpl {
125            inner,
126            state,
127            codec,
128        } = self.inner;
129        FramedRead {
130            inner: FramedImpl {
131                inner,
132                state,
133                codec: map(codec),
134            },
135        }
136    }
137
138    /// Returns a mutable reference to the underlying decoder.
139    pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
140        self.project().inner.project().codec
141    }
142
143    /// Returns a reference to the read buffer.
144    pub fn read_buffer(&self) -> &BytesMut {
145        &self.inner.state.buffer
146    }
147
148    /// Returns a mutable reference to the read buffer.
149    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
150        &mut self.inner.state.buffer
151    }
152
153    /// Consumes the `FramedRead`, returning its underlying I/O stream, the buffer
154    /// with unprocessed data, and the codec.
155    pub fn into_parts(self) -> FramedParts<T, D> {
156        FramedParts {
157            io: self.inner.inner,
158            codec: self.inner.codec,
159            read_buf: self.inner.state.buffer,
160            write_buf: BytesMut::new(),
161            _priv: (),
162        }
163    }
164}
165
166// This impl just defers to the underlying FramedImpl
167impl<T, D> Stream for FramedRead<T, D>
168where
169    T: AsyncRead,
170    D: Decoder,
171{
172    type Item = Result<D::Item, D::Error>;
173
174    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175        self.project().inner.poll_next(cx)
176    }
177}
178
179// This impl just defers to the underlying T: Sink
180impl<T, I, D> Sink<I> for FramedRead<T, D>
181where
182    T: Sink<I>,
183{
184    type Error = T::Error;
185
186    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187        self.project().inner.project().inner.poll_ready(cx)
188    }
189
190    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
191        self.project().inner.project().inner.start_send(item)
192    }
193
194    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195        self.project().inner.project().inner.poll_flush(cx)
196    }
197
198    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199        self.project().inner.project().inner.poll_close(cx)
200    }
201}
202
203impl<T, D> fmt::Debug for FramedRead<T, D>
204where
205    T: fmt::Debug,
206    D: fmt::Debug,
207{
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("FramedRead")
210            .field("inner", &self.get_ref())
211            .field("decoder", &self.decoder())
212            .field("eof", &self.inner.state.eof)
213            .field("is_readable", &self.inner.state.is_readable)
214            .field("buffer", &self.read_buffer())
215            .finish()
216    }
217}