tokio_util/codec/
framed_write.rs1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15use super::FramedParts;
16
17pin_project! {
18 pub struct FramedWrite<T, E> {
33 #[pin]
34 inner: FramedImpl<T, E, WriteFrame>,
35 }
36}
37
38impl<T, E> FramedWrite<T, E> {
39 pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
41 FramedWrite {
42 inner: FramedImpl {
43 inner,
44 codec: encoder,
45 state: WriteFrame::default(),
46 },
47 }
48 }
49
50 pub fn with_capacity(inner: T, encoder: E, capacity: usize) -> FramedWrite<T, E> {
53 FramedWrite {
54 inner: FramedImpl {
55 inner,
56 codec: encoder,
57 state: WriteFrame {
58 buffer: BytesMut::with_capacity(capacity),
59 backpressure_boundary: capacity,
60 },
61 },
62 }
63 }
64
65 pub fn get_ref(&self) -> &T {
72 &self.inner.inner
73 }
74
75 pub fn get_mut(&mut self) -> &mut T {
82 &mut self.inner.inner
83 }
84
85 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
92 self.project().inner.project().inner
93 }
94
95 pub fn into_inner(self) -> T {
101 self.inner.inner
102 }
103
104 pub fn encoder(&self) -> &E {
106 &self.inner.codec
107 }
108
109 pub fn encoder_mut(&mut self) -> &mut E {
111 &mut self.inner.codec
112 }
113
114 pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
117 where
118 F: FnOnce(E) -> C,
119 {
120 let FramedImpl {
122 inner,
123 state,
124 codec,
125 } = self.inner;
126 FramedWrite {
127 inner: FramedImpl {
128 inner,
129 state,
130 codec: map(codec),
131 },
132 }
133 }
134
135 pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
137 self.project().inner.project().codec
138 }
139
140 pub fn write_buffer(&self) -> &BytesMut {
142 &self.inner.state.buffer
143 }
144
145 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
147 &mut self.inner.state.buffer
148 }
149
150 pub fn backpressure_boundary(&self) -> usize {
152 self.inner.state.backpressure_boundary
153 }
154
155 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
157 self.inner.state.backpressure_boundary = boundary;
158 }
159
160 pub fn into_parts(self) -> FramedParts<T, E> {
163 FramedParts {
164 io: self.inner.inner,
165 codec: self.inner.codec,
166 read_buf: BytesMut::new(),
167 write_buf: self.inner.state.buffer,
168 _priv: (),
169 }
170 }
171}
172
173impl<T, I, E> Sink<I> for FramedWrite<T, E>
175where
176 T: AsyncWrite,
177 E: Encoder<I>,
178 E::Error: From<io::Error>,
179{
180 type Error = E::Error;
181
182 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
183 self.project().inner.poll_ready(cx)
184 }
185
186 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
187 self.project().inner.start_send(item)
188 }
189
190 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
191 self.project().inner.poll_flush(cx)
192 }
193
194 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 self.project().inner.poll_close(cx)
196 }
197}
198
199impl<T, D> Stream for FramedWrite<T, D>
201where
202 T: Stream,
203{
204 type Item = T::Item;
205
206 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207 self.project().inner.project().inner.poll_next(cx)
208 }
209}
210
211impl<T, U> fmt::Debug for FramedWrite<T, U>
212where
213 T: fmt::Debug,
214 U: fmt::Debug,
215{
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 f.debug_struct("FramedWrite")
218 .field("inner", &self.get_ref())
219 .field("encoder", &self.encoder())
220 .field("buffer", &self.inner.state.buffer)
221 .finish()
222 }
223}