Skip to main content

timely_bytes/
lib.rs

1//! A simplified implementation of the `bytes` crate, with different features, less safety.
2//!
3//! The crate is currently minimalist rather than maximalist, and for example does not support
4//! methods on `BytesMut` that seem like they should be safe, because they are not yet needed.
5//! For example, `BytesMut` should be able to implement `Send`, and `BytesMut::extract_to` could
6//! return a `BytesMut` rather than a `Bytes`.
7//!
8//! # Examples
9//!
10//! ```
11//! use timely_bytes::arc::BytesMut;
12//!
13//! let bytes = vec![0u8; 1024];
14//! let mut shared1 = BytesMut::from(bytes);
15//! let mut shared2 = shared1.extract_to(100);
16//! let mut shared3 = shared1.extract_to(100);
17//! let mut shared4 = shared2.extract_to(60);
18//!
19//! assert_eq!(shared1.len(), 824);
20//! assert_eq!(shared2.len(), 40);
21//! assert_eq!(shared3.len(), 100);
22//! assert_eq!(shared4.len(), 60);
23//!
24//! for byte in shared1.iter_mut() { *byte = 1u8; }
25//!
26//! // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
27//! shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
28//! shared2.try_merge(shared1.freeze()).ok().expect("Failed to merge 23 and 1");
29//! shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
30//!
31//! assert_eq!(shared4.len(), 1024);
32//! ```
33#![forbid(missing_docs)]
34
35/// An `Arc`-backed mutable byte slice backed by a common allocation.
pub mod arc {

    use std::ops::{Deref, DerefMut};
    use std::sync::Arc;
    use std::any::Any;

    /// A writable byte slice carved out of a shared allocation.
    ///
    /// An instance of this type contends that `ptr` is valid for `len` bytes,
    /// and that no other reference to these bytes exists, other than through
    /// the type currently held in `sequestered`.
    ///
    /// Unlike [`Bytes`], this type is neither `Send` nor `Sync`: its raw pointer
    /// field opts it out, and no `unsafe impl` opts it back in.
    pub struct BytesMut {
        /// Pointer to the start of this slice (not the allocation).
        ptr: *mut u8,
        /// Length of this slice.
        len: usize,
        /// Type-erased owner of the whole allocation (the `B` given to [`BytesMut::from`]).
        ///
        /// Holding this `Arc` keeps `ptr[0 .. len]` alive. Apart from its destructor, the
        /// payload is only accessed through `Arc::get_mut`, i.e. when this is the sole
        /// handle to the allocation. Every slice split off this one clones the same `Arc`,
        /// which is how [`Bytes::try_merge`] recognizes slices of a common allocation.
        sequestered: Arc<dyn Any>,
    }

    impl BytesMut {

        /// Create a new instance from a byte allocation.
        ///
        /// `bytes` is moved into an `Arc` and never moved again afterwards. Its bytes
        /// therefore keep a stable address for as long as any slice of them is alive.
        /// This holds both for heap-backed types (e.g. `Vec<u8>`) and for types that
        /// store bytes inline.
        ///
        /// `B: Send` is required because [`Bytes`] slices of it may be sent to, and
        /// dropped on, other threads.
        ///
        /// The slice is obtained from a single call to `B::deref_mut`. `B::deref` is
        /// not consulted, so a `Deref` that disagrees with `DerefMut` cannot make this
        /// instance claim more bytes than it may access.
        pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+Send+'static {
            // Move into the `Arc` *before* taking a pointer: the `Arc` allocation is where
            // `bytes` lives from now on, so pointers into inline storage stay valid.
            let mut owner: Arc<B> = Arc::new(bytes);
            let (ptr, len) = {
                // Pointer and length must describe the *same* slice. Reading the length
                // through `Deref` would trust user code to agree with `DerefMut`.
                let slice: &mut [u8] = Arc::get_mut(&mut owner)
                    .expect("a freshly created Arc is uniquely owned")
                    .deref_mut();
                (slice.as_mut_ptr(), slice.len())
            };
            // Unsizing coercion only: same allocation, so `ptr` remains valid.
            let sequestered: Arc<dyn Any> = owner;

