tungstenite/
buffer.rs

1//! A buffer for reading data from the network.
2//!
3//! The `ReadBuffer` is a buffer of bytes similar to a first-in, first-out queue.
4//! It is filled by reading from a stream supporting `Read` and is then
5//! accessible as a cursor for reading bytes.
6
7use std::io::{Cursor, Read, Result as IoResult};
8
9use bytes::Buf;
10
/// A FIFO buffer for reading packets from the network.
///
/// Incoming bytes are appended to the back of an internal `Vec<u8>` and are consumed from the
/// front through a [`Cursor`]. `CHUNK_SIZE` is the size of the scratch array handed to the
/// stream on every [`ReadBuffer::read_from`] call, i.e. the maximum number of bytes a single
/// call can append.
#[derive(Debug)]
pub struct ReadBuffer<const CHUNK_SIZE: usize> {
    /// Buffered bytes; everything before the cursor position has already been consumed.
    storage: Cursor<Vec<u8>>,
    /// Scratch space that `read_from` fills before the bytes are appended to `storage`.
    chunk: Box<[u8; CHUNK_SIZE]>,
}

impl<const CHUNK_SIZE: usize> ReadBuffer<CHUNK_SIZE> {
    /// Create a new empty input buffer.
    ///
    /// The storage is pre-allocated with room for `CHUNK_SIZE` bytes.
    pub fn new() -> Self {
        Self::with_capacity(CHUNK_SIZE)
    }

    /// Create a new empty input buffer with a given `capacity`.
    ///
    /// `capacity` only sizes the initial allocation of the storage; the scratch chunk is
    /// always `CHUNK_SIZE` bytes.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_partially_read(Vec::with_capacity(capacity))
    }

    /// Create an input buffer filled with previously read data.
    ///
    /// The cursor starts at position 0, so all of `part` is considered unread.
    pub fn from_partially_read(part: Vec<u8>) -> Self {
        Self { storage: Cursor::new(part), chunk: Box::new([0; CHUNK_SIZE]) }
    }

    /// Get a cursor to the data storage.
    pub fn as_cursor(&self) -> &Cursor<Vec<u8>> {
        &self.storage
    }

    /// Get a cursor to the mutable data storage.
    ///
    /// Moving the cursor forward marks bytes as consumed; they are physically removed on the
    /// next [`ReadBuffer::read_from`] or [`ReadBuffer::into_vec`] call.
    pub fn as_cursor_mut(&mut self) -> &mut Cursor<Vec<u8>> {
        &mut self.storage
    }

    /// Consume the `ReadBuffer` and get the internal storage.
    ///
    /// Only the bytes that have not yet been read through the cursor are returned.
    pub fn into_vec(mut self) -> Vec<u8> {
        // Current implementation of `tungstenite-rs` expects that the `into_vec()` drains
        // the data from the container that has already been read by the cursor.
        self.clean_up();

        // Now we can safely return the internal container.
        self.storage.into_inner()
    }

    /// Read next portion of data from the given input stream.
    ///
    /// Already consumed bytes are discarded first, then at most `CHUNK_SIZE` bytes are read
    /// from `stream` with a single `read` call and appended to the storage.
    ///
    /// Returns the number of bytes appended.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `stream.read`. The consumed prefix has already been
    /// discarded at that point, but no new bytes are appended.
    pub fn read_from<S: Read>(&mut self, stream: &mut S) -> IoResult<usize> {
        self.clean_up();
        let size = stream.read(&mut *self.chunk)?;
        self.storage.get_mut().extend_from_slice(&self.chunk[..size]);
        Ok(size)
    }

    /// Cleans up the part of the vector that has been already read by the cursor.
    ///
    /// Afterwards the cursor is at position 0 and the storage holds only unread bytes.
    fn clean_up(&mut self) {
        let len = self.storage.get_ref().len();
        // The cursor is reachable through `as_cursor_mut()` and `Cursor::set_position` accepts
        // any value, so the position may lie past the end of the storage. Such a cursor has
        // nothing left to read, so treat it as "everything consumed" instead of letting
        // `drain` panic on an out-of-range bound. Clamping in `u64` first also keeps the cast
        // to `usize` lossless on 32-bit targets.
        let consumed = self.storage.position().min(len as u64) as usize;
        // Dropping the `Drain` iterator removes the whole range; no need to iterate it.
        drop(self.storage.get_mut().drain(..consumed));
        self.storage.set_position(0);
    }
}
69
70impl<const CHUNK_SIZE: usize> Buf for ReadBuffer<CHUNK_SIZE> {
71    fn remaining(&self) -> usize {
72        Buf::remaining(self.as_cursor())
73    }
74
75    fn chunk(&self) -> &[u8] {
76        Buf::chunk(self.as_cursor())
77    }
78
79    fn advance(&mut self, cnt: usize) {
80        Buf::advance(self.as_cursor_mut(), cnt);
81    }
82}
83
84impl<const CHUNK_SIZE: usize> Default for ReadBuffer<CHUNK_SIZE> {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    /// Input shared by both tests.
    const GREETING: &[u8] = b"Hello World!";

    /// With a chunk larger than the input, one read picks up everything.
    #[test]
    fn simple_reading() {
        let mut source = Cursor::new(GREETING.to_vec());
        let mut buffer: ReadBuffer<4096> = ReadBuffer::new();

        let read = buffer.read_from(&mut source).unwrap();

        assert_eq!(read, 12);
        assert_eq!(buffer.chunk(), GREETING);
    }

    /// With a 4-byte chunk the input arrives in three reads, and bytes that were
    /// advanced past are dropped from the storage on the following read.
    #[test]
    fn reading_in_chunks() {
        let mut source = Cursor::new(GREETING.to_vec());
        let mut buffer: ReadBuffer<4> = ReadBuffer::new();

        // First read: exactly one chunk.
        assert_eq!(buffer.read_from(&mut source).unwrap(), 4);
        assert_eq!(buffer.chunk(), b"Hell");

        // Advancing only moves the cursor; the storage itself is untouched for now.
        buffer.advance(2);
        assert_eq!(buffer.chunk(), b"ll");
        assert_eq!(buffer.as_cursor().get_ref().as_slice(), b"Hell");

        // Second read: the two consumed bytes are gone from the storage.
        assert_eq!(buffer.read_from(&mut source).unwrap(), 4);
        assert_eq!(buffer.chunk(), b"llo Wo");
        assert_eq!(buffer.as_cursor().get_ref().as_slice(), b"llo Wo");

        // Third read: the rest of the input.
        assert_eq!(buffer.read_from(&mut source).unwrap(), 4);
        assert_eq!(buffer.chunk(), b"llo World!");
    }
}