1use std::cmp;
19use std::io;
20use std::sync::{Arc, Mutex};
21
22use super::{ReadHalf, TIoChannel, WriteHalf};
23
24#[derive(Clone, Debug)]
35pub struct TBufferChannel {
36 read: Arc<Mutex<ReadData>>,
37 write: Arc<Mutex<WriteData>>,
38}
39
/// State of the readable side of a `TBufferChannel`.
#[derive(Debug)]
struct ReadData {
    /// Fixed-size backing storage.
    buf: Box<[u8]>,
    /// Offset of the next byte a read will return; advanced by every read and
    /// reset to 0 when the buffer is emptied.
    pos: usize,
    /// Number of loaded bytes at the front of `buf` (one past the last
    /// readable byte).
    idx: usize,
    /// Upper bound on how many bytes may be loaded; initialised to the size
    /// `buf` was allocated with.
    cap: usize,
}
47
/// State of the writable side of a `TBufferChannel`.
#[derive(Debug)]
struct WriteData {
    /// Fixed-size backing storage.
    buf: Box<[u8]>,
    /// Number of bytes written so far; also the offset at which the next
    /// write lands. Reset to 0 when the buffer is emptied.
    pos: usize,
    /// Upper bound on how many bytes may be written; initialised to the size
    /// `buf` was allocated with.
    cap: usize,
}
54
55impl TBufferChannel {
56 pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
59 TBufferChannel {
60 read: Arc::new(Mutex::new(ReadData {
61 buf: vec![0; read_capacity].into_boxed_slice(),
62 idx: 0,
63 pos: 0,
64 cap: read_capacity,
65 })),
66 write: Arc::new(Mutex::new(WriteData {
67 buf: vec![0; write_capacity].into_boxed_slice(),
68 pos: 0,
69 cap: write_capacity,
70 })),
71 }
72 }
73
74 pub fn read_bytes(&self) -> Vec<u8> {
77 let rdata = self.read.as_ref().lock().unwrap();
78 let mut buf = vec![0u8; rdata.idx];
79 buf.copy_from_slice(&rdata.buf[..rdata.idx]);
80 buf
81 }
82
83 pub fn empty_read_buffer(&mut self) {
89 let mut rdata = self.read.as_ref().lock().unwrap();
90 rdata.pos = 0;
91 rdata.idx = 0;
92 }
93
94 pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
98 self.empty_read_buffer();
99 let mut rdata = self.read.as_ref().lock().unwrap();
100 let max_bytes = cmp::min(rdata.cap, buf.len());
101 rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
102 rdata.idx = max_bytes;
103 max_bytes
104 }
105
106 pub fn write_bytes(&self) -> Vec<u8> {
109 let wdata = self.write.as_ref().lock().unwrap();
110 let mut buf = vec![0u8; wdata.pos];
111 buf.copy_from_slice(&wdata.buf[..wdata.pos]);
112 buf
113 }
114
115 pub fn empty_write_buffer(&mut self) {
118 let mut wdata = self.write.as_ref().lock().unwrap();
119 wdata.pos = 0;
120 }
121
122 pub fn copy_write_buffer_to_read_buffer(&mut self) {
125 let buf = {
127 let wdata = self.write.as_ref().lock().unwrap();
128 let b = &wdata.buf[..wdata.pos];
129 let mut b_ret = vec![0; b.len()];
130 b_ret.copy_from_slice(b);
131 b_ret
132 };
133
134 let bytes_copied = self.set_readable_bytes(&buf);
135 assert_eq!(bytes_copied, buf.len());
136
137 self.empty_write_buffer();
138 }
139}
140
141impl TIoChannel for TBufferChannel {
142 fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
143 where
144 Self: Sized,
145 {
146 Ok((
147 ReadHalf {
148 handle: TBufferChannel {
149 read: self.read.clone(),
150 write: self.write.clone(),
151 },
152 },
153 WriteHalf {
154 handle: TBufferChannel {
155 read: self.read.clone(),
156 write: self.write,
159 },
160 },
161 ))
162 }
163}
164
165impl io::Read for TBufferChannel {
166 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
167 let mut rdata = self.read.as_ref().lock().unwrap();
168 let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
169 buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
170 rdata.pos += nread;
171 Ok(nread)
172 }
173}
174
175impl io::Write for TBufferChannel {
176 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
177 let mut wdata = self.write.as_ref().lock().unwrap();
178 let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
179 let (start, end) = (wdata.pos, wdata.pos + nwrite);
180 wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
181 wdata.pos += nwrite;
182 Ok(nwrite)
183 }
184
185 fn flush(&mut self) -> io::Result<()> {
186 Ok(()) }
188}
189
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferChannel;

    /// Emptying the write buffer discards everything written so far.
    #[test]
    fn must_empty_write_buffer() {
        let mut channel = TBufferChannel::with_capacity(0, 1);
        let payload = [0x01u8];

        assert_eq!(channel.write(&payload).unwrap(), 1);
        assert_eq!(&channel.write_bytes()[..], &payload[..]);

        channel.empty_write_buffer();
        assert!(channel.write_bytes().is_empty());
    }

    /// A full write buffer rejects further bytes until it has been emptied.
    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let mut channel = TBufferChannel::with_capacity(0, 2);
        let payload = [0x01u8, 0x02];

        // The first write fills the buffer completely.
        assert_eq!(channel.write(&payload).unwrap(), 2);
        assert_eq!(&channel.write_bytes()[..], &payload[..]);

        // With no room left, nothing is accepted and the contents stay put.
        assert_eq!(channel.write(&payload).unwrap(), 0);
        assert_eq!(&channel.write_bytes()[..], &payload[..]);

        channel.empty_write_buffer();
        assert!(channel.write_bytes().is_empty());

        // After emptying, the same write succeeds again.
        assert_eq!(channel.write(&payload).unwrap(), 2);
        assert_eq!(&channel.write_bytes()[..], &payload[..]);
    }

    /// Successive writes append until capacity is reached; the write that
    /// crosses the limit is accepted only partially.
    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut channel = TBufferChannel::with_capacity(0, 10);

        let first = [0x01u8, 0x41];
        assert_eq!(channel.write(&first).unwrap(), 2);
        assert_eq!(&channel.write_bytes()[..], &first[..]);

        let second = [0x24u8, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        assert_eq!(channel.write(&second).unwrap(), 7);
        assert_eq!(&channel.write_bytes()[2..], &second[..]);

        // Only one byte of room remains, so only the first byte is taken.
        let third = [0xBFu8, 0xDA, 0x98];
        assert_eq!(channel.write(&third).unwrap(), 1);
        assert_eq!(&channel.write_bytes()[9..], &third[..1]);

        // The buffer is now full.
        let fourth = [0xBFu8, 0xAA, 0xFD];
        assert_eq!(channel.write(&fourth).unwrap(), 0);

        let expected: Vec<u8> = [&first[..], &second[..], &third[..1]].concat();
        assert_eq!(channel.write_bytes(), expected);
    }

    /// Emptying the read buffer leaves nothing loaded.
    #[test]
    fn must_empty_read_buffer() {
        let mut channel = TBufferChannel::with_capacity(1, 0);
        let input = [0x01u8];

        assert_eq!(channel.set_readable_bytes(&input), 1);
        assert_eq!(&channel.read_bytes()[..], &input[..]);

        channel.empty_read_buffer();
        assert!(channel.read_bytes().is_empty());
    }

    /// The read buffer can be loaded again after it was emptied.
    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut channel = TBufferChannel::with_capacity(1, 0);

        let first = [0x01u8];
        assert_eq!(channel.set_readable_bytes(&first), 1);
        assert_eq!(&channel.read_bytes()[..], &first[..]);

        channel.empty_read_buffer();
        assert!(channel.read_bytes().is_empty());

        let second = [0x02u8];
        assert_eq!(channel.set_readable_bytes(&second), 1);
        assert_eq!(&channel.read_bytes()[..], &second[..]);
    }

    /// Successive reads hand out the loaded bytes in order; once they are
    /// used up, reads return 0.
    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let mut channel = TBufferChannel::with_capacity(10, 0);
        let source = [0xFFu8, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];

        assert_eq!(channel.set_readable_bytes(&source), 10);
        assert_eq!(&channel.read_bytes()[..], &source[..]);

        let mut chunk_0 = vec![0u8; 5];
        assert_eq!(channel.read(&mut chunk_0).unwrap(), 5);
        assert_eq!(&chunk_0[..], &source[..5]);

        let mut chunk_1 = vec![0u8; 4];
        assert_eq!(channel.read(&mut chunk_1).unwrap(), 4);
        assert_eq!(&chunk_1[..], &source[5..9]);

        // Only one byte is left, so the 3-byte destination is filled partially
        // and has to be cut down to the returned length before comparing.
        let mut chunk_2 = vec![0u8; 3];
        assert_eq!(channel.read(&mut chunk_2).unwrap(), 1);
        chunk_2.truncate(1);
        assert_eq!(&chunk_2[..], &source[9..]);

        let mut chunk_3 = vec![0u8; 10];
        assert_eq!(channel.read(&mut chunk_3).unwrap(), 0);
        chunk_3.truncate(0);

        let reassembled: Vec<u8> =
            [&chunk_0[..], &chunk_1[..], &chunk_2[..], &chunk_3[..]].concat();
        assert_eq!(&reassembled[..], &source[..]);
    }

    /// Loading new readable bytes after the previous ones were consumed makes
    /// reads succeed again.
    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut channel = TBufferChannel::with_capacity(3, 0);

        let first = [0x02u8, 0xAB, 0x33];
        assert_eq!(channel.set_readable_bytes(&first), 3);
        assert_eq!(&channel.read_bytes()[..], &first[..]);

        let mut scratch = vec![0u8; 4];

        // Drains all three loaded bytes into the front of `scratch`.
        assert_eq!(channel.read(&mut scratch).unwrap(), 3);
        assert_eq!(&channel.read_bytes()[..], &scratch[..3]);

        // Nothing left to read; `scratch` must not be modified.
        assert_eq!(channel.read(&mut scratch).unwrap(), 0);

        let mut expected = first.to_vec();
        expected.push(0x00);
        assert_eq!(scratch, expected);

        let second = [0x91u8, 0xAA];
        assert_eq!(channel.set_readable_bytes(&second), 2);
        assert_eq!(&channel.read_bytes()[..], &second[..]);

        assert_eq!(channel.read(&mut scratch).unwrap(), 2);
        assert_eq!(&channel.read_bytes()[..], &scratch[..2]);
    }
}