timely_container/
lib.rs

1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5use std::collections::VecDeque;
6
/// A type containing a number of records accounted for by progress tracking.
///
/// The object stores a number of records and is thus able to report their count
/// ([`Accountable::record_count`]) and whether it holds none at all
/// ([`Accountable::is_empty`]). It is empty exactly when the record count is zero.
pub trait Accountable {
    /// The number of records.
    ///
    /// This number is used in progress tracking to confirm the receipt of some number
    /// of outstanding records, and it is highly load bearing. The main restriction is
    /// imposed on the [`LengthPreservingContainerBuilder`] trait, whose implementors
    /// must preserve the number of records.
    fn record_count(&self) -> i64;

    /// Returns `true` if this holds no records, i.e. when `record_count() == 0`.
    ///
    /// Implementations may override this with a cheaper check, but it is a correctness
    /// error for it to return anything other than `self.record_count() == 0`.
    #[inline]
    fn is_empty(&self) -> bool {
        self.record_count() == 0
    }
}
25
/// Read-only iteration over a container, comparable in spirit to iterating a
/// borrowed collection through [`IntoIterator`].
///
/// The order in which items are yielded is up to the implementation. Iterating
/// never modifies the container.
pub trait IterContainer {
    /// Element type yielded when reading from the container without consuming it.
    type ItemRef<'a>
    where
        Self: 'a;

    /// Concrete iterator type returned by [`IterContainer::iter`].
    type Iter<'a>: Iterator<Item = Self::ItemRef<'a>>
    where
        Self: 'a;

    /// Borrows the container and iterates over its contents.
    fn iter(&self) -> Self::Iter<'_>;
}
38
/// A container that can drain itself.
///
/// Draining the container presents items in an implementation-specific order.
/// After calling [`Self::drain`] the container is left in an unspecified state, and
/// the same holds if the returned iterator is dropped before it is exhausted; callers
/// must not rely on the container's contents afterwards.
pub trait DrainContainer {
    /// The type of elements when draining the container.
    type Item<'a> where Self: 'a;
    /// Iterator type when draining the container.
    type DrainIter<'a>: Iterator<Item = Self::Item<'a>> where Self: 'a;
    /// Returns an iterator that drains the contents of this container.
    ///
    /// The container is left in an unspecified state. Some implementations remove the
    /// items (e.g. `Vec`), while shared containers such as `Rc`/`Arc` only hand out
    /// references and leave their contents in place.
    fn drain(&mut self) -> Self::DrainIter<'_>;
}
53
/// A container that knows when it is full and can restore its preferred size.
pub trait SizableContainer {
    /// Returns `true` once the container is "full" and ought to be shipped.
    fn at_capacity(&self) -> bool;

