timely_bytes/lib.rs
//! A simplified implementation of the `bytes` crate, with different features and less safety.
//!
//! The crate is currently minimalist rather than maximalist. For example, it does not support
//! some methods on `BytesMut` that seem like they should be safe, because they are not yet needed:
//! `BytesMut` should be able to implement `Send`, and `BytesMut::extract_to` could
//! return a `BytesMut` rather than a `Bytes`.
//!
//! # Examples
//!
//! ```
//! use timely_bytes::arc::BytesMut;
//!
//! let bytes = vec![0u8; 1024];
//! let mut shared1 = BytesMut::from(bytes);
//! let mut shared2 = shared1.extract_to(100);
//! let mut shared3 = shared1.extract_to(100);
//! let mut shared4 = shared2.extract_to(60);
//!
//! assert_eq!(shared1.len(), 824);
//! assert_eq!(shared2.len(), 40);
//! assert_eq!(shared3.len(), 100);
//! assert_eq!(shared4.len(), 60);
//!
//! for byte in shared1.iter_mut() { *byte = 1u8; }
//!
//! // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
//! shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
//! shared2.try_merge(shared1.freeze()).ok().expect("Failed to merge 23 and 1");
//! shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
//!
//! assert_eq!(shared4.len(), 1024);
//! ```
33#![forbid(missing_docs)]
34
/// `Arc`-backed byte slices carved out of a common, shared allocation.
///
/// [`BytesMut`] is the unique, writeable handle created from an owned byte
/// container; [`Bytes`] is a cloneable, read-only handle. Both keep the
/// backing container alive through a shared `Arc`, and `Bytes` handles that
/// are adjacent in the same allocation can be merged back together.
pub mod arc {

    use std::any::Any;
    use std::fmt;
    use std::ops::{Deref, DerefMut};
    use std::sync::Arc;

    /// A writeable byte buffer backed by a shared allocation.
    ///
    /// An instance of this type contends that `ptr` is valid for `len` bytes,
    /// and that no other reference to these bytes exists, other than through
    /// the type currently held in `sequestered`.
    ///
    /// Unlike [`Bytes`], this type does not implement `Send`.
    pub struct BytesMut {
        /// Pointer to the start of this slice (not the allocation).
        ptr: *mut u8,
        /// Length of this slice.
        len: usize,
        /// Shared ownership of the backing container.
        ///
        /// The container is type-erased; it is only touched again (through
        /// `Arc::get_mut`) once this handle is the sole owner, and otherwise
        /// exists to keep `ptr[0 .. len]` alive.
        sequestered: Arc<dyn Any>,
    }

    impl BytesMut {

        /// Create a new instance from a byte allocation.
        ///
        /// The container is moved behind an `Arc` *before* its pointer and
        /// length are read, so that the recorded address is the one the bytes
        /// have while sequestered. The `Arc` also serves as the identity of the
        /// allocation, which is used to re-connect slices of the same allocation.
        pub fn from<B>(bytes: B) -> BytesMut
        where
            B: DerefMut<Target = [u8]> + 'static,
        {
            let mut sequestered = Arc::new(bytes) as Arc<dyn Any>;
            let (ptr, len) = Arc::get_mut(&mut sequestered)
                .expect("a freshly created `Arc` is uniquely held")
                .downcast_mut::<B>()
                .map(|container| {
                    // Take pointer and length from the same `deref_mut` result.
                    let slice: &mut [u8] = &mut **container;
                    (slice.as_mut_ptr(), slice.len())
                })
                .expect("the allocation was just stored with type `B`");

            BytesMut {
                ptr,
                len,
                sequestered,
            }
        }

        /// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
        ///
        /// After the call `self` covers only the bytes from `index` onwards, so the
        /// returned read-only range and the remaining writeable range do not overlap.
        ///
        /// # Panics
        ///
        /// Panics if `index` is greater than the current length of `self`.
        pub fn extract_to(&mut self, index: usize) -> Bytes {

            assert!(
                index <= self.len,
                "extract_to: index {} exceeds length {}",
                index,
                self.len
            );

            let prefix = Bytes {
                ptr: self.ptr,
                len: index,
                sequestered: Arc::clone(&self.sequestered),
            };

            // `index <= self.len`, so the pointer stays within (or one past the
            // end of) the range this handle already covered.
            self.ptr = self.ptr.wrapping_add(index);
            self.len -= index;

            prefix
        }

        /// Regenerates the BytesMut if it is uniquely held.
        ///
        /// If this is the only handle to the sequestered allocation, this method
        /// recovers the pointer and length of the whole allocation and re-initializes
        /// `self` to cover it.
        ///
        /// Returns `Some(true)` if the regeneration happened and `Some(false)` if other
        /// handles to the allocation still exist (in which case `B` is not inspected).
        /// Returns `None` if the handle is unique but the allocation is not of type `B`;
        /// `self` is left unchanged in that case.
        ///
        /// # Examples
        ///
        /// ```
        /// use timely_bytes::arc::BytesMut;
        ///
        /// let bytes = vec![0u8; 1024];
        /// let mut shared1 = BytesMut::from(bytes);
        /// let mut shared2 = shared1.extract_to(100);
        /// let mut shared3 = shared1.extract_to(100);
        /// let mut shared4 = shared2.extract_to(60);
        ///
        /// drop(shared3);
        /// drop(shared2);
        /// drop(shared4);
        /// assert_eq!(shared1.try_regenerate::<Vec<u8>>(), Some(true));
        /// assert!(shared1.len() == 1024);
        /// ```
        pub fn try_regenerate<B>(&mut self) -> Option<bool>
        where
            B: DerefMut<Target = [u8]> + 'static,
        {
            // Only possible if this is the only reference to the sequestered allocation.
            match Arc::get_mut(&mut self.sequestered) {
                Some(allocation) => {
                    let container = allocation.downcast_mut::<B>()?;
                    let slice: &mut [u8] = &mut **container;
                    self.len = slice.len();
                    self.ptr = slice.as_mut_ptr();
                    Some(true)
                }
                None => Some(false),
            }
        }

