asynchronous_codec/
framed.rs

1use super::framed_read::{framed_read_2, FramedRead2};
2use super::framed_write::{framed_write_2, FramedWrite2};
3use super::fuse::Fuse;
4use super::{Decoder, Encoder};
5use bytes::BytesMut;
6use futures_sink::Sink;
7use futures_util::io::{AsyncRead, AsyncWrite};
8use futures_util::stream::{Stream, TryStreamExt};
9use pin_project_lite::pin_project;
10use std::marker::Unpin;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pin_project! {
16    /// A unified `Stream` and `Sink` interface to an underlying I/O object,
17    /// using the `Encoder` and `Decoder` traits to encode and decode frames.
18    ///
19    /// # Example
20    /// ```
21    /// use bytes::Bytes;
22    /// use futures::{SinkExt, TryStreamExt};
23    /// use futures::io::Cursor;
24    /// use asynchronous_codec::{BytesCodec, Framed};
25    ///
26    /// # futures::executor::block_on(async move {
27    /// let cur = Cursor::new(vec![0u8; 12]);
28    /// let mut framed = Framed::new(cur, BytesCodec {});
29    ///
30    /// // Send bytes to `buf` through the `BytesCodec`
31    /// let bytes = Bytes::from("Hello world!");
32    /// framed.send(bytes).await?;
33    ///
34    /// // Drop down to the underlying I/O stream.
35    /// let cur = framed.into_inner();
36    /// assert_eq!(cur.get_ref(), b"Hello world!");
37    /// # Ok::<_, std::io::Error>(())
38    /// # }).unwrap();
39    /// ```
40    #[derive(Debug)]
41    pub struct Framed<T, U> {
42        #[pin]
43        inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
44    }
45}
46
47impl<T, U> Deref for Framed<T, U> {
48    type Target = T;
49
50    fn deref(&self) -> &T {
51        &self.inner
52    }
53}
54
55impl<T, U> DerefMut for Framed<T, U> {
56    fn deref_mut(&mut self) -> &mut T {
57        &mut self.inner
58    }
59}
60
61impl<T, U> Framed<T, U>
62where
63    T: AsyncRead + AsyncWrite,
64    U: Decoder + Encoder,
65{
66    /// Creates a new `Framed` transport with the given codec.
67    /// A codec is a type which implements `Decoder` and `Encoder`.
68    pub fn new(inner: T, codec: U) -> Self {
69        Self {
70            inner: framed_read_2(framed_write_2(Fuse::new(inner, codec), None), None),
71        }
72    }
73
74    /// Creates a new `Framed` from [`FramedParts`].
75    ///
76    /// See also [`Framed::into_parts`].
77    pub fn from_parts(
78        FramedParts {
79            io,
80            codec,
81            write_buffer,
82            read_buffer,
83            ..
84        }: FramedParts<T, U>,
85    ) -> Self {
86        let framed_write = framed_write_2(Fuse::new(io, codec), Some(write_buffer));
87        let framed_read = framed_read_2(framed_write, Some(read_buffer));
88        Self { inner: framed_read }
89    }
90
91    /// Consumes the `Framed`, returning its parts, such that a new
92    /// `Framed` may be constructed, possibly with a different codec.
93    ///
94    /// See also [`Framed::from_parts`].
95    pub fn into_parts(self) -> FramedParts<T, U> {
96        let (framed_write, read_buffer) = self.inner.into_parts();
97        let (fuse, write_buffer) = framed_write.into_parts();
98        FramedParts {
99            io: fuse.t,
100            codec: fuse.u,
101            read_buffer,
102            write_buffer,
103            _priv: (),
104        }
105    }
106
107    /// Consumes the `Framed`, returning its underlying I/O stream.
108    ///
109    /// Note that data that has already been read or written but not yet
110    /// consumed by the decoder or flushed, respectively, is dropped.
111    /// To retain any such potentially buffered data, use [`Framed::into_parts()`].
112    pub fn into_inner(self) -> T {
113        self.into_parts().io
114    }
115
116    /// Returns a reference to the underlying codec wrapped by
117    /// `Framed`.
118    ///
119    /// Note that care should be taken to not tamper with the underlying codec
120    /// as it may corrupt the stream of frames otherwise being worked with.
121    pub fn codec(&self) -> &U {
122        &self.inner.u
123    }
124
125    /// Returns a mutable reference to the underlying codec wrapped by
126    /// `Framed`.
127    ///
128    /// Note that care should be taken to not tamper with the underlying codec
129    /// as it may corrupt the stream of frames otherwise being worked with.
130    pub fn codec_mut(&mut self) -> &mut U {
131        &mut self.inner.u
132    }
133
134    /// Returns a reference to the read buffer.
135    pub fn read_buffer(&self) -> &BytesMut {
136        self.inner.buffer()
137    }
138}
139
140impl<T, U> Stream for Framed<T, U>
141where
142    T: AsyncRead + Unpin,
143    U: Decoder,
144{
145    type Item = Result<U::Item, U::Error>;
146
147    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148        self.inner.try_poll_next_unpin(cx)
149    }
150}
151
152impl<T, U> Sink<U::Item> for Framed<T, U>
153where
154    T: AsyncWrite + Unpin,
155    U: Encoder,
156{
157    type Error = U::Error;
158
159    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
160        self.project().inner.poll_ready(cx)
161    }
162    fn start_send(self: Pin<&mut Self>, item: U::Item) -> Result<(), Self::Error> {
163        self.project().inner.start_send(item)
164    }
165    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
166        self.project().inner.poll_flush(cx)
167    }
168    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
169        self.project().inner.poll_close(cx)
170    }
171}
172
173/// The parts obtained from [`Framed::into_parts`].
174pub struct FramedParts<T, U> {
175    /// The underlying I/O stream.
176    pub io: T,
177    /// The codec used for encoding and decoding frames.
178    pub codec: U,
179    /// The remaining read buffer, containing data that has been
180    /// read from `io` but not yet consumed by the codec's decoder.
181    pub read_buffer: BytesMut,
182    /// The remaining write buffer, containing framed data that has been
183    /// buffered but not yet flushed to `io`.
184    pub write_buffer: BytesMut,
185    /// Keep the constructor private.
186    _priv: (),
187}
188
189impl<T, U> FramedParts<T, U> {
190    /// Changes the codec used in this `FramedParts`.
191    pub fn map_codec<V, F>(self, f: F) -> FramedParts<T, V>
192    where
193        V: Encoder + Decoder,
194        F: FnOnce(U) -> V,
195    {
196        FramedParts {
197            io: self.io,
198            codec: f(self.codec),
199            read_buffer: self.read_buffer,
200            write_buffer: self.write_buffer,
201            _priv: (),
202        }
203    }
204}