    /// Brings `self` back to its desired capacity, if it has one.
    ///
    /// `stash` may offer a spare container whose allocation can be reused, possibly
    /// already at the intended capacity. Nothing is guaranteed about it: it may still
    /// hold data and its capacity may be wrong. Implementations must guard against
    /// both, treating the stash as being in an unspecified state and clearing it
    /// before reuse.
    fn ensure_capacity(&mut self, stash: &mut Option<Self>)
    where
        Self: Sized;
}
68
/// A container that accepts items of type `T`.
pub trait PushInto<T> {
    /// Adds `item` to `self`.
    fn push_into(&mut self, item: T);
}
74
75/// A type that can build containers from items.
76///
77/// An implementation needs to absorb elements, and later reveal equivalent information
78/// chunked into individual containers, but is free to change the data representation to
79/// better fit the properties of the container.
80///
81/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
82/// that users can push the expected item types.
83///
84/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
85/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
86/// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions
87/// to drain pending or finished data.
88///
89/// The caller should consume the containers returned by [`Self::extract`] and
90/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
91/// any remaining elements.
92///
93/// Implementations are allowed to re-use the contents of the mutable references left by the caller,
94/// but they should ensure that they clear the contents before doing so.
95///
96/// For example, a consolidating builder can aggregate differences in-place, but it has
97/// to ensure that it preserves the intended information.
98///
99/// The trait does not prescribe any specific ordering guarantees, and each implementation can
100/// decide to represent a push order for `extract` and `finish`, or not.
101pub trait ContainerBuilder: Default + 'static {
102    /// The container type we're building.
103    // The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
104    // all over Timely. `'static` because we don't want lifetimes everywhere.
105    type Container: Accountable + Default + Clone + 'static;
106    /// Extract assembled containers, potentially leaving unfinished data behind. Can
107    /// be called repeatedly, for example while the caller can send data.
108    ///
109    /// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
110    #[must_use]
111    fn extract(&mut self) -> Option<&mut Self::Container>;
112    /// Extract assembled containers and any unfinished data. Should
113    /// be called repeatedly until it returns `None`.
114    #[must_use]
115    fn finish(&mut self) -> Option<&mut Self::Container>;
116    /// Indicates a good moment to release resources.
117    ///
118    /// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`]
119    /// before calling this function. The implementation should not change the contents of the
120    /// builder.
121    #[inline]
122    fn relax(&mut self) { }
123}
124
125/// A wrapper trait indicating that the container building will preserve the number of records.
126///
127/// Specifically, the sum of record counts of all extracted and finished containers must equal the
128/// number of accounted records that are pushed into the container builder.
129/// If you have any questions about this trait you are best off not implementing it.
130pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
131
/// The default [`ContainerBuilder`]: fills one container at a time and queues it for
/// shipping as soon as the container reports being at capacity.
///
/// A single spare allocation is retained between [`PushInto::push_into`] and
/// [`ContainerBuilder::extract`] so buffers can be recycled. [`ContainerBuilder::finish`]
/// lets go of it once nothing is pending, keeping the memory footprint low.
///
/// Containers are handed out in the order in which they were filled (FIFO).
#[derive(Debug, Default)]
pub struct CapacityContainerBuilder<C> {
    /// The container currently being filled.
    current: C,
    /// A spare container (typically one previously handed out) whose allocation can be reused.
    empty: Option<C>,
    /// Filled containers waiting to be handed out, oldest first.
    pending: VecDeque<C>,
}
147
148impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
149    #[inline]
150    fn push_into(&mut self, item: T) {
151        // Ensure capacity
152        self.current.ensure_capacity(&mut self.empty);
153
154        // Push item
155        self.current.push_into(item);
156
157        // Maybe flush
158        if self.current.at_capacity() {
159            self.pending.push_back(std::mem::take(&mut self.current));
160        }
161    }
162}
163
164impl<C: Accountable + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
165    type Container = C;
166
167    #[inline]
168    fn extract(&mut self) -> Option<&mut C> {
169        if let Some(container) = self.pending.pop_front() {
170            self.empty = Some(container);
171            self.empty.as_mut()
172        } else {
173            None
174        }
175    }
176
177    #[inline]
178    fn finish(&mut self) -> Option<&mut C> {
179        if !self.current.is_empty() {
180            self.pending.push_back(std::mem::take(&mut self.current));
181        }
182        self.empty = self.pending.pop_front();
183        self.empty.as_mut()
184    }
185}
186
187impl<C: Accountable + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
188
189impl<T> Accountable for Vec<T> {
190    #[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
191    #[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
192}
193
194impl<T> IterContainer for Vec<T> {
195    type ItemRef<'a> = &'a T where T: 'a;
196    type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
197    #[inline] fn iter(&self) -> Self::Iter<'_> {
198        self.as_slice().iter()
199    }
200}
201
202impl<T> DrainContainer for Vec<T> {
203    type Item<'a> = T where T: 'a;
204    type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
205    #[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
206        self.drain(..)
207    }
208}
209
210impl<T> SizableContainer for Vec<T> {
211    fn at_capacity(&self) -> bool {
212        self.len() == self.capacity()
213    }
214    fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
215        if self.capacity() == 0 {
216            *self = stash.take().unwrap_or_default();
217            self.clear();
218        }
219        let preferred = buffer::default_capacity::<T>();
220        if self.capacity() < preferred {
221            self.reserve(preferred - self.capacity());
222        }
223    }
224}
225
226impl<T> PushInto<T> for Vec<T> {
227    #[inline]
228    fn push_into(&mut self, item: T) {
229        self.push(item)
230    }
231}
232
233
234impl<T: Clone> PushInto<&T> for Vec<T> {
235    #[inline]
236    fn push_into(&mut self, item: &T) {
237        self.push(item.clone())
238    }
239}
240
241impl<T: Clone> PushInto<&&T> for Vec<T> {
242    #[inline]
243    fn push_into(&mut self, item: &&T) {
244        self.push_into(*item)
245    }
246}
247
248mod rc {
249    use std::ops::Deref;
250    use std::rc::Rc;
251
252    use crate::{IterContainer, DrainContainer};
253
254    impl<T: crate::Accountable> crate::Accountable for Rc<T> {
255        #[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
256        #[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
257    }
258    impl<T: IterContainer> IterContainer for Rc<T> {
259        type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
260        type Iter<'a> = T::Iter<'a> where Self: 'a;
261        #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
262    }
263    impl<T: IterContainer> DrainContainer for Rc<T> {
264        type Item<'a> = T::ItemRef<'a> where Self: 'a;
265        type DrainIter<'a> = T::Iter<'a> where Self: 'a;
266        #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
267    }
268}
269
270mod arc {
271    use std::ops::Deref;
272    use std::sync::Arc;
273
274    use crate::{IterContainer, DrainContainer};
275
276    impl<T: crate::Accountable> crate::Accountable for Arc<T> {
277        #[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
278        #[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
279    }
280    impl<T: IterContainer> IterContainer for Arc<T> {
281        type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
282        type Iter<'a> = T::Iter<'a> where Self: 'a;
283        #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
284    }
285    impl<T: IterContainer> DrainContainer for Arc<T> {
286        type Item<'a> = T::ItemRef<'a> where Self: 'a;
287        type DrainIter<'a> = T::Iter<'a> where Self: 'a;
288        #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
289    }
290}
291
pub mod buffer {
    //! Helpers for choosing default buffer sizes.

    /// Byte budget (8 KiB) for a single buffer allocation. [`default_capacity`] turns
    /// this into an element count for a particular type.
    pub const BUFFER_SIZE_BYTES: usize = 8 * 1024;

    /// Preferred number of elements of type `T` per buffer.
    ///
    /// The result always lies in `1..=BUFFER_SIZE_BYTES`:
    /// * zero-sized types get [`BUFFER_SIZE_BYTES`] elements,
    /// * types no larger than the budget get as many elements as fit in it,
    /// * types larger than the budget get a single element.
    pub const fn default_capacity<T>() -> usize {
        match std::mem::size_of::<T>() {
            0 => BUFFER_SIZE_BYTES,
            bytes if bytes <= BUFFER_SIZE_BYTES => BUFFER_SIZE_BYTES / bytes,
            _ => 1,
        }
    }
}