asynchronous_codec/
framed.rs
1use super::framed_read::{framed_read_2, FramedRead2};
2use super::framed_write::{framed_write_2, FramedWrite2};
3use super::fuse::Fuse;
4use super::{Decoder, Encoder};
5use bytes::BytesMut;
6use futures_sink::Sink;
7use futures_util::io::{AsyncRead, AsyncWrite};
8use futures_util::stream::{Stream, TryStreamExt};
9use pin_project_lite::pin_project;
10use std::marker::Unpin;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pin_project! {
16 #[derive(Debug)]
41 pub struct Framed<T, U> {
42 #[pin]
43 inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
44 }
45}
46
47impl<T, U> Deref for Framed<T, U> {
48 type Target = T;
49
50 fn deref(&self) -> &T {
51 &self.inner
52 }
53}
54
55impl<T, U> DerefMut for Framed<T, U> {
56 fn deref_mut(&mut self) -> &mut T {
57 &mut self.inner
58 }
59}
60
61impl<T, U> Framed<T, U>
62where
63 T: AsyncRead + AsyncWrite,
64 U: Decoder + Encoder,
65{
66 pub fn new(inner: T, codec: U) -> Self {
69 Self {
70 inner: framed_read_2(framed_write_2(Fuse::new(inner, codec), None), None),
71 }
72 }
73
74 pub fn from_parts(
78 FramedParts {
79 io,
80 codec,
81 write_buffer,
82 read_buffer,
83 ..
84 }: FramedParts<T, U>,
85 ) -> Self {
86 let framed_write = framed_write_2(Fuse::new(io, codec), Some(write_buffer));
87 let framed_read = framed_read_2(framed_write, Some(read_buffer));
88 Self { inner: framed_read }
89 }
90
91 pub fn into_parts(self) -> FramedParts<T, U> {
96 let (framed_write, read_buffer) = self.inner.into_parts();
97 let (fuse, write_buffer) = framed_write.into_parts();
98 FramedParts {
99 io: fuse.t,
100 codec: fuse.u,
101 read_buffer,
102 write_buffer,
103 _priv: (),
104 }
105 }
106
107 pub fn into_inner(self) -> T {
113 self.into_parts().io
114 }
115
116 pub fn codec(&self) -> &U {
122 &self.inner.u
123 }
124
125 pub fn codec_mut(&mut self) -> &mut U {
131 &mut self.inner.u
132 }
133
134 pub fn read_buffer(&self) -> &BytesMut {
136 self.inner.buffer()
137 }
138}
139
140impl<T, U> Stream for Framed<T, U>
141where
142 T: AsyncRead + Unpin,
143 U: Decoder,
144{
145 type Item = Result<U::Item, U::Error>;
146
147 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148 self.inner.try_poll_next_unpin(cx)
149 }
150}
151
152impl<T, U> Sink<U::Item> for Framed<T, U>
153where
154 T: AsyncWrite + Unpin,
155 U: Encoder,
156{
157 type Error = U::Error;
158
159 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
160 self.project().inner.poll_ready(cx)
161 }
162 fn start_send(self: Pin<&mut Self>, item: U::Item) -> Result<(), Self::Error> {
163 self.project().inner.start_send(item)
164 }
165 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
166 self.project().inner.poll_flush(cx)
167 }
168 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
169 self.project().inner.poll_close(cx)
170 }
171}
172
173pub struct FramedParts<T, U> {
175 pub io: T,
177 pub codec: U,
179 pub read_buffer: BytesMut,
182 pub write_buffer: BytesMut,
185 _priv: (),
187}
188
189impl<T, U> FramedParts<T, U> {
190 pub fn map_codec<V, F>(self, f: F) -> FramedParts<T, V>
192 where
193 V: Encoder + Decoder,
194 F: FnOnce(U) -> V,
195 {
196 FramedParts {
197 io: self.io,
198 codec: f(self.codec),
199 read_buffer: self.read_buffer,
200 write_buffer: self.write_buffer,
201 _priv: (),
202 }
203 }
204}