        /// Converts a writeable byte slice to a shareable byte slice.
        ///
        /// The returned `Bytes` covers exactly the range `self` covered.
        #[inline(always)]
        pub fn freeze(self) -> Bytes {
            Bytes {
                ptr: self.ptr,
                len: self.len,
                sequestered: self.sequestered,
            }
        }
    }

    impl fmt::Debug for BytesMut {
        /// Formats the handle as its pointer and length; the bytes are not printed.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("BytesMut")
                .field("ptr", &self.ptr)
                .field("len", &self.len)
                .finish()
        }
    }

    impl Deref for BytesMut {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &[u8] {
            // SAFETY: relies on the type's invariant that `ptr` is valid for `len`
            // bytes, kept alive by `sequestered`, with no other writer to the range.
            unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
        }
    }

    impl DerefMut for BytesMut {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut [u8] {
            // SAFETY: relies on the type's invariant that `ptr` is valid for `len`
            // bytes and that no other handle refers to this range; `&mut self`
            // makes the returned borrow unique for its lifetime.
            unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
        }
    }


    /// A thread-safe shared byte buffer backed by a shared allocation.
    ///
    /// An instance of this type contends that `ptr` is valid for `len` bytes,
    /// and that no other mutable reference to these bytes exists, other than
    /// through the type currently held in `sequestered`.
    #[derive(Clone)]
    pub struct Bytes {
        /// Pointer to the start of this slice (not the allocation).
        ptr: *const u8,
        /// Length of this slice.
        len: usize,
        /// Shared ownership of the backing container.
        ///
        /// The container is type-erased and never accessed through this handle;
        /// it exists to keep `ptr[0 .. len]` alive and to identify the allocation
        /// when merging.
        sequestered: Arc<dyn Any>,
    }

    // SAFETY: the bytes are only read through this type, and `self.sequestered`
    // keeps the referenced range valid for as long as any handle exists, on any
    // thread.
    // NOTE(review): `BytesMut::from` does not require `B: Send`, so the backing
    // container may be dropped on whichever thread releases the last handle --
    // confirm that callers only supply containers for which that is acceptable.
    unsafe impl Send for Bytes { }

    impl Bytes {

        /// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
        ///
        /// After the call `self` covers only the bytes from `index` onwards.
        ///
        /// # Panics
        ///
        /// Panics if `index` is greater than the current length of `self`.
        pub fn extract_to(&mut self, index: usize) -> Bytes {

            assert!(
                index <= self.len,
                "extract_to: index {} exceeds length {}",
                index,
                self.len
            );

            let prefix = Bytes {
                ptr: self.ptr,
                len: index,
                sequestered: Arc::clone(&self.sequestered),
            };

            // `index <= self.len`, so the pointer stays within (or one past the
            // end of) the range this handle already covered.
            self.ptr = self.ptr.wrapping_add(index);
            self.len -= index;

            prefix
        }

        /// Attempts to merge adjacent slices from the same allocation.
        ///
        /// The merge succeeds only when `other` shares the allocation of `self` and
        /// starts exactly where `self` ends. If the merge succeeds then `other.len`
        /// is added to `self` and the result is `Ok(())`. If the merge fails self is
        /// unmodified and the result is `Err(other)`, returning the bytes supplied
        /// as input.
        ///
        /// # Examples
        ///
        /// ```
        /// use timely_bytes::arc::BytesMut;
        ///
        /// let bytes = vec![0u8; 1024];
        /// let mut shared1 = BytesMut::from(bytes).freeze();
        /// let mut shared2 = shared1.extract_to(100);
        /// let mut shared3 = shared1.extract_to(100);
        /// let mut shared4 = shared2.extract_to(60);
        ///
        /// // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
        /// shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
        /// shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
        /// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
        /// ```
        pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
            let same_allocation = Arc::ptr_eq(&self.sequestered, &other.sequestered);
            let adjacent = self.ptr.wrapping_add(self.len) == other.ptr;

            if same_allocation && adjacent {
                self.len += other.len;
                Ok(())
            }
            else {
                Err(other)
            }
        }
    }

    impl fmt::Debug for Bytes {
        /// Formats the handle as its pointer and length; the bytes are not printed.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("Bytes")
                .field("ptr", &self.ptr)
                .field("len", &self.len)
                .finish()
        }
    }

    impl Deref for Bytes {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &[u8] {
            // SAFETY: relies on the type's invariant that `ptr` is valid for `len`
            // bytes, kept alive by `sequestered`, with no mutable reference to the
            // range while this handle exists.
            unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
        }
    }
}