use std::cmp;
use std::io;
use std::sync::{Arc, Mutex};
use super::{ReadHalf, TIoChannel, WriteHalf};
/// In-memory channel made of two independent, fixed-capacity byte buffers:
/// a read buffer and a write buffer.
///
/// Each buffer lives behind its own `Arc<Mutex<..>>`, so cloning a
/// `TBufferChannel` yields another handle onto the *same* buffers rather
/// than a copy of their contents.
///
/// Lock ordering: whenever both buffers must be locked at once, the write
/// lock is taken before the read lock.
#[derive(Clone, Debug)]
pub struct TBufferChannel {
    read: Arc<Mutex<ReadData>>,
    write: Arc<Mutex<WriteData>>,
}

/// State of the read buffer.
///
/// Invariant maintained by this module: `pos <= idx <= cap == buf.len()`.
#[derive(Debug)]
struct ReadData {
    /// Backing storage, allocated once with `cap` bytes.
    buf: Box<[u8]>,
    /// Offset of the next byte a read will hand out.
    pos: usize,
    /// Number of valid bytes in `buf` (one past the last readable byte).
    idx: usize,
    /// Capacity of `buf` in bytes.
    cap: usize,
}

/// State of the write buffer.
///
/// Invariant maintained by this module: `pos <= cap == buf.len()`.
#[derive(Debug)]
struct WriteData {
    /// Backing storage, allocated once with `cap` bytes.
    buf: Box<[u8]>,
    /// Number of bytes written so far; also the offset of the next write.
    pos: usize,
    /// Capacity of `buf` in bytes.
    cap: usize,
}

