timely_container/lib.rs
1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5use std::collections::VecDeque;
6
7/// A container transferring data through dataflow edges
8///
9/// A container stores a number of elements and thus is able to describe it length (`len()`) and
10/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
11///
12/// A container must implement default. The default implementation is not required to allocate
13/// memory for variable-length components.
14///
15/// We require the container to be cloneable to enable efficient copies when providing references
16/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17/// is efficient (which is not necessarily the case when deriving `Clone`.)
18pub trait Container: Default {
19 /// The type of elements when reading non-destructively from the container.
20 type ItemRef<'a> where Self: 'a;
21
22 /// The type of elements when draining the container.
23 type Item<'a> where Self: 'a;
24
25 /// Push `item` into self
26 #[inline]
27 fn push<T>(&mut self, item: T) where Self: PushInto<T> {
28 self.push_into(item)
29 }
30
31 /// The number of elements in this container
32 ///
33 /// This number is used in progress tracking to confirm the receipt of some number
34 /// of outstanding records, and it is highly load bearing. The main restriction is
35 /// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
36 /// must preserve the number of items.
37 fn len(&self) -> usize;
38
39 /// Determine if the container contains any elements, corresponding to `len() == 0`.
40 fn is_empty(&self) -> bool {
41 self.len() == 0
42 }
43
44 /// Remove all contents from `self` while retaining allocated memory.
45 /// After calling `clear`, `is_empty` must return `true` and `len` 0.
46 fn clear(&mut self);
47
48 /// Iterator type when reading from the container.
49 type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
50
51 /// Returns an iterator that reads the contents of this container.
52 fn iter(&self) -> Self::Iter<'_>;
53
54 /// Iterator type when draining the container.
55 type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
56
57 /// Returns an iterator that drains the contents of this container.
58 /// Drain leaves the container in an undefined state.
59 fn drain(&mut self) -> Self::DrainIter<'_>;
60}
61
62/// A container that can be sized and reveals its capacity.
63pub trait SizableContainer: Container {
64 /// Indicates that the container is "full" and should be shipped.
65 fn at_capacity(&self) -> bool;
66 /// Restores `self` to its desired capacity, if it has one.
67 ///
68 /// The `stash` argument is available, and may have the intended capacity.
69 /// However, it may be non-empty, and may be of the wrong capacity. The
70 /// method should guard against these cases.
71 fn ensure_capacity(&mut self, stash: &mut Option<Self>);
72}
73
/// Something that can absorb values of type `T`.
///
/// Containers and container builders implement this once per item type they accept; a single
/// type may implement it for several `T`, e.g. for owned values as well as references.
pub trait PushInto<T> {
    /// Adds `item` to `self`.
    fn push_into(&mut self, item: T);
}
79
80/// A type that can build containers from items.
81///
82/// An implementation needs to absorb elements, and later reveal equivalent information
83/// chunked into individual containers, but is free to change the data representation to
84/// better fit the properties of the container.
85///
86/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
87/// that users can push the expected item types.
88///
89/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
90/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
91/// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions
92/// to drain pending or finished data.
93///
94/// The caller should consume the containers returned by [`Self::extract`] and
95/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
96/// any remaining elements.
97///
98/// For example, a consolidating builder can aggregate differences in-place, but it has
99/// to ensure that it preserves the intended information.
100///
101/// The trait does not prescribe any specific ordering guarantees, and each implementation can
102/// decide to represent a push order for `extract` and `finish`, or not.
103pub trait ContainerBuilder: Default + 'static {
104 /// The container type we're building.
105 type Container: Container + Clone + 'static;
106 /// Extract assembled containers, potentially leaving unfinished data behind. Can
107 /// be called repeatedly, for example while the caller can send data.
108 ///
109 /// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
110 #[must_use]
111 fn extract(&mut self) -> Option<&mut Self::Container>;
112 /// Extract assembled containers and any unfinished data. Should
113 /// be called repeatedly until it returns `None`.
114 #[must_use]
115 fn finish(&mut self) -> Option<&mut Self::Container>;
116 /// Partitions `container` among `builders`, using the function `index` to direct items.
117 fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
118 where
119 Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
120 I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
121 {
122 for datum in container.drain() {
123 let index = index(&datum);
124 builders[index].push_into(datum);
125 }
126 container.clear();
127 }
128}
129
130/// A wrapper trait indicating that the container building will preserve the number of records.
131///
132/// Specifically, the sum of lengths of all extracted and finished containers must equal the
133/// number of times that `push_into` is called on the container builder.
134/// If you have any questions about this trait you are best off not implementing it.
135pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
136
/// The default container builder, which chunks data by container length and capacity.
///
/// Pushed items accumulate in one container until it reports being full; full containers are
/// then queued and handed out in FIFO order. A single spare allocation is kept between
/// [`Self::push_into`] and [`Self::extract`] so it can be reused. [`Self::finish`] does not
/// keep it around, which keeps the memory footprint low.
#[derive(Default, Debug)]
pub struct CapacityContainerBuilder<C> {
    /// The container currently receiving pushed items.
    current: C,
    /// A spare allocation, reused the next time `current` needs capacity.
    empty: Option<C>,
    /// Completed containers waiting to be handed out, oldest first.
    pending: VecDeque<C>,
}
152
153impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
154 #[inline]
155 fn push_into(&mut self, item: T) {
156 // Ensure capacity
157 self.current.ensure_capacity(&mut self.empty);
158
159 // Push item
160 self.current.push(item);
161
162 // Maybe flush
163 if self.current.at_capacity() {
164 self.pending.push_back(std::mem::take(&mut self.current));
165 }
166 }
167}
168
169impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
170 type Container = C;
171
172 #[inline]
173 fn extract(&mut self) -> Option<&mut C> {
174 if let Some(container) = self.pending.pop_front() {
175 self.empty = Some(container);
176 self.empty.as_mut()
177 } else {
178 None
179 }
180 }
181
182 #[inline]
183 fn finish(&mut self) -> Option<&mut C> {
184 if !self.current.is_empty() {
185 self.pending.push_back(std::mem::take(&mut self.current));
186 }
187 self.empty = self.pending.pop_front();
188 self.empty.as_mut()
189 }
190}
191
192impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
193
194impl<T> Container for Vec<T> {
195 type ItemRef<'a> = &'a T where T: 'a;
196 type Item<'a> = T where T: 'a;
197
198 fn len(&self) -> usize {
199 Vec::len(self)
200 }
201
202 fn is_empty(&self) -> bool {
203 Vec::is_empty(self)
204 }
205
206 fn clear(&mut self) { Vec::clear(self) }
207
208 type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
209
210 fn iter(&self) -> Self::Iter<'_> {
211 self.as_slice().iter()
212 }
213
214 type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
215
216 fn drain(&mut self) -> Self::DrainIter<'_> {
217 self.drain(..)
218 }
219}
220
221impl<T> SizableContainer for Vec<T> {
222 fn at_capacity(&self) -> bool {
223 self.len() == self.capacity()
224 }
225 fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
226 if self.capacity() == 0 {
227 *self = stash.take().unwrap_or_default();
228 self.clear();
229 }
230 let preferred = buffer::default_capacity::<T>();
231 if self.capacity() < preferred {
232 self.reserve(preferred - self.capacity());
233 }
234 }
235}
236
237impl<T> PushInto<T> for Vec<T> {
238 #[inline]
239 fn push_into(&mut self, item: T) {
240 self.push(item)
241 }
242}
243
244
245impl<T: Clone> PushInto<&T> for Vec<T> {
246 #[inline]
247 fn push_into(&mut self, item: &T) {
248 self.push(item.clone())
249 }
250}
251
252impl<T: Clone> PushInto<&&T> for Vec<T> {
253 #[inline]
254 fn push_into(&mut self, item: &&T) {
255 self.push_into(*item)
256 }
257}
258
259mod rc {
260 use std::ops::Deref;
261 use std::rc::Rc;
262
263 use crate::Container;
264
265 impl<T: Container> Container for Rc<T> {
266 type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
267 type Item<'a> = T::ItemRef<'a> where Self: 'a;
268
269 fn len(&self) -> usize {
270 std::ops::Deref::deref(self).len()
271 }
272
273 fn is_empty(&self) -> bool {
274 std::ops::Deref::deref(self).is_empty()
275 }
276
277 fn clear(&mut self) {
278 // Try to reuse the allocation if possible
279 if let Some(inner) = Rc::get_mut(self) {
280 inner.clear();
281 } else {
282 *self = Self::default();
283 }
284 }
285
286 type Iter<'a> = T::Iter<'a> where Self: 'a;
287
288 fn iter(&self) -> Self::Iter<'_> {
289 self.deref().iter()
290 }
291
292 type DrainIter<'a> = T::Iter<'a> where Self: 'a;
293
294 fn drain(&mut self) -> Self::DrainIter<'_> {
295 self.iter()
296 }
297 }
298}
299
300mod arc {
301 use std::ops::Deref;
302 use std::sync::Arc;
303
304 use crate::Container;
305
306 impl<T: Container> Container for Arc<T> {
307 type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
308 type Item<'a> = T::ItemRef<'a> where Self: 'a;
309
310 fn len(&self) -> usize {
311 std::ops::Deref::deref(self).len()
312 }
313
314 fn is_empty(&self) -> bool {
315 std::ops::Deref::deref(self).is_empty()
316 }
317
318 fn clear(&mut self) {
319 // Try to reuse the allocation if possible
320 if let Some(inner) = Arc::get_mut(self) {
321 inner.clear();
322 } else {
323 *self = Self::default();
324 }
325 }
326
327 type Iter<'a> = T::Iter<'a> where Self: 'a;
328
329 fn iter(&self) -> Self::Iter<'_> {
330 self.deref().iter()
331 }
332
333 type DrainIter<'a> = T::Iter<'a> where Self: 'a;
334
335 fn drain(&mut self) -> Self::DrainIter<'_> {
336 self.iter()
337 }
338 }
339}
340
pub mod buffer {
    //! Functionality related to calculating default buffer sizes

    /// Upper bound, in bytes, on the buffers allocated by default.
    ///
    /// [default_capacity] translates this byte budget into a number of elements.
    pub const BUFFER_SIZE_BYTES: usize = 1 << 13;

    /// The preferred buffer capacity, in elements, for values of type `T`.
    ///
    /// Zero-sized types get [BUFFER_SIZE_BYTES] elements; types larger than the byte budget get
    /// a single element. The result therefore always lies in `1..=BUFFER_SIZE_BYTES`.
    pub const fn default_capacity<T>() -> usize {
        match std::mem::size_of::<T>() {
            0 => BUFFER_SIZE_BYTES,
            size if size <= BUFFER_SIZE_BYTES => BUFFER_SIZE_BYTES / size,
            _ => 1,
        }
    }
}