timely_communication/allocator/zero_copy/bytes_slab.rs

//! A large binary allocation for writing and sharing.

use std::ops::{Deref, DerefMut};
use timely_bytes::arc::{Bytes, BytesMut};

/// A large binary allocation for writing and sharing.
///
/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after
/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
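///
/// A minimal sketch of the intended write/extract cycle, assuming a `BytesRefill`
/// backed by plain `Vec<u8>` allocations (marked `ignore` as it is illustrative):
///
/// ```ignore
/// let refill = BytesRefill {
///     logic: std::sync::Arc::new(|size| Box::new(vec![0u8; size])),
///     limit: None,
/// };
/// let mut slab = BytesSlab::new(10, refill);          // a 1 << 10 = 1024 byte buffer.
/// slab.ensure_capacity(4);                            // at least 4 writable bytes.
/// slab.empty()[..4].copy_from_slice(&[1, 2, 3, 4]);   // write into the empty region.
/// slab.make_valid(4);                                 // mark those bytes as written.
/// let bytes = slab.extract(4);                        // shareable `Bytes` of the prefix.
/// assert_eq!(&bytes[..], &[1, 2, 3, 4]);
/// ```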
pub struct BytesSlab {
    buffer:         BytesMut,                   // current working buffer.
    in_progress:    Vec<Option<BytesMut>>,      // buffers shared with workers.
    stash:          Vec<BytesMut>,              // reclaimed and reusable buffers.
    shift:          usize,                      // log2 of the current buffer allocation size.
    valid:          usize,                      // buffer[..valid] are valid bytes.
    new_bytes:      BytesRefill,                // logic and policy for allocating new buffers.
}

/// Ability to acquire and policy to retain byte buffers.
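///
/// A sketch of a simple policy, assuming heap allocation via `Vec<u8>`; the `logic`
/// could instead hand out any allocation that dereferences to `[u8]`:
///
/// ```ignore
/// let refill = BytesRefill {
///     // Allocate zeroed vectors of the requested size.
///     logic: std::sync::Arc::new(|size| Box::new(vec![0u8; size])),
///     // Retain at most four empty buffers for reuse.
///     limit: Some(4),
/// };
/// ```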
#[derive(Clone)]
pub struct BytesRefill {
    /// Logic to acquire a new buffer of a certain number of bytes.
    pub logic: std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>,
    /// An optional limit on the number of empty buffers retained.
    pub limit: Option<usize>,
}

impl BytesSlab {
    /// Allocates a new `BytesSlab` with an initial size determined by a shift.
    pub fn new(shift: usize, new_bytes: BytesRefill) -> Self {
        BytesSlab {
            buffer: BytesMut::from(BoxDerefMut { boxed: (new_bytes.logic)(1 << shift) }),
            in_progress: Vec::new(),
            stash: Vec::new(),
            shift,
            valid: 0,
            new_bytes,
        }
    }
    /// The empty region of the slab.
    pub fn empty(&mut self) -> &mut [u8] {
        &mut self.buffer[self.valid..]
    }
    /// The valid region of the slab.
    pub fn valid(&mut self) -> &mut [u8] {
        &mut self.buffer[..self.valid]
    }
    /// Marks the next `bytes` bytes as valid.
    pub fn make_valid(&mut self, bytes: usize) {
        self.valid += bytes;
    }
    /// Extracts the first `bytes` valid bytes.
    pub fn extract(&mut self, bytes: usize) -> Bytes {
        debug_assert!(bytes <= self.valid);
        self.valid -= bytes;
        self.buffer.extract_to(bytes)
    }

    /// Ensures that `self.empty().len()` is at least `capacity`.
    ///
    /// This method may retire the current buffer if it does not have enough space, in which case
    /// it will copy any remaining contents into a new buffer. If this would not create enough free
    /// space, the shift is increased until it is sufficient.
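    ///
    /// As a worked example with assumed numbers: with `shift == 20` (a 1 MiB buffer)
    /// and `valid == 900_000`, a request for `capacity == 200_000` cannot be met by
    /// copying alone, since `900_000 + 200_000 > (1 << 20)`; the shift grows to 21
    /// (a 2 MiB buffer), and stashed and in-progress buffers of the old size are
    /// discarded rather than recycled.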
    pub fn ensure_capacity(&mut self, capacity: usize) {

        if self.empty().len() < capacity {

            let mut increased_shift = false;

            // Increase allocation if copy would be insufficient.
            while self.valid + capacity > (1 << self.shift) {
                self.shift += 1;
                self.stash.clear();         // clear wrongly sized buffers.
                self.in_progress.clear();   // clear wrongly sized buffers.
                increased_shift = true;
            }

            // Attempt to reclaim shared slices.
            if self.stash.is_empty() {
                for shared in self.in_progress.iter_mut() {
                    if let Some(mut bytes) = shared.take() {
                        // Regeneration succeeds only once all shared references have dropped.
                        if bytes.try_regenerate::<BoxDerefMut>() {
                            // NOTE: Test should be redundant, but better safe...
                            if bytes.len() == (1 << self.shift) {
                                self.stash.push(bytes);
                            }
                        }
                        else {
                            *shared = Some(bytes);
                        }
                    }
                }
                self.in_progress.retain(|x| x.is_some());
            }

            let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(BoxDerefMut { boxed: (self.new_bytes.logic)(1 << self.shift) }));
            let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

            if let Some(limit) = self.new_bytes.limit {
                self.stash.truncate(limit);
            }

            self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
            if !increased_shift {
                self.in_progress.push(Some(old_buffer));
            }
        }
    }
}

/// A wrapper for `Box<dyn DerefMut<Target=T>>` that dereferences to `T` rather than `dyn DerefMut<Target=T>`.
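///
/// This matters because `Box<dyn DerefMut<Target=[u8]>>` itself dereferences to the
/// trait object rather than to `[u8]`, while `BytesMut::from` and `try_regenerate`
/// presumably want a type whose `Deref` target is `[u8]`.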
struct BoxDerefMut {
    boxed: Box<dyn DerefMut<Target=[u8]>+'static>,
}

impl Deref for BoxDerefMut {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.boxed[..]
    }
}

impl DerefMut for BoxDerefMut {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.boxed[..]
    }
}
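
// A minimal sketch of the slab lifecycle as a test, assuming a `BytesRefill`
// backed by plain `Vec<u8>` allocations; names here are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn grow_preserves_valid_prefix() {
        let refill = BytesRefill {
            logic: std::sync::Arc::new(|size| Box::new(vec![0u8; size])),
            limit: None,
        };
        let mut slab = BytesSlab::new(10, refill);

        // Write four bytes into the empty region and mark them valid.
        slab.empty()[..4].copy_from_slice(&[1, 2, 3, 4]);
        slab.make_valid(4);

        // Request more space than the 1024-byte buffer can offer; the slab
        // retires it, grows the shift, and copies the valid prefix across.
        slab.ensure_capacity(2000);
        assert!(slab.empty().len() >= 2000);
        assert_eq!(slab.valid(), &[1, 2, 3, 4]);
    }
}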