tokio_util/codec/framed.rs
1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_core::Stream;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 ///
20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21 /// by using the `new` function seen below.
22 ///
23 /// # Cancellation safety
24 ///
25 /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a
26 /// `tokio::select!` statement and some other branch completes first, then it is
27 /// guaranteed that the message was not sent, but the message itself is lost.
28 /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
29 /// future only holds onto a reference to the underlying stream, so dropping it will
30 /// never lose a value.
31 ///
32 /// [`Stream`]: futures_core::Stream
33 /// [`Sink`]: futures_sink::Sink
34 /// [`AsyncRead`]: tokio::io::AsyncRead
35 /// [`Decoder::framed`]: crate::codec::Decoder::framed()
36 /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send
37 /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
38 pub struct Framed<T, U> {
39 #[pin]
40 inner: FramedImpl<T, U, RWFrames>
41 }
42}
43
44impl<T, U> Framed<T, U> {
45 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
46 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
47 ///
48 /// Raw I/O objects work with byte sequences, but higher-level code usually
49 /// wants to batch these into meaningful chunks, called "frames". This
50 /// method layers framing on top of an I/O object, by using the codec
51 /// traits to handle encoding and decoding of messages frames. Note that
52 /// the incoming and outgoing frame types may be distinct.
53 ///
54 /// This function returns a *single* object that is both [`Stream`] and
55 /// [`Sink`]; grouping this into a single object is often useful for layering
56 /// things like gzip or TLS, which require both read and write access to the
57 /// underlying object.
58 ///
59 /// If you want to work more directly with the streams and sink, consider
60 /// calling [`split`] on the `Framed` returned by this method, which will
61 /// break them into separate objects, allowing them to interact more easily.
62 ///
63 /// Note that, for some byte sources, the stream can be resumed after an EOF
64 /// by reading from it, even after it has returned `None`. Repeated attempts
65 /// to do so, without new data available, continue to return `None` without
66 /// creating more (closing) frames.
67 ///
68 /// [`Stream`]: futures_core::Stream
69 /// [`Sink`]: futures_sink::Sink
70 /// [`Decode`]: crate::codec::Decoder
71 /// [`Encoder`]: crate::codec::Encoder
72 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
73 pub fn new(inner: T, codec: U) -> Framed<T, U> {
74 Framed {
75 inner: FramedImpl {
76 inner,
77 codec,
78 state: Default::default(),
79 },
80 }
81 }
82
83 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
84 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
85 /// with a specific read buffer initial capacity.
86 ///
87 /// Raw I/O objects work with byte sequences, but higher-level code usually
88 /// wants to batch these into meaningful chunks, called "frames". This
89 /// method layers framing on top of an I/O object, by using the codec
90 /// traits to handle encoding and decoding of messages frames. Note that
91 /// the incoming and outgoing frame types may be distinct.
92 ///
93 /// This function returns a *single* object that is both [`Stream`] and
94 /// [`Sink`]; grouping this into a single object is often useful for layering
95 /// things like gzip or TLS, which require both read and write access to the
96 /// underlying object.
97 ///
98 /// If you want to work more directly with the streams and sink, consider
99 /// calling [`split`] on the `Framed` returned by this method, which will
100 /// break them into separate objects, allowing them to interact more easily.
101 ///
102 /// [`Stream`]: futures_core::Stream
103 /// [`Sink`]: futures_sink::Sink
104 /// [`Decode`]: crate::codec::Decoder
105 /// [`Encoder`]: crate::codec::Encoder
106 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
107 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
108 Framed {
109 inner: FramedImpl {
110 inner,
111 codec,
112 state: RWFrames {
113 read: ReadFrame {
114 eof: false,
115 is_readable: false,
116 buffer: BytesMut::with_capacity(capacity),
117 has_errored: false,
118 },
119 write: WriteFrame {
120 buffer: BytesMut::with_capacity(capacity),
121 backpressure_boundary: capacity,
122 },
123 },
124 },
125 }
126 }
127
128 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
129 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
130 ///
131 /// Raw I/O objects work with byte sequences, but higher-level code usually
132 /// wants to batch these into meaningful chunks, called "frames". This
133 /// method layers framing on top of an I/O object, by using the `Codec`
134 /// traits to handle encoding and decoding of messages frames. Note that
135 /// the incoming and outgoing frame types may be distinct.
136 ///
137 /// This function returns a *single* object that is both [`Stream`] and
138 /// [`Sink`]; grouping this into a single object is often useful for layering
139 /// things like gzip or TLS, which require both read and write access to the
140 /// underlying object.
141 ///
142 /// This objects takes a stream and a `readbuffer` and a `writebuffer`. These field
143 /// can be obtained from an existing `Framed` with the [`into_parts`] method.
144 ///
145 /// If you want to work more directly with the streams and sink, consider
146 /// calling [`split`] on the `Framed` returned by this method, which will
147 /// break them into separate objects, allowing them to interact more easily.
148 ///
149 /// [`Stream`]: futures_core::Stream
150 /// [`Sink`]: futures_sink::Sink
151 /// [`Decoder`]: crate::codec::Decoder
152 /// [`Encoder`]: crate::codec::Encoder
153 /// [`into_parts`]: crate::codec::Framed::into_parts()
154 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
155 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
156 Framed {
157 inner: FramedImpl {
158 inner: parts.io,
159 codec: parts.codec,
160 state: RWFrames {
161 read: parts.read_buf.into(),
162 write: parts.write_buf.into(),
163 },
164 },
165 }
166 }
167
168 /// Returns a reference to the underlying I/O stream wrapped by
169 /// `Framed`.
170 ///
171 /// Note that care should be taken to not tamper with the underlying stream
172 /// of data coming in as it may corrupt the stream of frames otherwise
173 /// being worked with.
174 pub fn get_ref(&self) -> &T {
175 &self.inner.inner
176 }
177
178 /// Returns a mutable reference to the underlying I/O stream wrapped by
179 /// `Framed`.
180 ///
181 /// Note that care should be taken to not tamper with the underlying stream
182 /// of data coming in as it may corrupt the stream of frames otherwise
183 /// being worked with.
184 pub fn get_mut(&mut self) -> &mut T {
185 &mut self.inner.inner
186 }
187
188 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
189 /// `Framed`.
190 ///
191 /// Note that care should be taken to not tamper with the underlying stream
192 /// of data coming in as it may corrupt the stream of frames otherwise
193 /// being worked with.
194 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
195 self.project().inner.project().inner
196 }
197
198 /// Returns a reference to the underlying codec wrapped by
199 /// `Framed`.
200 ///
201 /// Note that care should be taken to not tamper with the underlying codec
202 /// as it may corrupt the stream of frames otherwise being worked with.
203 pub fn codec(&self) -> &U {
204 &self.inner.codec
205 }
206
207 /// Returns a mutable reference to the underlying codec wrapped by
208 /// `Framed`.
209 ///
210 /// Note that care should be taken to not tamper with the underlying codec
211 /// as it may corrupt the stream of frames otherwise being worked with.
212 pub fn codec_mut(&mut self) -> &mut U {
213 &mut self.inner.codec
214 }
215
216 /// Maps the codec `U` to `C`, preserving the read and write buffers
217 /// wrapped by `Framed`.
218 ///
219 /// Note that care should be taken to not tamper with the underlying codec
220 /// as it may corrupt the stream of frames otherwise being worked with.
221 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
222 where
223 F: FnOnce(U) -> C,
224 {
225 // This could be potentially simplified once rust-lang/rust#86555 hits stable
226 let parts = self.into_parts();
227 Framed::from_parts(FramedParts {
228 io: parts.io,
229 codec: map(parts.codec),
230 read_buf: parts.read_buf,
231 write_buf: parts.write_buf,
232 _priv: (),
233 })
234 }
235
236 /// Returns a mutable reference to the underlying codec wrapped by
237 /// `Framed`.
238 ///
239 /// Note that care should be taken to not tamper with the underlying codec
240 /// as it may corrupt the stream of frames otherwise being worked with.
241 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
242 self.project().inner.project().codec
243 }
244
245 /// Returns a reference to the read buffer.
246 pub fn read_buffer(&self) -> &BytesMut {
247 &self.inner.state.read.buffer
248 }
249
250 /// Returns a mutable reference to the read buffer.
251 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
252 &mut self.inner.state.read.buffer
253 }
254
255 /// Returns a reference to the write buffer.
256 pub fn write_buffer(&self) -> &BytesMut {
257 &self.inner.state.write.buffer
258 }
259
260 /// Returns a mutable reference to the write buffer.
261 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
262 &mut self.inner.state.write.buffer
263 }
264
265 /// Returns backpressure boundary
266 pub fn backpressure_boundary(&self) -> usize {
267 self.inner.state.write.backpressure_boundary
268 }
269
270 /// Updates backpressure boundary
271 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
272 self.inner.state.write.backpressure_boundary = boundary;
273 }
274
275 /// Consumes the `Framed`, returning its underlying I/O stream.
276 ///
277 /// Note that care should be taken to not tamper with the underlying stream
278 /// of data coming in as it may corrupt the stream of frames otherwise
279 /// being worked with.
280 pub fn into_inner(self) -> T {
281 self.inner.inner
282 }
283
284 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
285 /// with unprocessed data, and the codec.
286 ///
287 /// Note that care should be taken to not tamper with the underlying stream
288 /// of data coming in as it may corrupt the stream of frames otherwise
289 /// being worked with.
290 pub fn into_parts(self) -> FramedParts<T, U> {
291 FramedParts {
292 io: self.inner.inner,
293 codec: self.inner.codec,
294 read_buf: self.inner.state.read.buffer,
295 write_buf: self.inner.state.write.buffer,
296 _priv: (),
297 }
298 }
299}
300
301// This impl just defers to the underlying FramedImpl
302impl<T, U> Stream for Framed<T, U>
303where
304 T: AsyncRead,
305 U: Decoder,
306{
307 type Item = Result<U::Item, U::Error>;
308
309 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
310 self.project().inner.poll_next(cx)
311 }
312}
313
314// This impl just defers to the underlying FramedImpl
315impl<T, I, U> Sink<I> for Framed<T, U>
316where
317 T: AsyncWrite,
318 U: Encoder<I>,
319 U::Error: From<io::Error>,
320{
321 type Error = U::Error;
322
323 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
324 self.project().inner.poll_ready(cx)
325 }
326
327 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
328 self.project().inner.start_send(item)
329 }
330
331 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
332 self.project().inner.poll_flush(cx)
333 }
334
335 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
336 self.project().inner.poll_close(cx)
337 }
338}
339
340impl<T, U> fmt::Debug for Framed<T, U>
341where
342 T: fmt::Debug,
343 U: fmt::Debug,
344{
345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346 f.debug_struct("Framed")
347 .field("io", self.get_ref())
348 .field("codec", self.codec())
349 .finish()
350 }
351}
352
353/// `FramedParts` contains an export of the data of a Framed transport.
354/// It can be used to construct a new [`Framed`] with a different codec.
355/// It contains all current buffers and the inner transport.
356///
357/// [`Framed`]: crate::codec::Framed
358#[derive(Debug)]
359#[allow(clippy::manual_non_exhaustive)]
360pub struct FramedParts<T, U> {
361 /// The inner transport used to read bytes to and write bytes to
362 pub io: T,
363
364 /// The codec
365 pub codec: U,
366
367 /// The buffer with read but unprocessed data.
368 pub read_buf: BytesMut,
369
370 /// A buffer with unprocessed data which are not written yet.
371 pub write_buf: BytesMut,
372
373 /// This private field allows us to add additional fields in the future in a
374 /// backwards compatible way.
375 pub(crate) _priv: (),
376}
377
378impl<T, U> FramedParts<T, U> {
379 /// Create a new, default, `FramedParts`
380 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
381 where
382 U: Encoder<I>,
383 {
384 FramedParts {
385 io,
386 codec,
387 read_buf: BytesMut::new(),
388 write_buf: BytesMut::new(),
389 _priv: (),
390 }
391 }
392}