impl TBufferChannel {
    /// Creates a channel whose read buffer can hold `read_capacity` bytes and
    /// whose write buffer can hold `write_capacity` bytes. Both start empty.
    pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
        TBufferChannel {
            read: Arc::new(Mutex::new(ReadData {
                buf: vec![0; read_capacity].into_boxed_slice(),
                idx: 0,
                pos: 0,
                cap: read_capacity,
            })),
            write: Arc::new(Mutex::new(WriteData {
                buf: vec![0; write_capacity].into_boxed_slice(),
                pos: 0,
                cap: write_capacity,
            })),
        }
    }

    /// Returns a copy of every byte currently held in the read buffer.
    ///
    /// The read cursor is ignored, so bytes that were already handed out by
    /// earlier reads are included.
    ///
    /// # Panics
    ///
    /// Panics if the read-buffer mutex is poisoned.
    pub fn read_bytes(&self) -> Vec<u8> {
        let rdata = self.read.lock().unwrap();
        rdata.buf[..rdata.idx].to_vec()
    }

    /// Discards the contents of the read buffer and rewinds the read cursor.
    ///
    /// # Panics
    ///
    /// Panics if the read-buffer mutex is poisoned.
    pub fn empty_read_buffer(&mut self) {
        let mut rdata = self.read.lock().unwrap();
        rdata.pos = 0;
        rdata.idx = 0;
    }

    /// Replaces the contents of the read buffer with `buf` and rewinds the
    /// read cursor.
    ///
    /// At most `read_capacity` bytes are taken from the front of `buf`; the
    /// number of bytes actually stored is returned.
    ///
    /// # Panics
    ///
    /// Panics if the read-buffer mutex is poisoned.
    pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
        // Reset and refill under a single lock acquisition so no other handle
        // can observe (or read from) a half-updated buffer.
        let mut rdata = self.read.lock().unwrap();
        let max_bytes = cmp::min(rdata.cap, buf.len());
        rdata.buf[..max_bytes].copy_from_slice(&buf[..max_bytes]);
        rdata.pos = 0;
        rdata.idx = max_bytes;
        max_bytes
    }

    /// Returns a copy of every byte written to the write buffer so far.
    ///
    /// # Panics
    ///
    /// Panics if the write-buffer mutex is poisoned.
    pub fn write_bytes(&self) -> Vec<u8> {
        let wdata = self.write.lock().unwrap();
        wdata.buf[..wdata.pos].to_vec()
    }

    /// Discards the contents of the write buffer.
    ///
    /// # Panics
    ///
    /// Panics if the write-buffer mutex is poisoned.
    pub fn empty_write_buffer(&mut self) {
        let mut wdata = self.write.lock().unwrap();
        wdata.pos = 0;
    }

    /// Moves everything in the write buffer into the read buffer.
    ///
    /// On success the read buffer contains exactly the bytes that were in the
    /// write buffer (with the read cursor rewound) and the write buffer is
    /// empty.
    ///
    /// # Panics
    ///
    /// Panics if the write buffer holds more bytes than the read buffer can
    /// store; in that case neither buffer is modified. Also panics if either
    /// mutex is poisoned.
    pub fn copy_write_buffer_to_read_buffer(&mut self) {
        let (pending, read_cap) = {
            // Hold both locks (write first, then read) for the whole transfer
            // so that no concurrent write can slip in between the copy and
            // the reset of the write buffer and get lost.
            let mut wdata = self.write.lock().unwrap();
            let mut rdata = self.read.lock().unwrap();
            let pending = wdata.pos;
            let read_cap = rdata.cap;
            if pending <= read_cap {
                rdata.buf[..pending].copy_from_slice(&wdata.buf[..pending]);
                rdata.pos = 0;
                rdata.idx = pending;
                wdata.pos = 0;
            }
            (pending, read_cap)
        };
        // Checked only after both guards are dropped: panicking here must not
        // poison the mutexes.
        assert!(
            pending <= read_cap,
            "write buffer holds {} bytes but the read buffer capacity is only {}",
            pending,
            read_cap
        );
    }
}
impl TIoChannel for TBufferChannel {
    /// Splits the channel into a read half and a write half.
    ///
    /// Each half wraps its own `TBufferChannel` handle, and both handles point
    /// at the same shared read and write buffers as the channel being split.
    /// This implementation always returns `Ok`.
    fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
    where
        Self: Sized,
    {
        // Cloning only bumps the `Arc` reference counts; the buffers themselves
        // stay shared between the two halves.
        let read_half = ReadHalf {
            handle: self.clone(),
        };
        let write_half = WriteHalf { handle: self };
        Ok((read_half, write_half))
    }
}
impl io::Read for TBufferChannel {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut rdata = self.read.as_ref().lock().unwrap();
let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
rdata.pos += nread;
Ok(nread)
}
}
impl io::Write for TBufferChannel {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut wdata = self.write.as_ref().lock().unwrap();
let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
let (start, end) = (wdata.pos, wdata.pos + nwrite);
wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
wdata.pos += nwrite;
Ok(nwrite)
}
fn flush(&mut self) -> io::Result<()> {
Ok(()) }
}
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferChannel;

    #[test]
    fn must_empty_write_buffer() {
        let mut channel = TBufferChannel::with_capacity(0, 1);
        let payload: [u8; 1] = [0x01];

        // fill the one-byte write buffer
        assert_eq!(channel.write(&payload).unwrap(), 1);
        assert_eq!(&channel.write_bytes(), &payload);

        // emptying it must leave nothing behind
        channel.empty_write_buffer();
        assert!(channel.write_bytes().is_empty());
    }

    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let mut channel = TBufferChannel::with_capacity(0, 2);
        let payload: [u8; 2] = [0x01, 0x02];

        // the first write fills the buffer completely
        assert_eq!(channel.write(&payload).unwrap(), 2);
        assert_eq!(&channel.write_bytes(), &payload);

        // a write into the full buffer stores nothing and changes nothing
        assert_eq!(channel.write(&payload).unwrap(), 0);
        assert_eq!(&channel.write_bytes(), &payload);

        // once emptied ...
        channel.empty_write_buffer();
        assert!(channel.write_bytes().is_empty());

        // ... the buffer accepts writes again
        assert_eq!(channel.write(&payload).unwrap(), 2);
        assert_eq!(&channel.write_bytes(), &payload);
    }

    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut channel = TBufferChannel::with_capacity(0, 10);

        // first write: 2 of 10 bytes used
        let first: [u8; 2] = [0x01, 0x41];
        assert_eq!(channel.write(&first).unwrap(), 2);
        assert_eq!(channel.write_bytes(), &first);

        // second write: 9 of 10 bytes used
        let second: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        assert_eq!(channel.write(&second).unwrap(), 7);
        assert_eq!(&channel.write_bytes()[2..], &second);

        // third write: only the first byte still fits
        let third: [u8; 3] = [0xBF, 0xDA, 0x98];
        assert_eq!(channel.write(&third).unwrap(), 1);
        assert_eq!(&channel.write_bytes()[9..], &third[0..1]);

        // fourth write: the buffer is full, nothing is stored
        let fourth: [u8; 3] = [0xBF, 0xAA, 0xFD];
        assert_eq!(channel.write(&fourth).unwrap(), 0);

        // the buffer holds exactly the bytes that were accepted, in order
        let expected: Vec<u8> = [&first[..], &second[..], &third[0..1]].concat();
        assert_eq!(channel.write_bytes(), expected);
    }

    #[test]
    fn must_empty_read_buffer() {
        let mut channel = TBufferChannel::with_capacity(1, 0);
        let readable: [u8; 1] = [0x01];

        // stage one readable byte
        assert_eq!(channel.set_readable_bytes(&readable), 1);
        assert_eq!(channel.read_bytes(), &readable);

        // emptying the read buffer must leave nothing behind
        channel.empty_read_buffer();
        assert!(channel.read_bytes().is_empty());
    }

    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut channel = TBufferChannel::with_capacity(1, 0);

        // stage and then discard a first byte
        let first: [u8; 1] = [0x01];
        assert_eq!(channel.set_readable_bytes(&first), 1);
        assert_eq!(channel.read_bytes(), &first);
        channel.empty_read_buffer();
        assert!(channel.read_bytes().is_empty());

        // a second byte can be staged afterwards
        let second: [u8; 1] = [0x02];
        assert_eq!(channel.set_readable_bytes(&second), 1);
        assert_eq!(channel.read_bytes(), &second);
    }

    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let mut channel = TBufferChannel::with_capacity(10, 0);
        let readable: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];

        // stage ten readable bytes
        assert_eq!(channel.set_readable_bytes(&readable), 10);
        assert_eq!(channel.read_bytes(), &readable);

        // first read takes bytes 0..5
        let mut head = [0u8; 5];
        assert_eq!(channel.read(&mut head).unwrap(), 5);
        assert_eq!(&head[..], &readable[0..5]);

        // second read takes bytes 5..9
        let mut middle = [0u8; 4];
        assert_eq!(channel.read(&mut middle).unwrap(), 4);
        assert_eq!(&middle[..], &readable[5..9]);

        // third read asks for three bytes but only one is left
        let mut tail = [0u8; 3];
        assert_eq!(channel.read(&mut tail).unwrap(), 1);
        assert_eq!(&tail[..1], &readable[9..]);

        // fourth read finds nothing left to hand out
        let mut leftover = [0u8; 10];
        assert_eq!(channel.read(&mut leftover).unwrap(), 0);

        // stitching the pieces together reproduces the staged bytes
        let mut collected: Vec<u8> = Vec::with_capacity(10);
        collected.extend_from_slice(&head);
        collected.extend_from_slice(&middle);
        collected.extend_from_slice(&tail[..1]);
        collected.extend_from_slice(&leftover[..0]);
        assert_eq!(&collected, &readable);
    }

    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut channel = TBufferChannel::with_capacity(3, 0);

        // stage three readable bytes
        let first: [u8; 3] = [0x02, 0xAB, 0x33];
        assert_eq!(channel.set_readable_bytes(&first), 3);
        assert_eq!(channel.read_bytes(), &first);

        // a four-byte read drains all three of them
        let mut scratch = [0u8; 4];
        assert_eq!(channel.read(&mut scratch).unwrap(), 3);
        assert_eq!(channel.read_bytes(), &scratch[0..3]);

        // a further read yields nothing and leaves the scratch buffer alone
        assert_eq!(channel.read(&mut scratch).unwrap(), 0);
        assert_eq!(scratch, [0x02, 0xAB, 0x33, 0x00]);

        // replenish the read buffer with two fresh bytes
        let second: [u8; 2] = [0x91, 0xAA];
        assert_eq!(channel.set_readable_bytes(&second), 2);
        assert_eq!(channel.read_bytes(), &second);

        // reads succeed again and return the new bytes
        assert_eq!(channel.read(&mut scratch).unwrap(), 2);
        assert_eq!(channel.read_bytes(), &scratch[0..2]);
    }
}