timely_communication/allocator/zero_copy/bytes_slab.rs
1//! A large binary allocation for writing and sharing.
2
3use std::ops::{Deref, DerefMut};
4use timely_bytes::arc::{Bytes, BytesMut};
5
6/// A large binary allocation for writing and sharing.
7///
8/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after
9/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
10/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
11pub struct BytesSlab {
12 buffer: BytesMut, // current working buffer.
13 in_progress: Vec<Option<BytesMut>>, // buffers shared with workers.
14 stash: Vec<BytesMut>, // reclaimed and reusable buffers.
15 shift: usize, // current buffer allocation size.
16 valid: usize, // buffer[..valid] are valid bytes.
17 new_bytes: BytesRefill, // function to allocate new buffers.
18}
19
20/// Ability to acquire and policy to retain byte buffers.
21#[derive(Clone)]
22pub struct BytesRefill {
23 /// Logic to acquire a new buffer of a certain number of bytes.
24 pub logic: std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>,
25 /// An optional limit on the number of empty buffers retained.
26 pub limit: Option<usize>,
27}
28
29impl BytesSlab {
30 /// Allocates a new `BytesSlab` with an initial size determined by a shift.
31 pub fn new(shift: usize, new_bytes: BytesRefill) -> Self {
32 BytesSlab {
33 buffer: BytesMut::from(BoxDerefMut { boxed: (new_bytes.logic)(1 << shift) }),
34 in_progress: Vec::new(),
35 stash: Vec::new(),
36 shift,
37 valid: 0,
38 new_bytes,
39 }
40 }
41 /// The empty region of the slab.
42 pub fn empty(&mut self) -> &mut [u8] {
43 &mut self.buffer[self.valid..]
44 }
45 /// The valid region of the slab.
46 pub fn valid(&mut self) -> &mut [u8] {
47 &mut self.buffer[..self.valid]
48 }
49 /// Marks the next `bytes` bytes as valid.
50 pub fn make_valid(&mut self, bytes: usize) {
51 self.valid += bytes;
52 }
53 /// Extracts the first `bytes` valid bytes.
54 pub fn extract(&mut self, bytes: usize) -> Bytes {
55 debug_assert!(bytes <= self.valid);
56 self.valid -= bytes;
57 self.buffer.extract_to(bytes)
58 }
59
60 /// Ensures that `self.empty().len()` is at least `capacity`.
61 ///
62 /// This method may retire the current buffer if it does not have enough space, in which case
63 /// it will copy any remaining contents into a new buffer. If this would not create enough free
64 /// space, the shift is increased until it is sufficient.
65 pub fn ensure_capacity(&mut self, capacity: usize) {
66
67 if self.empty().len() < capacity {
68
69 let mut increased_shift = false;
70
71 // Increase allocation if copy would be insufficient.
72 while self.valid + capacity > (1 << self.shift) {
73 self.shift += 1;
74 self.stash.clear(); // clear wrongly sized buffers.
75 self.in_progress.clear(); // clear wrongly sized buffers.
76 increased_shift = true;
77 }
78
79 // Attempt to reclaim shared slices.
80 if self.stash.is_empty() {
81 for shared in self.in_progress.iter_mut() {
82 if let Some(mut bytes) = shared.take() {
83 if bytes.try_regenerate::<BoxDerefMut>() {
84 // NOTE: Test should be redundant, but better safe...
85 if bytes.len() == (1 << self.shift) {
86 self.stash.push(bytes);
87 }
88 }
89 else {
90 *shared = Some(bytes);
91 }
92 }
93 }
94 self.in_progress.retain(|x| x.is_some());
95 }
96
97 let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(BoxDerefMut { boxed: (self.new_bytes.logic)(1 << self.shift) }));
98 let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);
99
100 if let Some(limit) = self.new_bytes.limit {
101 self.stash.truncate(limit);
102 }
103
104 self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
105 if !increased_shift {
106 self.in_progress.push(Some(old_buffer));
107 }
108 }
109 }
110}
111
112/// A wrapper for `Box<dyn DerefMut<Target=T>>` that dereferences to `T` rather than `dyn DerefMut<Target=T>`.
113struct BoxDerefMut {
114 boxed: Box<dyn DerefMut<Target=[u8]>+'static>,
115}
116
117impl Deref for BoxDerefMut {
118 type Target = [u8];
119 fn deref(&self) -> &Self::Target {
120 &self.boxed[..]
121 }
122}
123
124impl DerefMut for BoxDerefMut {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 &mut self.boxed[..]
127 }
128}