timely_container/lib.rs
1//! Specifications for containers
2
3#![forbid(missing_docs)]
4
5use std::collections::VecDeque;
6
/// A type containing a number of records accounted for by progress tracking.
///
/// The object stores a number of records and is thus able to describe its count
/// (`record_count()`) and whether it is empty (`is_empty()`). It is empty if the
/// record count is zero.
pub trait Accountable {
    /// The number of records.
    ///
    /// This number is used in progress tracking to confirm the receipt of some number
    /// of outstanding records, and it is highly load bearing. The main restriction is
    /// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
    /// must preserve the number of records.
    fn record_count(&self) -> i64;

    /// Determines whether this contains any records, corresponding to `record_count() == 0`.
    ///
    /// It is a correctness error for this to be anything other than `self.record_count() == 0`;
    /// implementors should only override it to provide a cheaper equivalent.
    #[inline]
    fn is_empty(&self) -> bool {
        self.record_count() == 0
    }
}
25
/// A container whose contents can be read by reference, morally equivalent to
/// [`IntoIterator`] for a shared reference to the container.
///
/// The order in which items are presented is up to the implementation. Reading through
/// the iterator leaves the container's contents unchanged.
pub trait IterContainer {
    /// Item type yielded when reading the container without consuming it.
    type ItemRef<'a>
    where
        Self: 'a;

    /// Iterator returned by [`Self::iter`], yielding [`Self::ItemRef`] values.
    type Iter<'a>: Iterator<Item = Self::ItemRef<'a>>
    where
        Self: 'a;

    /// Returns an iterator over the contents of this container, without modifying it.
    fn iter(&self) -> Self::Iter<'_>;
}
38
/// A container that can drain itself.
///
/// Draining the container presents items in an implementation-specific order.
/// The container is in an unspecified (but memory-safe) state after calling
/// [`Self::drain`]. Dropping the iterator early also leaves the container in an
/// unspecified state.
pub trait DrainContainer {
    /// The type of elements when draining the container.
    type Item<'a> where Self: 'a;
    /// Iterator type when draining the container.
    type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
    /// Returns an iterator that drains the contents of this container.
    ///
    /// Draining leaves the container in an unspecified state; callers that want to reuse
    /// it should clear it first.
    fn drain(&mut self) -> Self::DrainIter<'_>;
}
53
/// A container that knows when it is "full" and can restore its preferred capacity.
pub trait SizableContainer {
    /// Returns `true` once the container is "full" and should be shipped.
    fn at_capacity(&self) -> bool;

