thrift/transport/
mem.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp;
19use std::io;
20use std::sync::{Arc, Mutex};
21
22use super::{ReadHalf, TIoChannel, WriteHalf};
23
/// In-memory read and write channel with fixed-size read and write buffers.
///
/// On a `write`, bytes are appended to the internal write buffer. Once that
/// buffer is full no further bytes are accepted; callers must call
/// `empty_write_buffer()` before subsequent writes are accepted.
///
/// Readable bytes are staged in the internal read buffer with
/// `set_readable_bytes(...)`. Callers can then read until those bytes are
/// depleted. Further reads yield nothing until the read buffer is
/// replenished.
///
/// Both buffers live behind `Arc<Mutex<..>>`, so a clone of a
/// `TBufferChannel` is another handle onto the *same* buffers, not a copy of
/// their contents.
///
/// # Panics
///
/// Every method locks one or both internal mutexes and panics if the mutex
/// it needs has been poisoned.
#[derive(Clone, Debug)]
pub struct TBufferChannel {
    read: Arc<Mutex<ReadData>>,
    write: Arc<Mutex<WriteData>>,
}

/// State of the read side of a `TBufferChannel`.
#[derive(Debug)]
struct ReadData {
    /// Backing storage, allocated once with `cap` bytes.
    buf: Box<[u8]>,
    /// Offset of the next byte a `read` hands out; never exceeds `idx`.
    pos: usize,
    /// Number of staged (readable) bytes at the front of `buf`.
    idx: usize,
    /// Size of `buf` in bytes.
    cap: usize,
}

/// State of the write side of a `TBufferChannel`.
#[derive(Debug)]
struct WriteData {
    /// Backing storage, allocated once with `cap` bytes.
    buf: Box<[u8]>,
    /// Number of bytes written so far; the next write starts here.
    pos: usize,
    /// Size of `buf` in bytes.
    cap: usize,
}

