//! A large binary allocation for writing and sharing.
use timely_bytes::arc::Bytes;
/// A large binary allocation for writing and sharing.
///
/// A bytes slab wraps a `Bytes` and maintains a valid (written) length. It supports writing
/// beyond this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are
/// enqueued and checked for uniqueness, so that they can be recycled once all shared references
/// are dropped.
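///
/// A minimal sketch of the intended write/extract cycle (illustrative values):
///
/// ```ignore
/// let mut slab = BytesSlab::new(10);                  // 1 << 10 = 1024 byte buffer.
/// slab.ensure_capacity(4);                            // guarantee at least 4 writable bytes.
/// slab.empty()[..4].copy_from_slice(&[1, 2, 3, 4]);   // write into the empty region.
/// slab.make_valid(4);                                 // mark the written bytes as valid.
/// let bytes = slab.extract(4);                        // shareable `Bytes` of the valid prefix.
/// assert_eq!(&bytes[..], &[1, 2, 3, 4][..]);
/// ```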
pub struct BytesSlab {
buffer: Bytes, // current working buffer.
in_progress: Vec<Option<Bytes>>, // buffers shared with workers.
stash: Vec<Bytes>, // reclaimed and reusable buffers.
shift: usize, // log2 of the current buffer allocation size (`buffer.len() == 1 << shift`).
valid: usize, // buffer[..valid] are valid bytes.
}
impl BytesSlab {
/// Allocates a new `BytesSlab` with an initial buffer of `1 << shift` bytes.
pub fn new(shift: usize) -> Self {
BytesSlab {
buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
in_progress: Vec::new(),
stash: Vec::new(),
shift,
valid: 0,
}
}
/// The empty region of the slab.
pub fn empty(&mut self) -> &mut [u8] {
&mut self.buffer[self.valid..]
}
/// The valid region of the slab.
pub fn valid(&mut self) -> &mut [u8] {
&mut self.buffer[..self.valid]
}
/// Marks the next `bytes` bytes as valid.
pub fn make_valid(&mut self, bytes: usize) {
self.valid += bytes;
}
/// Extracts the first `bytes` valid bytes.
pub fn extract(&mut self, bytes: usize) -> Bytes {
debug_assert!(bytes <= self.valid);
self.valid -= bytes;
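// `extract_to` splits off the first `bytes` bytes; `self.buffer` then begins at the remaining valid bytes.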
self.buffer.extract_to(bytes)
}
/// Ensures that `self.empty().len()` is at least `capacity`.
///
/// This method may retire the current buffer if it does not have enough free space, in which
/// case it copies the valid prefix into a new buffer. If a buffer of the current size would
/// still not provide enough free space, the shift is increased until it is sufficient.
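///
/// For example, with `shift == 20` the buffer holds `1 << 20` bytes; if `self.valid + capacity`
/// exceeds that, the shift grows to 21, 22, and so on until the doubled allocation suffices.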
pub fn ensure_capacity(&mut self, capacity: usize) {
if self.empty().len() < capacity {
let mut increased_shift = false;
// Increase allocation if copy would be insufficient.
while self.valid + capacity > (1 << self.shift) {
self.shift += 1;
self.stash.clear(); // clear wrongly sized buffers.
self.in_progress.clear(); // clear wrongly sized buffers.
increased_shift = true;
}
// Attempt to reclaim shared slices.
if self.stash.is_empty() {
for shared in self.in_progress.iter_mut() {
if let Some(mut bytes) = shared.take() {
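// `try_regenerate` recovers the original allocation only once all other references to it have been dropped.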
if bytes.try_regenerate::<Box<[u8]>>() {
// NOTE: Test should be redundant, but better safe...
if bytes.len() == (1 << self.shift) {
self.stash.push(bytes);
}
}
else {
*shared = Some(bytes);
}
}
}
self.in_progress.retain(|x| x.is_some());
}
let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);
self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
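// Keep the old buffer for eventual reclamation only if the allocation size is unchanged;
// after a shift increase it has the wrong size to be reused.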
if !increased_shift {
self.in_progress.push(Some(old_buffer));
}
}
}
}
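#[cfg(test)]
mod tests {
    use super::BytesSlab;

    // A sketch of the write/extract/grow cycle, using only the methods defined above;
    // the shift and byte counts are illustrative, not taken from real usage.
    #[test]
    fn write_extract_and_grow() {
        let mut slab = BytesSlab::new(4);               // 1 << 4 = 16 byte buffer.
        assert_eq!(slab.empty().len(), 16);

        // Write into the empty region, then mark those bytes as valid.
        slab.empty()[..8].copy_from_slice(&[7u8; 8]);
        slab.make_valid(8);
        assert_eq!(&slab.valid()[..], &[7u8; 8][..]);

        // Requesting more than the remaining 8 bytes forces a larger allocation,
        // and the valid prefix must survive the copy into the new buffer.
        slab.ensure_capacity(32);
        assert!(slab.empty().len() >= 32);
        assert_eq!(&slab.valid()[..], &[7u8; 8][..]);

        // Extracted bytes are handed out as shareable `Bytes`.
        let shared = slab.extract(8);
        assert_eq!(&shared[..], &[7u8; 8][..]);
        assert_eq!(slab.valid().len(), 0);
    }
}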