tokio_util/codec/
framed_read.rs1use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2use crate::codec::Decoder;
3
4use futures_core::Stream;
5use tokio::io::AsyncRead;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use super::FramedParts;
15
16pin_project! {
17 pub struct FramedRead<T, D> {
32 #[pin]
33 inner: FramedImpl<T, D, ReadFrame>,
34 }
35}
36
37impl<T, D> FramedRead<T, D> {
40 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
42 FramedRead {
43 inner: FramedImpl {
44 inner,
45 codec: decoder,
46 state: Default::default(),
47 },
48 }
49 }
50
51 pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
54 FramedRead {
55 inner: FramedImpl {
56 inner,
57 codec: decoder,
58 state: ReadFrame {
59 eof: false,
60 is_readable: false,
61 buffer: BytesMut::with_capacity(capacity),
62 has_errored: false,
63 },
64 },
65 }
66 }
67
68 pub fn get_ref(&self) -> &T {
75 &self.inner.inner
76 }
77
78 pub fn get_mut(&mut self) -> &mut T {
85 &mut self.inner.inner
86 }
87
88 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
95 self.project().inner.project().inner
96 }
97
98 pub fn into_inner(self) -> T {
104 self.inner.inner
105 }
106
107 pub fn decoder(&self) -> &D {
109 &self.inner.codec
110 }
111
112 pub fn decoder_mut(&mut self) -> &mut D {
114 &mut self.inner.codec
115 }
116
117 pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
120 where
121 F: FnOnce(D) -> C,
122 {
123 let FramedImpl {
125 inner,
126 state,
127 codec,
128 } = self.inner;
129 FramedRead {
130 inner: FramedImpl {
131 inner,
132 state,
133 codec: map(codec),
134 },
135 }
136 }
137
138 pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
140 self.project().inner.project().codec
141 }
142
143 pub fn read_buffer(&self) -> &BytesMut {
145 &self.inner.state.buffer
146 }
147
148 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
150 &mut self.inner.state.buffer
151 }
152
153 pub fn into_parts(self) -> FramedParts<T, D> {
156 FramedParts {
157 io: self.inner.inner,
158 codec: self.inner.codec,
159 read_buf: self.inner.state.buffer,
160 write_buf: BytesMut::new(),
161 _priv: (),
162 }
163 }
164}
165
166impl<T, D> Stream for FramedRead<T, D>
168where
169 T: AsyncRead,
170 D: Decoder,
171{
172 type Item = Result<D::Item, D::Error>;
173
174 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
175 self.project().inner.poll_next(cx)
176 }
177}
178
179impl<T, I, D> Sink<I> for FramedRead<T, D>
181where
182 T: Sink<I>,
183{
184 type Error = T::Error;
185
186 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
187 self.project().inner.project().inner.poll_ready(cx)
188 }
189
190 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
191 self.project().inner.project().inner.start_send(item)
192 }
193
194 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 self.project().inner.project().inner.poll_flush(cx)
196 }
197
198 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 self.project().inner.project().inner.poll_close(cx)
200 }
201}
202
203impl<T, D> fmt::Debug for FramedRead<T, D>
204where
205 T: fmt::Debug,
206 D: fmt::Debug,
207{
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 f.debug_struct("FramedRead")
210 .field("inner", &self.get_ref())
211 .field("decoder", &self.decoder())
212 .field("eof", &self.inner.state.eof)
213 .field("is_readable", &self.inner.state.is_readable)
214 .field("buffer", &self.read_buffer())
215 .finish()
216 }
217}