timely_container/
lib.rs

1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5use std::collections::VecDeque;
6
7/// A container transferring data through dataflow edges
8///
9/// A container stores a number of elements and thus is able to describe it length (`len()`) and
10/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
11///
12/// A container must implement default. The default implementation is not required to allocate
13/// memory for variable-length components.
14///
15/// We require the container to be cloneable to enable efficient copies when providing references
16/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17/// is efficient (which is not necessarily the case when deriving `Clone`.)
18pub trait Container: Default {
19    /// The type of elements when reading non-destructively from the container.
20    type ItemRef<'a> where Self: 'a;
21
22    /// The type of elements when draining the container.
23    type Item<'a> where Self: 'a;
24
25    /// Push `item` into self
26    #[inline]
27    fn push<T>(&mut self, item: T) where Self: PushInto<T> {
28        self.push_into(item)
29    }
30
31    /// The number of elements in this container
32    ///
33    /// This number is used in progress tracking to confirm the receipt of some number
34    /// of outstanding records, and it is highly load bearing. The main restriction is
35    /// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
36    /// must preserve the number of items.
37    fn len(&self) -> usize;
38
39    /// Determine if the container contains any elements, corresponding to `len() == 0`.
40    fn is_empty(&self) -> bool {
41        self.len() == 0
42    }
43
44    /// Remove all contents from `self` while retaining allocated memory.
45    /// After calling `clear`, `is_empty` must return `true` and `len` 0.
46    fn clear(&mut self);
47
48    /// Iterator type when reading from the container.
49    type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
50
51    /// Returns an iterator that reads the contents of this container.
52    fn iter(&self) -> Self::Iter<'_>;
53
54    /// Iterator type when draining the container.
55    type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
56
57    /// Returns an iterator that drains the contents of this container.
58    /// Drain leaves the container in an undefined state.
59    fn drain(&mut self) -> Self::DrainIter<'_>;
60}
61
62/// A container that can be sized and reveals its capacity.
63pub trait SizableContainer: Container {
64    /// Indicates that the container is "full" and should be shipped.
65    fn at_capacity(&self) -> bool;
66    /// Restores `self` to its desired capacity, if it has one.
67    ///
68    /// The `stash` argument is available, and may have the intended capacity.
69    /// However, it may be non-empty, and may be of the wrong capacity. The
70    /// method should guard against these cases.
71    fn ensure_capacity(&mut self, stash: &mut Option<Self>);
72}
73
/// A container that can absorb items of a specific type.
///
/// A single type may implement this trait for several item types `T`.
pub trait PushInto<T> {
    /// Pushes `item` into `self`.
    fn push_into(&mut self, item: T);
}
79
80/// A type that can build containers from items.
81///
82/// An implementation needs to absorb elements, and later reveal equivalent information
83/// chunked into individual containers, but is free to change the data representation to
84/// better fit the properties of the container.
85///
86/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
87/// that users can push the expected item types.
88///
89/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
90/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
91/// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions
92/// to drain pending or finished data.
93///
94/// The caller should consume the containers returned by [`Self::extract`] and
95/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
96/// any remaining elements.
97///
98/// For example, a consolidating builder can aggregate differences in-place, but it has
99/// to ensure that it preserves the intended information.
100///
101/// The trait does not prescribe any specific ordering guarantees, and each implementation can
102/// decide to represent a push order for `extract` and `finish`, or not.
103pub trait ContainerBuilder: Default + 'static {
104    /// The container type we're building.
105    type Container: Container + Clone + 'static;
106    /// Extract assembled containers, potentially leaving unfinished data behind. Can
107    /// be called repeatedly, for example while the caller can send data.
108    ///
109    /// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
110    #[must_use]
111    fn extract(&mut self) -> Option<&mut Self::Container>;
112    /// Extract assembled containers and any unfinished data. Should
113    /// be called repeatedly until it returns `None`.
114    #[must_use]
115    fn finish(&mut self) -> Option<&mut Self::Container>;
116    /// Partitions `container` among `builders`, using the function `index` to direct items.
117    fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
118    where
119        Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
120        I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
121    {
122        for datum in container.drain() {
123            let index = index(&datum);
124            builders[index].push_into(datum);
125        }
126        container.clear();
127    }
128
129    /// Indicates a good moment to release resources.
130    ///
131    /// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`]
132    /// before calling this function. The implementation should not change the contents of the
133    /// builder.
134    #[inline]
135    fn relax(&mut self) { }
136}
137
138/// A wrapper trait indicating that the container building will preserve the number of records.
139///
140/// Specifically, the sum of lengths of all extracted and finished containers must equal the
141/// number of times that `push_into` is called on the container builder.
142/// If you have any questions about this trait you are best off not implementing it.
143pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
144
/// A default container builder that uses length and preferred capacity to chunk data.
///
/// Keeps a single empty allocation around between [`Self::push_into`] and
/// [`Self::extract`]. The allocation is not kept across [`Self::finish`], which keeps the
/// memory footprint low.
///
/// Maintains FIFO order.
#[derive(Default, Debug)]
pub struct CapacityContainerBuilder<C> {
    /// The container that pushed items are currently written to.
    current: C,
    /// A spare container, offered to `current` as a stash when it needs capacity.
    empty: Option<C>,
    /// Completed containers waiting to be sent, oldest first.
    pending: VecDeque<C>,
}
160
161impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
162    #[inline]
163    fn push_into(&mut self, item: T) {
164        // Ensure capacity
165        self.current.ensure_capacity(&mut self.empty);
166
167        // Push item
168        self.current.push(item);
169
170        // Maybe flush
171        if self.current.at_capacity() {
172            self.pending.push_back(std::mem::take(&mut self.current));
173        }
174    }
175}
176
177impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
178    type Container = C;
179
180    #[inline]
181    fn extract(&mut self) -> Option<&mut C> {
182        if let Some(container) = self.pending.pop_front() {
183            self.empty = Some(container);
184            self.empty.as_mut()
185        } else {
186            None
187        }
188    }
189
190    #[inline]
191    fn finish(&mut self) -> Option<&mut C> {
192        if !self.current.is_empty() {
193            self.pending.push_back(std::mem::take(&mut self.current));
194        }
195        self.empty = self.pending.pop_front();
196        self.empty.as_mut()
197    }
198}
199
200impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
201
202impl<T> Container for Vec<T> {
203    type ItemRef<'a> = &'a T where T: 'a;
204    type Item<'a> = T where T: 'a;
205
206    fn len(&self) -> usize {
207        Vec::len(self)
208    }
209
210    fn is_empty(&self) -> bool {
211        Vec::is_empty(self)
212    }
213
214    fn clear(&mut self) { Vec::clear(self) }
215
216    type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
217
218    fn iter(&self) -> Self::Iter<'_> {
219        self.as_slice().iter()
220    }
221
222    type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
223
224    fn drain(&mut self) -> Self::DrainIter<'_> {
225        self.drain(..)
226    }
227}
228
229impl<T> SizableContainer for Vec<T> {
230    fn at_capacity(&self) -> bool {
231        self.len() == self.capacity()
232    }
233    fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
234        if self.capacity() == 0 {
235            *self = stash.take().unwrap_or_default();
236            self.clear();
237        }
238        let preferred = buffer::default_capacity::<T>();
239        if self.capacity() < preferred {
240            self.reserve(preferred - self.capacity());
241        }
242    }
243}
244
245impl<T> PushInto<T> for Vec<T> {
246    #[inline]
247    fn push_into(&mut self, item: T) {
248        self.push(item)
249    }
250}
251
252
253impl<T: Clone> PushInto<&T> for Vec<T> {
254    #[inline]
255    fn push_into(&mut self, item: &T) {
256        self.push(item.clone())
257    }
258}
259
260impl<T: Clone> PushInto<&&T> for Vec<T> {
261    #[inline]
262    fn push_into(&mut self, item: &&T) {
263        self.push_into(*item)
264    }
265}
266
267mod rc {
268    use std::ops::Deref;
269    use std::rc::Rc;
270
271    use crate::Container;
272
273    impl<T: Container> Container for Rc<T> {
274        type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
275        type Item<'a> = T::ItemRef<'a> where Self: 'a;
276
277        fn len(&self) -> usize {
278            std::ops::Deref::deref(self).len()
279        }
280
281        fn is_empty(&self) -> bool {
282            std::ops::Deref::deref(self).is_empty()
283        }
284
285        fn clear(&mut self) {
286            // Try to reuse the allocation if possible
287            if let Some(inner) = Rc::get_mut(self) {
288                inner.clear();
289            } else {
290                *self = Self::default();
291            }
292        }
293
294        type Iter<'a> = T::Iter<'a> where Self: 'a;
295
296        fn iter(&self) -> Self::Iter<'_> {
297            self.deref().iter()
298        }
299
300        type DrainIter<'a> = T::Iter<'a> where Self: 'a;
301
302        fn drain(&mut self) -> Self::DrainIter<'_> {
303            self.iter()
304        }
305    }
306}
307
308mod arc {
309    use std::ops::Deref;
310    use std::sync::Arc;
311
312    use crate::Container;
313
314    impl<T: Container> Container for Arc<T> {
315        type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
316        type Item<'a> = T::ItemRef<'a> where Self: 'a;
317
318        fn len(&self) -> usize {
319            std::ops::Deref::deref(self).len()
320        }
321
322        fn is_empty(&self) -> bool {
323            std::ops::Deref::deref(self).is_empty()
324        }
325
326        fn clear(&mut self) {
327            // Try to reuse the allocation if possible
328            if let Some(inner) = Arc::get_mut(self) {
329                inner.clear();
330            } else {
331                *self = Self::default();
332            }
333        }
334
335        type Iter<'a> = T::Iter<'a> where Self: 'a;
336
337        fn iter(&self) -> Self::Iter<'_> {
338            self.deref().iter()
339        }
340
341        type DrainIter<'a> = T::Iter<'a> where Self: 'a;
342
343        fn drain(&mut self) -> Self::DrainIter<'_> {
344            self.iter()
345        }
346    }
347}
348
pub mod buffer {
    //! Functionality related to calculating default buffer sizes

    /// The upper limit for buffers to allocate, size in bytes. [default_capacity] converts
    /// this to size in elements.
    pub const BUFFER_SIZE_BYTES: usize = 1 << 13;

    /// The maximum buffer capacity in elements, between 1 and [BUFFER_SIZE_BYTES] inclusive.
    ///
    /// Zero-sized types get [BUFFER_SIZE_BYTES] elements. Types that fit into the byte
    /// budget get as many whole elements as fit. Larger types get a single element.
    pub const fn default_capacity<T>() -> usize {
        match std::mem::size_of::<T>() {
            // Avoid dividing by zero for zero-sized types.
            0 => BUFFER_SIZE_BYTES,
            bytes if bytes <= BUFFER_SIZE_BYTES => BUFFER_SIZE_BYTES / bytes,
            _ => 1,
        }
    }
}