mysql_common/proto/
sync_framed.rs
1use bytes::{Buf, BufMut, BytesMut};
10
11use crate::{
12 constants::DEFAULT_MAX_ALLOWED_PACKET,
13 proto::codec::{error::PacketCodecError, PacketCodec},
14};
15
16use std::{
17 io::{
18 Error,
19 ErrorKind::{Interrupted, Other},
20 Read, Write,
21 },
22 ptr::slice_from_raw_parts_mut,
23};
24
25macro_rules! with_interrupt {
27 ($e:expr) => {
28 loop {
29 match $e {
30 Ok(x) => {
31 break Ok(x);
32 }
33 Err(ref e) if e.kind() == Interrupted => {
34 continue;
35 }
36 Err(e) => {
37 break Err(e);
38 }
39 }
40 }
41 };
42}
43
44#[derive(Debug)]
48pub struct MySyncFramed<T> {
49 eof: bool,
50 in_buf: BytesMut,
51 out_buf: BytesMut,
52 codec: PacketCodec,
53 stream: T,
54}
55
56impl<T> MySyncFramed<T> {
57 pub fn new(stream: T) -> Self {
59 MySyncFramed {
60 eof: false,
61 in_buf: BytesMut::with_capacity(DEFAULT_MAX_ALLOWED_PACKET),
62 out_buf: BytesMut::with_capacity(DEFAULT_MAX_ALLOWED_PACKET),
63 codec: PacketCodec::default(),
64 stream,
65 }
66 }
67
68 pub fn get_ref(&self) -> &T {
70 &self.stream
71 }
72
73 pub fn get_mut(&mut self) -> &mut T {
75 &mut self.stream
76 }
77
78 pub fn codec(&self) -> &PacketCodec {
80 &self.codec
81 }
82
83 pub fn codec_mut(&mut self) -> &mut PacketCodec {
85 &mut self.codec
86 }
87
88 pub fn destruct(self) -> (BytesMut, BytesMut, PacketCodec, T) {
90 (self.in_buf, self.out_buf, self.codec, self.stream)
91 }
92
93 pub fn construct(in_buf: BytesMut, out_buf: BytesMut, codec: PacketCodec, stream: T) -> Self {
95 Self {
96 eof: false,
97 in_buf,
98 out_buf,
99 codec,
100 stream,
101 }
102 }
103}
104
105impl<T> MySyncFramed<T>
106where
107 T: Write,
108{
109 pub fn write<U: Buf>(&mut self, item: &mut U) -> Result<(), PacketCodecError> {
111 self.codec.encode(item, &mut self.out_buf)?;
112 with_interrupt!(self.stream.write_all(&self.out_buf))?;
113 self.out_buf.clear();
114 Ok(())
115 }
116
117 pub fn flush(&mut self) -> Result<(), PacketCodecError> {
119 with_interrupt!(self.stream.flush())?;
120 Ok(())
121 }
122
123 pub fn send<U: Buf>(&mut self, item: &mut U) -> Result<(), PacketCodecError> {
125 self.write(item)?;
126 self.flush()
127 }
128}
129
130impl<T> MySyncFramed<T>
131where
132 T: Read,
133{
134 pub fn next_packet<U>(&mut self, dst: &mut U) -> Result<bool, PacketCodecError>
138 where
139 U: AsRef<[u8]>,
140 U: BufMut,
141 {
142 loop {
143 if self.eof {
144 return match self.codec.decode(&mut self.in_buf, dst)? {
145 true => Ok(true),
146 false => {
147 if self.in_buf.is_empty() {
148 Ok(false)
149 } else {
150 Err(Error::new(Other, "bytes remaining on stream").into())
151 }
152 }
153 };
154 } else {
155 match self.codec.decode(&mut self.in_buf, dst)? {
156 true => return Ok(true),
157 false => unsafe {
158 self.in_buf.reserve(1);
159 match with_interrupt!(self.stream.read(&mut *slice_from_raw_parts_mut(
160 self.in_buf.chunk_mut().as_mut_ptr(),
161 self.in_buf.chunk_mut().len()
162 ))) {
163 Ok(0) => self.eof = true,
164 Ok(x) => self.in_buf.advance_mut(x),
165 Err(err) => return Err(From::from(err)),
166 }
167 },
168 }
169 }
170 }
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use crate::{constants::MAX_PAYLOAD_LEN, proto::sync_framed::MySyncFramed};
177
178 #[test]
179 fn iter_packets() {
180 let mut buf = Vec::new();
181 {
182 let mut framed = MySyncFramed::new(&mut buf);
183 framed.codec_mut().max_allowed_packet = MAX_PAYLOAD_LEN;
184 framed.send(&mut &*vec![0_u8; 0]).unwrap();
185 framed.send(&mut &*vec![0_u8; 1]).unwrap();
186 framed.send(&mut &*vec![0_u8; MAX_PAYLOAD_LEN]).unwrap();
187 }
188 let mut buf = &buf[..];
189 let mut framed = MySyncFramed::new(&mut buf);
190 framed.codec_mut().max_allowed_packet = MAX_PAYLOAD_LEN;
191 let mut dst = vec![];
192 assert!(framed.next_packet(&mut dst).unwrap());
193 assert_eq!(dst, vec![0_u8; 0]);
194 dst.clear();
195 assert!(framed.next_packet(&mut dst).unwrap());
196 assert_eq!(dst, vec![0_u8; 1]);
197 dst.clear();
198 assert!(framed.next_packet(&mut dst).unwrap());
199 assert_eq!(dst, vec![0_u8; MAX_PAYLOAD_LEN]);
200 dst.clear();
201 assert!(!framed.next_packet(&mut dst).unwrap());
202 }
203
204 #[test]
205 #[should_panic(expected = "bytes remaining on stream")]
206 fn incomplete_packet() {
207 let buf = vec![2, 0, 0, 0];
208 let mut buf = &buf[..];
209 let mut dst = vec![];
210 let mut framed = MySyncFramed::new(&mut buf);
211 framed.next_packet(&mut dst).unwrap();
212 }
213}
214
215#[cfg(feature = "nightly")]
216mod bench {
217 use std::io;
218
219 use bytes::BytesMut;
220
221 use super::MySyncFramed;
222 use crate::constants::MAX_PAYLOAD_LEN;
223
224 struct Null;
225
226 impl io::Write for Null {
227 fn write(&mut self, x: &[u8]) -> io::Result<usize> {
228 Ok(x.len())
229 }
230 fn flush(&mut self) -> io::Result<()> {
231 Ok(())
232 }
233 }
234
235 struct Loop {
236 buf: Vec<u8>,
237 pos: usize,
238 }
239
240 impl io::Read for Loop {
241 fn read(&mut self, x: &mut [u8]) -> io::Result<usize> {
242 let count = std::cmp::min(x.len(), self.buf.len() - self.pos);
243 x[..count].copy_from_slice(&self.buf[self.pos..(self.pos + count)]);
244 self.pos = (self.pos + count) % self.buf.len();
245 Ok(count)
246 }
247 }
248
249 #[bench]
250 fn write_small(bencher: &mut test::Bencher) {
251 const SIZE: usize = 512;
252 let mut framed = MySyncFramed::new(Null);
253 framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
254
255 let buf = vec![0; SIZE];
256
257 bencher.bytes = (SIZE + 4 + (SIZE / MAX_PAYLOAD_LEN * 4)) as u64;
258 bencher.iter(|| {
259 framed.send(&mut &*buf).unwrap();
260 });
261 }
262
263 #[bench]
264 fn write_med(bencher: &mut test::Bencher) {
265 const SIZE: usize = 1024 * 1024;
266 let mut framed = MySyncFramed::new(Null);
267 framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
268
269 let buf = vec![0; SIZE];
270
271 bencher.bytes = (SIZE + 4 + (SIZE / MAX_PAYLOAD_LEN * 4)) as u64;
272 bencher.iter(|| {
273 framed.send(&mut &*buf).unwrap();
274 });
275 }
276
277 #[bench]
278 fn write_large(bencher: &mut test::Bencher) {
279 const SIZE: usize = 1024 * 1024 * 64;
280 let mut framed = MySyncFramed::new(Null);
281 framed.codec_mut().max_allowed_packet = 1024 * 1024 * 64;
282
283 let buf = vec![0; SIZE];
284
285 bencher.bytes = (SIZE + 4 + (SIZE / MAX_PAYLOAD_LEN * 4)) as u64;
286 bencher.iter(|| {
287 framed.send(&mut &*buf).unwrap();
288 });
289 }
290
291 #[bench]
292 fn read_small(bencher: &mut test::Bencher) {
293 const SIZE: usize = 512;
294 let mut buf = vec![];
295 let mut framed = MySyncFramed::new(&mut buf);
296
297 framed.send(&mut &*vec![0; SIZE]).unwrap();
298
299 bencher.bytes = buf.len() as u64;
300 let input = Loop { buf, pos: 0 };
301 let mut framed = MySyncFramed::new(input);
302 let mut buf = BytesMut::new();
303 bencher.iter(|| {
304 framed.codec_mut().reset_seq_id();
305 assert!(framed.next_packet(&mut buf).unwrap());
306 buf.clear();
307 });
308 }
309
310 #[bench]
311 fn read_med(bencher: &mut test::Bencher) {
312 const SIZE: usize = 1024 * 1024;
313 let mut buf = vec![];
314 let mut framed = MySyncFramed::new(&mut buf);
315
316 framed.send(&mut &*vec![0; SIZE]).unwrap();
317
318 bencher.bytes = buf.len() as u64;
319 let input = Loop { buf, pos: 0 };
320 let mut framed = MySyncFramed::new(input);
321 let mut buf = BytesMut::new();
322 bencher.iter(|| {
323 framed.codec_mut().reset_seq_id();
324 assert!(framed.next_packet(&mut buf).unwrap());
325 buf.clear();
326 });
327 }
328
329 #[bench]
330 fn read_large(bencher: &mut test::Bencher) {
331 const SIZE: usize = 1024 * 1024 * 32;
332 let mut buf = vec![];
333 let mut framed = MySyncFramed::new(&mut buf);
334 framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
335
336 framed.send(&mut &*vec![0; SIZE]).unwrap();
337
338 bencher.bytes = buf.len() as u64;
339 let input = Loop { buf, pos: 0 };
340 let mut framed = MySyncFramed::new(input);
341 framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
342 let mut buf = BytesMut::new();
343 bencher.iter(|| {
344 framed.codec_mut().reset_seq_id();
345 assert!(framed.next_packet(&mut buf).unwrap());
346 buf.clear();
347 });
348 }
349}