            BytesMut {
                ptr,
                len,
                sequestered,
            }
        }

        /// Extracts `[0, index)` into a new `Bytes` which is returned, updating `self`
        /// to cover the remaining `[index, len)`.
        ///
        /// The two resulting slices are disjoint and both lie within the range `self`
        /// covered before the call. As a result, `self` can no longer reach the bytes
        /// it hands out.
        ///
        /// # Panics
        ///
        /// Panics if `index > self.len()`.
        pub fn extract_to(&mut self, index: usize) -> Bytes {

            assert!(index <= self.len);

            let result = BytesMut {
                ptr: self.ptr,
                len: index,
                sequestered: Arc::clone(&self.sequestered),
            };

            // `index <= len`, so the new start is within (or one past the end of) the slice.
            self.ptr = self.ptr.wrapping_add(index);
            self.len -= index;

            result.freeze()
        }

        /// Regenerates the BytesMut if it is uniquely held.
        ///
        /// Return values:
        /// - `Some(true)`: the allocation is uniquely held by `self`. The slice is reset
        ///   to the full slice produced by `B::deref_mut`.
        /// - `Some(false)`: other slices of the allocation are still alive. `self` is
        ///   unchanged, and the type `B` is not checked.
        /// - `None`: `self` is the sole handle, but the sequestered allocation is not a
        ///   `B`. `self` is unchanged.
        ///
        /// As in [`BytesMut::from`], pointer and length both come from one `&mut [u8]`.
        ///
        /// # Examples
        ///
        /// ```
        /// use timely_bytes::arc::BytesMut;
        ///
        /// let bytes = vec![0u8; 1024];
        /// let mut shared1 = BytesMut::from(bytes);
        /// let mut shared2 = shared1.extract_to(100);
        /// let mut shared3 = shared1.extract_to(100);
        /// let mut shared4 = shared2.extract_to(60);
        ///
        /// drop(shared3);
        /// drop(shared2);
        /// drop(shared4);
        /// assert_eq!(shared1.try_regenerate::<Vec<u8>>(), Some(true));
        /// assert!(shared1.len() == 1024);
        /// ```
        pub fn try_regenerate<B>(&mut self) -> Option<bool> where B: DerefMut<Target=[u8]>+'static {
            // `get_mut` succeeds only if no other `Bytes`/`BytesMut` shares the allocation.
            match Arc::get_mut(&mut self.sequestered) {
                Some(erased) => {
                    let slice: &mut [u8] = erased.downcast_mut::<B>()?.deref_mut();
                    self.ptr = slice.as_mut_ptr();
                    self.len = slice.len();
                    Some(true)
                }
                None => Some(false),
            }
        }

        /// Converts a writeable byte slice to a shareable byte slice.
        #[inline(always)]
        pub fn freeze(self) -> Bytes {
            Bytes {
                ptr: self.ptr,
                len: self.len,
                sequestered: self.sequestered,
            }
        }
    }

    impl Deref for BytesMut {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &[u8] {
            // SAFETY: `ptr` is valid for `len` bytes (established in `from`/`try_regenerate`
            // and only shrunk by `extract_to`), kept alive by `sequestered`, and no other
            // handle covers these bytes.
            unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
        }
    }

    impl DerefMut for BytesMut {
        #[inline(always)]
        fn deref_mut(&mut self) -> &mut [u8] {
            // SAFETY: as for `deref`. In addition, `&mut self` guarantees exclusive access
            // to the only handle covering these bytes.
            unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
        }
    }


    /// A thread-safe, read-only byte slice of a shared allocation.
    ///
    /// An instance of this type contends that `ptr` is valid for `len` bytes,
    /// and that no other mutable reference to these bytes exists, other than
    /// through the type currently held in `sequestered`.
    #[derive(Clone)]
    pub struct Bytes {
        /// Pointer to the start of this slice (not the allocation).
        ptr: *const u8,
        /// Length of this slice.
        len: usize,
        /// Type-erased owner of the whole allocation; keeps `ptr[0 .. len]` alive.
        ///
        /// Clones and slices of the same allocation share this `Arc`, which is what
        /// [`Bytes::try_merge`] compares to decide whether two slices may be joined.
        sequestered: Arc<dyn Any>,
    }

    // `Bytes` only ever reads its range. That range is disjoint from every live
    // `BytesMut`, so moving a `Bytes` to another thread cannot race with a writer.
    // The range stays valid because the `Arc` in `sequestered` keeps the allocation
    // alive, whichever thread holds it. The payload `B` may be dropped on the receiving
    // thread, which is why `BytesMut::from` requires `B: Send`.
    unsafe impl Send for Bytes { }

    // `Sync` holds because everything reachable through `&Bytes` is read-only or atomic:
    // `Deref` yields `&[u8]` (and `u8: Sync`), the mutating methods take `&mut self`, and
    // cloning only touches the atomic `Arc` refcount. There is no interior mutability and
    // no path to a `&mut` from a shared reference.
    //
    // Note this requires only that the sequestered payload `B` be `Send` (enforced by
    // `BytesMut::from`), not `Sync`. `B` itself is only accessed in two ways:
    // - through `Arc::get_mut`, which requires that no other handle exists on any thread;
    // - by its destructor, which runs on whichever thread drops the last `Arc` clone.
    // Neither shares `B` across threads concurrently, and the destructor needs `Send`,
    // not `Sync`.
    unsafe impl Sync for Bytes { }

    impl Bytes {

        /// Extracts `[0, index)` into a new `Bytes` which is returned, updating `self`
        /// to cover the remaining `[index, len)`.
        ///
        /// Both resulting slices lie within the range `self` covered before the call.
        ///
        /// # Panics
        ///
        /// Panics if `index > self.len()`.
        pub fn extract_to(&mut self, index: usize) -> Bytes {

            assert!(index <= self.len);

            let result = Bytes {
                ptr: self.ptr,
                len: index,
                sequestered: Arc::clone(&self.sequestered),
            };

            // `index <= len`, so the new start is within (or one past the end of) the slice.
            self.ptr = self.ptr.wrapping_add(index);
            self.len -= index;

            result
        }

        /// Attempts to merge adjacent slices from the same allocation.
        ///
        /// The merge succeeds only if `other` shares `self`'s allocation and begins
        /// exactly where `self` ends. In that case `other.len` is added to `self` and
        /// the result is `Ok(())`. Otherwise `self` is unmodified and the result is
        /// `Err(other)`, returning the bytes supplied as input.
        ///
        /// # Examples
        ///
        /// ```
        /// use timely_bytes::arc::BytesMut;
        ///
        /// let bytes = vec![0u8; 1024];
        /// let mut shared1 = BytesMut::from(bytes).freeze();
        /// let mut shared2 = shared1.extract_to(100);
        /// let mut shared3 = shared1.extract_to(100);
        /// let mut shared4 = shared2.extract_to(60);
        ///
        /// // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
        /// shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
        /// shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
        /// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
        /// ```
        pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
            // All handles to one allocation are clones of the single `Arc` created in
            // `BytesMut::from`, so `ptr_eq` identifies the allocation exactly.
            let same_allocation = Arc::ptr_eq(&self.sequestered, &other.sequestered);
            let adjacent = ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr);
            if same_allocation && adjacent {
                self.len += other.len;
                Ok(())
            }
            else {
                Err(other)
            }
        }
    }

    impl Deref for Bytes {
        type Target = [u8];
        #[inline(always)]
        fn deref(&self) -> &[u8] {
            // SAFETY: `ptr` is valid for `len` bytes, kept alive by `sequestered`. Merges
            // only join adjacent ranges of the same allocation, so the range never covers
            // bytes owned by a live `BytesMut`.
            unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
        }
    }
}