impl TBufferChannel {
    /// Constructs a new, empty `TBufferChannel` with the given
    /// read buffer capacity and write buffer capacity.
    pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
        TBufferChannel {
            read: Arc::new(Mutex::new(ReadData {
                buf: vec![0; read_capacity].into_boxed_slice(),
                pos: 0,
                idx: 0,
                cap: read_capacity,
            })),
            write: Arc::new(Mutex::new(WriteData {
                buf: vec![0; write_capacity].into_boxed_slice(),
                pos: 0,
                cap: write_capacity,
            })),
        }
    }

    /// Return a copy of the bytes held by the internal read buffer.
    ///
    /// All staged bytes are returned, including those already consumed by
    /// `read`. Returns an empty vector if no readable bytes are present.
    pub fn read_bytes(&self) -> Vec<u8> {
        let rdata = self.read.lock().unwrap();
        rdata.buf[..rdata.idx].to_vec()
    }

    // FIXME: do I really need this API call?
    // FIXME: should this simply reset to the last set of readable bytes?
    /// Reset the number of readable bytes to zero.
    ///
    /// Subsequent calls to `read` will return nothing.
    pub fn empty_read_buffer(&mut self) {
        let mut rdata = self.read.lock().unwrap();
        rdata.pos = 0;
        rdata.idx = 0;
    }

    /// Copy bytes from the source buffer `buf` into the internal read buffer,
    /// overwriting any existing bytes and rewinding the read position.
    /// Returns the number of bytes copied, which is
    /// `min(buf.len(), internal_read_buf.len())`.
    pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
        // Reset and refill under a single lock acquisition so that other
        // handles onto the same buffer never observe the intermediate
        // "emptied but not yet refilled" state.
        let mut rdata = self.read.lock().unwrap();
        let max_bytes = cmp::min(rdata.cap, buf.len());
        rdata.buf[..max_bytes].copy_from_slice(&buf[..max_bytes]);
        rdata.pos = 0;
        rdata.idx = max_bytes;
        max_bytes
    }

    /// Return a copy of the bytes held by the internal write buffer.
    /// Returns an empty vector if no bytes were written.
    pub fn write_bytes(&self) -> Vec<u8> {
        let wdata = self.write.lock().unwrap();
        wdata.buf[..wdata.pos].to_vec()
    }

    /// Resets the internal write buffer, making it seem like no bytes were
    /// written. Calling `write_bytes` after this returns an empty vector.
    pub fn empty_write_buffer(&mut self) {
        let mut wdata = self.write.lock().unwrap();
        wdata.pos = 0;
    }

    /// Overwrites the contents of the read buffer with the contents of the
    /// write buffer. The write buffer is emptied after this operation.
    ///
    /// # Panics
    ///
    /// Panics if the write buffer holds more bytes than the read buffer can
    /// store. In that case the read buffer has been overwritten with the
    /// bytes that did fit and the write buffer is left untouched.
    pub fn copy_write_buffer_to_read_buffer(&mut self) {
        let (bytes_copied, bytes_pending) = {
            // Hold both locks for the whole transfer so that bytes written
            // through another handle cannot slip in between the copy and the
            // reset and be silently discarded. This is the only place that
            // takes both locks (write first, then read).
            let mut wdata = self.write.lock().unwrap();
            let mut rdata = self.read.lock().unwrap();

            let bytes_pending = wdata.pos;
            let bytes_copied = cmp::min(rdata.cap, bytes_pending);
            rdata.buf[..bytes_copied].copy_from_slice(&wdata.buf[..bytes_copied]);
            rdata.pos = 0;
            rdata.idx = bytes_copied;

            // Only discard the written bytes once all of them were moved.
            if bytes_copied == bytes_pending {
                wdata.pos = 0;
            }

            (bytes_copied, bytes_pending)
        };

        // Checked after the guards are dropped so that a failure does not
        // poison either mutex.
        assert_eq!(bytes_copied, bytes_pending);
    }
}
140
141impl TIoChannel for TBufferChannel {
142    fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
143    where
144        Self: Sized,
145    {
146        Ok((
147            ReadHalf {
148                handle: TBufferChannel {
149                    read: self.read.clone(),
150                    write: self.write.clone(),
151                },
152            },
153            WriteHalf {
154                handle: TBufferChannel {
155                    read: self.read.clone(),
156                    // NOTE: not cloning here, since this is the last statement
157                    // in this method and `write` can take ownership of `self.write`
158                    write: self.write,
159                },
160            },
161        ))
162    }
163}
164
165impl io::Read for TBufferChannel {
166    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
167        let mut rdata = self.read.as_ref().lock().unwrap();
168        let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
169        buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
170        rdata.pos += nread;
171        Ok(nread)
172    }
173}
174
175impl io::Write for TBufferChannel {
176    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
177        let mut wdata = self.write.as_ref().lock().unwrap();
178        let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
179        let (start, end) = (wdata.pos, wdata.pos + nwrite);
180        wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
181        wdata.pos += nwrite;
182        Ok(nwrite)
183    }
184
185    fn flush(&mut self) -> io::Result<()> {
186        Ok(()) // nothing to do on flush
187    }
188}
189
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferChannel;

    /// Emptying the write buffer discards everything written so far.
    #[test]
    fn must_empty_write_buffer() {
        let mut channel = TBufferChannel::with_capacity(0, 1);
        let payload = [0x01u8];

        assert_eq!(channel.write(&payload).unwrap(), 1);
        assert_eq!(channel.write_bytes(), payload);

        channel.empty_write_buffer();
        assert!(channel.write_bytes().is_empty());
    }

    /// A full write buffer rejects writes until it has been emptied.
    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let mut channel = TBufferChannel::with_capacity(0, 2);
        let payload = [0x01u8, 0x02];

        // the whole payload fits
        assert_eq!(channel.write(&payload).unwrap(), 2);
        assert_eq!(channel.write_bytes(), payload);

        // buffer is full: nothing is accepted and the content is unchanged
        assert_eq!(channel.write(&payload).unwrap(), 0);
        assert_eq!(channel.write_bytes(), payload);

        // reset the buffer
        channel.empty_write_buffer();
        assert!(channel.write_bytes().is_empty());

        // the same write succeeds again
        assert_eq!(channel.write(&payload).unwrap(), 2);
        assert_eq!(channel.write_bytes(), payload);
    }

    /// Consecutive writes append until capacity is reached; the write that
    /// crosses the limit is truncated and later writes accept nothing.
    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut channel = TBufferChannel::with_capacity(0, 10);

        let first = [0x01u8, 0x41];
        let second = [0x24u8, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        let third = [0xBFu8, 0xDA, 0x98];
        let fourth = [0xBFu8, 0xAA, 0xFD];

        // fits completely
        assert_eq!(channel.write(&first).unwrap(), 2);
        assert_eq!(channel.write_bytes(), first);

        // fits completely, lands right after the first chunk
        assert_eq!(channel.write(&second).unwrap(), 7);
        assert_eq!(&channel.write_bytes()[2..], &second);

        // only a single byte of space is left
        assert_eq!(channel.write(&third).unwrap(), 1);
        assert_eq!(&channel.write_bytes()[9..], &third[..1]);

        // buffer is full
        assert_eq!(channel.write(&fourth).unwrap(), 0);

        // the buffer holds the accepted bytes in write order
        let expected = [&first[..], &second[..], &third[..1]].concat();
        assert_eq!(channel.write_bytes(), expected);
    }

    /// Emptying the read buffer removes all staged bytes.
    #[test]
    fn must_empty_read_buffer() {
        let mut channel = TBufferChannel::with_capacity(1, 0);
        let staged = [0x01u8];

        assert_eq!(channel.set_readable_bytes(&staged), 1);
        assert_eq!(channel.read_bytes(), staged);

        channel.empty_read_buffer();
        assert!(channel.read_bytes().is_empty());
    }

    /// New bytes can be staged after the read buffer has been emptied.
    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut channel = TBufferChannel::with_capacity(1, 0);

        let first = [0x01u8];
        assert_eq!(channel.set_readable_bytes(&first), 1);
        assert_eq!(channel.read_bytes(), first);

        channel.empty_read_buffer();
        assert!(channel.read_bytes().is_empty());

        let second = [0x02u8];
        assert_eq!(channel.set_readable_bytes(&second), 1);
        assert_eq!(channel.read_bytes(), second);
    }

    /// Successive reads walk through the staged bytes and then return zero.
    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let mut channel = TBufferChannel::with_capacity(10, 0);
        let staged = [0xFFu8, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];

        // stage the bytes
        assert_eq!(channel.set_readable_bytes(&staged), 10);
        assert_eq!(channel.read_bytes(), staged);

        // bytes 0..5
        let mut chunk_a = vec![0u8; 5];
        assert_eq!(channel.read(&mut chunk_a).unwrap(), 5);
        assert_eq!(chunk_a.as_slice(), &staged[..5]);

        // bytes 5..9
        let mut chunk_b = vec![0u8; 4];
        assert_eq!(channel.read(&mut chunk_b).unwrap(), 4);
        assert_eq!(chunk_b.as_slice(), &staged[5..9]);

        // a single byte is left even though three were requested
        let mut chunk_c = vec![0u8; 3];
        assert_eq!(channel.read(&mut chunk_c).unwrap(), 1);
        chunk_c.truncate(1); // keep only the byte that was actually read
        assert_eq!(chunk_c.as_slice(), &staged[9..]);

        // nothing is left
        let mut chunk_d = vec![0u8; 10];
        assert_eq!(channel.read(&mut chunk_d).unwrap(), 0);
        chunk_d.truncate(0);

        // the pieces add up to the staged bytes
        let reassembled = [chunk_a, chunk_b, chunk_c, chunk_d].concat();
        assert_eq!(reassembled, staged);
    }

    /// A drained read buffer serves reads again once it is replenished.
    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut channel = TBufferChannel::with_capacity(3, 0);

        let first_batch = [0x02u8, 0xAB, 0x33];
        assert_eq!(channel.set_readable_bytes(&first_batch), 3);
        assert_eq!(channel.read_bytes(), first_batch);

        let mut scratch = vec![0u8; 4];

        // drain the read buffer
        assert_eq!(channel.read(&mut scratch).unwrap(), 3);
        assert_eq!(channel.read_bytes(), &scratch[..3]);

        // a further read yields nothing
        assert_eq!(channel.read(&mut scratch).unwrap(), 0);

        // and leaves the destination untouched
        let mut expected = first_batch.to_vec();
        expected.push(0x00);
        assert_eq!(scratch, expected);

        // replenish the read buffer
        let second_batch = [0x91u8, 0xAA];
        assert_eq!(channel.set_readable_bytes(&second_batch), 2);
        assert_eq!(channel.read_bytes(), second_batch);

        // read again
        assert_eq!(channel.read(&mut scratch).unwrap(), 2);
        assert_eq!(channel.read_bytes(), &scratch[..2]);
    }
}