asynchronous_codec/codec/
length.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use crate::{Decoder, Encoder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io::Error;

const U64_LENGTH: usize = std::mem::size_of::<u64>();

/// A simple `Codec` implementation sending your data by prefixing it by its length.
///
/// # Example
///
/// This codec will most likely be used wrapped in another codec like so.
///
/// ```
/// use asynchronous_codec::{Decoder, Encoder, LengthCodec};
/// use bytes::{Bytes, BytesMut};
/// use std::io::{Error, ErrorKind};
///
/// pub struct MyStringCodec(LengthCodec);
///
/// impl Encoder for MyStringCodec {
///     type Item = String;
///     type Error = Error;
///
///     fn encode(&mut self, src: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
///         let bytes = Bytes::from(src);
///         self.0.encode(bytes, dst)
///     }
/// }
///
/// impl Decoder for MyStringCodec {
///     type Item = String;
///     type Error = Error;
///
///     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
///         match self.0.decode(src)? {
///             Some(bytes) => {
///                 match String::from_utf8(bytes.to_vec()) {
///                     Ok(string) => Ok(Some(string)),
///                     Err(e) => Err(Error::new(ErrorKind::InvalidData, e))
///                 }
///             },
///             None => Ok(None),
///         }
///     }
/// }
/// ```
pub struct LengthCodec;

impl Encoder for LengthCodec {
    type Item = Bytes;
    type Error = Error;

    fn encode(&mut self, src: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        dst.reserve(U64_LENGTH + src.len());
        dst.put_u64(src.len() as u64);
        dst.extend_from_slice(&src);
        Ok(())
    }
}

impl Decoder for LengthCodec {
    type Item = Bytes;
    type Error = Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < U64_LENGTH {
            return Ok(None);
        }

        let mut len_bytes = [0u8; U64_LENGTH];
        len_bytes.copy_from_slice(&src[..U64_LENGTH]);
        let len = u64::from_be_bytes(len_bytes) as usize;

        if src.len() - U64_LENGTH >= len {
            // Skip the length header we already read.
            src.advance(U64_LENGTH);
            Ok(Some(src.split_to(len).freeze()))
        } else {
            Ok(None)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    mod decode {
        use super::*;

        #[test]
        fn it_returns_bytes_withouth_length_header() {
            let mut codec = LengthCodec {};

            let mut src = BytesMut::with_capacity(5);
            src.put(&[0, 0, 0, 0, 0, 0, 0, 3u8, 1, 2, 3, 4][..]);
            let item = codec.decode(&mut src).unwrap();

            assert!(item == Some(Bytes::from(&[1u8, 2, 3][..])));
        }
    }
}