    /// Restores `self` to its desired capacity, if it has one.
    ///
    /// `stash` may hold a spare allocation with the intended capacity, but nothing about it
    /// is guaranteed: it can be `None`, non-empty, or of the wrong capacity, and
    /// implementations must cope with each case. Treat its contents as undefined and clear
    /// it before re-using it.
    fn ensure_capacity(&mut self, stash: &mut Option<Self>)
    where
        Self: Sized;
}
68
/// A container that can absorb items of type `T`.
pub trait PushInto<T> {
    /// Adds `item` to `self`.
    fn push_into(&mut self, item: T);
}
74
75/// A type that can build containers from items.
76///
77/// An implementation needs to absorb elements, and later reveal equivalent information
78/// chunked into individual containers, but is free to change the data representation to
79/// better fit the properties of the container.
80///
81/// Types implementing this trait should provide appropriate [`PushInto`] implementations such
82/// that users can push the expected item types.
83///
84/// The owner extracts data in two ways. The opportunistic [`Self::extract`] method returns
85/// any ready data, but doesn't need to produce partial outputs. In contrast, [`Self::finish`]
86/// needs to produce all outputs, even partial ones. Caller should repeatedly call the functions
87/// to drain pending or finished data.
88///
89/// The caller should consume the containers returned by [`Self::extract`] and
90/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
91/// any remaining elements.
92///
93/// Implementations are allowed to re-use the contents of the mutable references left by the caller,
94/// but they should ensure that they clear the contents before doing so.
95///
96/// For example, a consolidating builder can aggregate differences in-place, but it has
97/// to ensure that it preserves the intended information.
98///
99/// The trait does not prescribe any specific ordering guarantees, and each implementation can
100/// decide to represent a push order for `extract` and `finish`, or not.
101pub trait ContainerBuilder: Default + 'static {
102 /// The container type we're building.
103 // The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
104 // all over Timely. `'static` because we don't want lifetimes everywhere.
105 type Container: Accountable + Default + Clone + 'static;
106 /// Extract assembled containers, potentially leaving unfinished data behind. Can
107 /// be called repeatedly, for example while the caller can send data.
108 ///
109 /// Returns a `Some` if there is data ready to be shipped, and `None` otherwise.
110 #[must_use]
111 fn extract(&mut self) -> Option<&mut Self::Container>;
112 /// Extract assembled containers and any unfinished data. Should
113 /// be called repeatedly until it returns `None`.
114 #[must_use]
115 fn finish(&mut self) -> Option<&mut Self::Container>;
116 /// Indicates a good moment to release resources.
117 ///
118 /// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`]
119 /// before calling this function. The implementation should not change the contents of the
120 /// builder.
121 #[inline]
122 fn relax(&mut self) { }
123}
124
125/// A wrapper trait indicating that the container building will preserve the number of records.
126///
127/// Specifically, the sum of record counts of all extracted and finished containers must equal the
128/// number of accounted records that are pushed into the container builder.
129/// If you have any questions about this trait you are best off not implementing it.
130pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
131
/// The default container builder, which chunks data by length and preferred capacity.
///
/// Keeps at most one spare empty allocation between [`Self::push_into`] and
/// [`Self::extract`], but does not retain it across [`Self::finish`], to keep the memory
/// footprint low.
///
/// Containers are produced in FIFO order.
#[derive(Debug, Default)]
pub struct CapacityContainerBuilder<C> {
    /// The container currently receiving pushed items.
    current: C,
    /// A spare allocation available for reuse (its contents must be cleared first).
    empty: Option<C>,
    /// Full containers waiting to be handed out, oldest first.
    pending: VecDeque<C>,
}
147
148impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
149 #[inline]
150 fn push_into(&mut self, item: T) {
151 // Ensure capacity
152 self.current.ensure_capacity(&mut self.empty);
153
154 // Push item
155 self.current.push_into(item);
156
157 // Maybe flush
158 if self.current.at_capacity() {
159 self.pending.push_back(std::mem::take(&mut self.current));
160 }
161 }
162}
163
164impl<C: Accountable + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
165 type Container = C;
166
167 #[inline]
168 fn extract(&mut self) -> Option<&mut C> {
169 if let Some(container) = self.pending.pop_front() {
170 self.empty = Some(container);
171 self.empty.as_mut()
172 } else {
173 None
174 }
175 }
176
177 #[inline]
178 fn finish(&mut self) -> Option<&mut C> {
179 if !self.current.is_empty() {
180 self.pending.push_back(std::mem::take(&mut self.current));
181 }
182 self.empty = self.pending.pop_front();
183 self.empty.as_mut()
184 }
185}
186
187impl<C: Accountable + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
188
189impl<T> Accountable for Vec<T> {
190 #[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
191 #[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
192}
193
194impl<T> IterContainer for Vec<T> {
195 type ItemRef<'a> = &'a T where T: 'a;
196 type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
197 #[inline] fn iter(&self) -> Self::Iter<'_> {
198 self.as_slice().iter()
199 }
200}
201
202impl<T> DrainContainer for Vec<T> {
203 type Item<'a> = T where T: 'a;
204 type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
205 #[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
206 self.drain(..)
207 }
208}
209
210impl<T> SizableContainer for Vec<T> {
211 fn at_capacity(&self) -> bool {
212 self.len() == self.capacity()
213 }
214 fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
215 if self.capacity() == 0 {
216 *self = stash.take().unwrap_or_default();
217 self.clear();
218 }
219 let preferred = buffer::default_capacity::<T>();
220 if self.capacity() < preferred {
221 self.reserve(preferred - self.capacity());
222 }
223 }
224}
225
226impl<T> PushInto<T> for Vec<T> {
227 #[inline]
228 fn push_into(&mut self, item: T) {
229 self.push(item)
230 }
231}
232
233
234impl<T: Clone> PushInto<&T> for Vec<T> {
235 #[inline]
236 fn push_into(&mut self, item: &T) {
237 self.push(item.clone())
238 }
239}
240
241impl<T: Clone> PushInto<&&T> for Vec<T> {
242 #[inline]
243 fn push_into(&mut self, item: &&T) {
244 self.push_into(*item)
245 }
246}
247
248mod rc {
249 use std::ops::Deref;
250 use std::rc::Rc;
251
252 use crate::{IterContainer, DrainContainer};
253
254 impl<T: crate::Accountable> crate::Accountable for Rc<T> {
255 #[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
256 #[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
257 }
258 impl<T: IterContainer> IterContainer for Rc<T> {
259 type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
260 type Iter<'a> = T::Iter<'a> where Self: 'a;
261 #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
262 }
263 impl<T: IterContainer> DrainContainer for Rc<T> {
264 type Item<'a> = T::ItemRef<'a> where Self: 'a;
265 type DrainIter<'a> = T::Iter<'a> where Self: 'a;
266 #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
267 }
268}
269
270mod arc {
271 use std::ops::Deref;
272 use std::sync::Arc;
273
274 use crate::{IterContainer, DrainContainer};
275
276 impl<T: crate::Accountable> crate::Accountable for Arc<T> {
277 #[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
278 #[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
279 }
280 impl<T: IterContainer> IterContainer for Arc<T> {
281 type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
282 type Iter<'a> = T::Iter<'a> where Self: 'a;
283 #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
284 }
285 impl<T: IterContainer> DrainContainer for Arc<T> {
286 type Item<'a> = T::ItemRef<'a> where Self: 'a;
287 type DrainIter<'a> = T::Iter<'a> where Self: 'a;
288 #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
289 }
290}
291
pub mod buffer {
    //! Functionality related to calculating default buffer sizes

    /// Upper bound, in bytes, on the size of buffers to allocate (8 KiB).
    /// [default_capacity] translates it into a number of elements.
    pub const BUFFER_SIZE_BYTES: usize = 8 * 1024;

    /// The maximum buffer capacity in elements of `T`, always within
    /// `1..=BUFFER_SIZE_BYTES`.
    ///
    /// Zero-sized types get [BUFFER_SIZE_BYTES] elements; types larger than
    /// [BUFFER_SIZE_BYTES] bytes get a single element.
    pub const fn default_capacity<T>() -> usize {
        match std::mem::size_of::<T>() {
            0 => BUFFER_SIZE_BYTES,
            size if size <= BUFFER_SIZE_BYTES => BUFFER_SIZE_BYTES / size,
            _ => 1,
        }
    }
}