mysql_common/proto/
sync_framed.rs

1// Copyright (c) 2017 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use bytes::{Buf, BufMut, BytesMut};
10
11use crate::{
12    constants::DEFAULT_MAX_ALLOWED_PACKET,
13    proto::codec::{error::PacketCodecError, PacketCodec},
14};
15
16use std::{
17    io::{
18        Error,
19        ErrorKind::{Interrupted, Other},
20        Read, Write,
21    },
22    ptr::slice_from_raw_parts_mut,
23};
24
25// stolen from futures-rs
26macro_rules! with_interrupt {
27    ($e:expr) => {
28        loop {
29            match $e {
30                Ok(x) => {
31                    break Ok(x);
32                }
33                Err(ref e) if e.kind() == Interrupted => {
34                    continue;
35                }
36                Err(e) => {
37                    break Err(e);
38                }
39            }
40        }
41    };
42}
43
44/// Synchronous framed stream for MySql protocol.
45///
46/// This type is a synchronous alternative to `tokio_codec::Framed`.
47#[derive(Debug)]
48pub struct MySyncFramed<T> {
49    eof: bool,
50    in_buf: BytesMut,
51    out_buf: BytesMut,
52    codec: PacketCodec,
53    stream: T,
54}
55
56impl<T> MySyncFramed<T> {
57    /// Creates new instance with the given `stream`.
58    pub fn new(stream: T) -> Self {
59        MySyncFramed {
60            eof: false,
61            in_buf: BytesMut::with_capacity(DEFAULT_MAX_ALLOWED_PACKET),
62            out_buf: BytesMut::with_capacity(DEFAULT_MAX_ALLOWED_PACKET),
63            codec: PacketCodec::default(),
64            stream,
65        }
66    }
67
68    /// Returns reference to a stream.
69    pub fn get_ref(&self) -> &T {
70        &self.stream
71    }
72
73    /// Returns mutable reference to a stream.
74    pub fn get_mut(&mut self) -> &mut T {
75        &mut self.stream
76    }
77
78    /// Returns reference to a codec.
79    pub fn codec(&self) -> &PacketCodec {
80        &self.codec
81    }
82
83    /// Returns mutable reference to a codec.
84    pub fn codec_mut(&mut self) -> &mut PacketCodec {
85        &mut self.codec
86    }
87
88    /// Consumes self and returns wrapped buffers, codec and stream.
89    pub fn destruct(self) -> (BytesMut, BytesMut, PacketCodec, T) {
90        (self.in_buf, self.out_buf, self.codec, self.stream)
91    }
92
93    /// Creates new instance from given buffers, `codec` and `stream`.
94    pub fn construct(in_buf: BytesMut, out_buf: BytesMut, codec: PacketCodec, stream: T) -> Self {
95        Self {
96            eof: false,
97            in_buf,
98            out_buf,
99            codec,
100            stream,
101        }
102    }
103}
104
105impl<T> MySyncFramed<T>
106where
107    T: Write,
108{
109    /// Will write packets into the stream. Stream may not be flushed.
110    pub fn write<U: Buf>(&mut self, item: &mut U) -> Result<(), PacketCodecError> {
111        self.codec.encode(item, &mut self.out_buf)?;
112        with_interrupt!(self.stream.write_all(&self.out_buf))?;
113        self.out_buf.clear();
114        Ok(())
115    }
116
117    /// Will flush wrapped stream.
118    pub fn flush(&mut self) -> Result<(), PacketCodecError> {
119        with_interrupt!(self.stream.flush())?;
120        Ok(())
121    }
122
123    /// Will send packets into the stream. Stream will be flushed.
124    pub fn send<U: Buf>(&mut self, item: &mut U) -> Result<(), PacketCodecError> {
125        self.write(item)?;
126        self.flush()
127    }
128}
129
130impl<T> MySyncFramed<T>
131where
132    T: Read,
133{
134    /// Returns `true` if `dst` contains the next packet.
135    ///
136    /// `false` means, that the `dst` is empty and the stream is at eof.
137    pub fn next_packet<U>(&mut self, dst: &mut U) -> Result<bool, PacketCodecError>
138    where
139        U: AsRef<[u8]>,
140        U: BufMut,
141    {
142        loop {
143            if self.eof {
144                return match self.codec.decode(&mut self.in_buf, dst)? {
145                    true => Ok(true),
146                    false => {
147                        if self.in_buf.is_empty() {
148                            Ok(false)
149                        } else {
150                            Err(Error::new(Other, "bytes remaining on stream").into())
151                        }
152                    }
153                };
154            } else {
155                match self.codec.decode(&mut self.in_buf, dst)? {
156                    true => return Ok(true),
157                    false => unsafe {
158                        self.in_buf.reserve(1);
159                        match with_interrupt!(self.stream.read(&mut *slice_from_raw_parts_mut(
160                            self.in_buf.chunk_mut().as_mut_ptr(),
161                            self.in_buf.chunk_mut().len()
162                        ))) {
163                            Ok(0) => self.eof = true,
164                            Ok(x) => self.in_buf.advance_mut(x),
165                            Err(err) => return Err(From::from(err)),
166                        }
167                    },
168                }
169            }
170        }
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use crate::{constants::MAX_PAYLOAD_LEN, proto::sync_framed::MySyncFramed};
177
178    #[test]
179    fn iter_packets() {
180        let mut buf = Vec::new();
181        {
182            let mut framed = MySyncFramed::new(&mut buf);
183            framed.codec_mut().max_allowed_packet = MAX_PAYLOAD_LEN;
184            framed.send(&mut &*vec![0_u8; 0]).unwrap();
185            framed.send(&mut &*vec![0_u8; 1]).unwrap();
186            framed.send(&mut &*vec![0_u8; MAX_PAYLOAD_LEN]).unwrap();
187        }
188        let mut buf = &buf[..];
189        let mut framed = MySyncFramed::new(&mut buf);
190        framed.codec_mut().max_allowed_packet = MAX_PAYLOAD_LEN;
191        let mut dst = vec![];
192        assert!(framed.next_packet(&mut dst).unwrap());
193        assert_eq!(dst, vec![0_u8; 0]);
194        dst.clear();
195        assert!(framed.next_packet(&mut dst).unwrap());
196        assert_eq!(dst, vec![0_u8; 1]);
197        dst.clear();
198        assert!(framed.next_packet(&mut dst).unwrap());
199        assert_eq!(dst, vec![0_u8; MAX_PAYLOAD_LEN]);
200        dst.clear();
201        assert!(!framed.next_packet(&mut dst).unwrap());
202    }
203
204    #[test]
205    #[should_panic(expected = "bytes remaining on stream")]
206    fn incomplete_packet() {
207        let buf = vec![2, 0, 0, 0];
208        let mut buf = &buf[..];
209        let mut dst = vec![];
210        let mut framed = MySyncFramed::new(&mut buf);
211        framed.next_packet(&mut dst).unwrap();
212    }
213}
214
215#[cfg(feature = "nightly")]
216mod bench {
217    use std::io;
218
219    use bytes::BytesMut;
220
221    use super::MySyncFramed;
222    use crate::constants::MAX_PAYLOAD_LEN;
223
224    struct Null;
225
226    impl io::Write for Null {
227        fn write(&mut self, x: &[u8]) -> io::Result<usize> {
228            Ok(x.len())
229        }
230        fn flush(&mut self) -> io::Result<()> {
231            Ok(())
232        }
233    }
234
235    struct Loop {
236        buf: Vec<u8>,
237        pos: usize,
238    }
239
240    impl io::Read for Loop {
241        fn read(&mut self, x: &mut [u8]) -> io::Result<usize> {
242            let count = std::cmp::min(x.len(), self.buf.len() - self.pos);
243            x[..count].copy_from_slice(&self.buf[self.pos..(self.pos + count)]);
244            self.pos = (self.pos + count) % self.buf.len();
245            Ok(count)
246        }
247    }
248
249    #[bench]
250    fn write_small(bencher: &mut test::Bencher) {
251        const SIZE: usize = 512;
252        let mut framed = MySyncFramed::new(Null);
253        framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
254
255        let buf = vec![0; SIZE];
256
257        bencher.bytes = (SIZE + 4 + (SIZE / MAX_PAYLOAD_LEN * 4)) as u64;
258        bencher.iter(|| {
259            framed.send(&mut &*buf).unwrap();
260        });
261    }
262
263    #[bench]
264    fn write_med(bencher: &mut test::Bencher) {
265        const SIZE: usize = 1024 * 1024;
266        let mut framed = MySyncFramed::new(Null);
267        framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
268
269        let buf = vec![0; SIZE];
270
271        bencher.bytes = (SIZE + 4 + (SIZE / MAX_PAYLOAD_LEN * 4)) as u64;
272        bencher.iter(|| {
273            framed.send(&mut &*buf).unwrap();
274        });
275    }
276
277    #[bench]
278    fn write_large(bencher: &mut test::Bencher) {
279        const SIZE: usize = 1024 * 1024 * 64;
280        let mut framed = MySyncFramed::new(Null);
281        framed.codec_mut().max_allowed_packet = 1024 * 1024 * 64;
282
283        let buf = vec![0; SIZE];
284
285        bencher.bytes = (SIZE + 4 + (SIZE / MAX_PAYLOAD_LEN * 4)) as u64;
286        bencher.iter(|| {
287            framed.send(&mut &*buf).unwrap();
288        });
289    }
290
291    #[bench]
292    fn read_small(bencher: &mut test::Bencher) {
293        const SIZE: usize = 512;
294        let mut buf = vec![];
295        let mut framed = MySyncFramed::new(&mut buf);
296
297        framed.send(&mut &*vec![0; SIZE]).unwrap();
298
299        bencher.bytes = buf.len() as u64;
300        let input = Loop { buf, pos: 0 };
301        let mut framed = MySyncFramed::new(input);
302        let mut buf = BytesMut::new();
303        bencher.iter(|| {
304            framed.codec_mut().reset_seq_id();
305            assert!(framed.next_packet(&mut buf).unwrap());
306            buf.clear();
307        });
308    }
309
310    #[bench]
311    fn read_med(bencher: &mut test::Bencher) {
312        const SIZE: usize = 1024 * 1024;
313        let mut buf = vec![];
314        let mut framed = MySyncFramed::new(&mut buf);
315
316        framed.send(&mut &*vec![0; SIZE]).unwrap();
317
318        bencher.bytes = buf.len() as u64;
319        let input = Loop { buf, pos: 0 };
320        let mut framed = MySyncFramed::new(input);
321        let mut buf = BytesMut::new();
322        bencher.iter(|| {
323            framed.codec_mut().reset_seq_id();
324            assert!(framed.next_packet(&mut buf).unwrap());
325            buf.clear();
326        });
327    }
328
329    #[bench]
330    fn read_large(bencher: &mut test::Bencher) {
331        const SIZE: usize = 1024 * 1024 * 32;
332        let mut buf = vec![];
333        let mut framed = MySyncFramed::new(&mut buf);
334        framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
335
336        framed.send(&mut &*vec![0; SIZE]).unwrap();
337
338        bencher.bytes = buf.len() as u64;
339        let input = Loop { buf, pos: 0 };
340        let mut framed = MySyncFramed::new(input);
341        framed.codec_mut().max_allowed_packet = 1024 * 1024 * 32;
342        let mut buf = BytesMut::new();
343        bencher.iter(|| {
344            framed.codec_mut().reset_seq_id();
345            assert!(framed.next_packet(&mut buf).unwrap());
346            buf.clear();
347        });
348    }
349}