differential_dataflow/
containers.rs

1//! A columnar container based on the columnation library.
2
3use std::iter::FromIterator;
4
5pub use columnation::*;
6use timely::container::PushInto;
7
8/// An append-only vector that store records as columns.
9///
10/// This container maintains elements that might conventionally own
11/// memory allocations, but instead the pointers to those allocations
12/// reference larger regions of memory shared with multiple instances
13/// of the type. Elements can be retrieved as references, and care is
14/// taken when this type is dropped to ensure that the correct memory
15/// is returned (rather than the incorrect memory, from running the
16/// elements `Drop` implementations).
17pub struct TimelyStack<T: Columnation> {
18    local: Vec<T>,
19    inner: T::InnerRegion,
20}
21
22impl<T: Columnation> TimelyStack<T> {
23    /// Construct a [TimelyStack], reserving space for `capacity` elements
24    ///
25    /// Note that the associated region is not initialized to a specific capacity
26    /// because we can't generally know how much space would be required.
27    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            local: Vec::with_capacity(capacity),
30            inner: T::InnerRegion::default(),
31        }
32    }
33
34    /// Ensures `Self` can absorb `items` without further allocations.
35    ///
36    /// The argument `items` may be cloned and iterated multiple times.
37    /// Please be careful if it contains side effects.
38    #[inline(always)]
39    pub fn reserve_items<'a, I>(&mut self, items: I)
40    where
41        I: Iterator<Item= &'a T> + Clone,
42        T: 'a,
43    {
44        self.local.reserve(items.clone().count());
45        self.inner.reserve_items(items);
46    }
47
48    /// Ensures `Self` can absorb `items` without further allocations.
49    ///
50    /// The argument `items` may be cloned and iterated multiple times.
51    /// Please be careful if it contains side effects.
52    #[inline(always)]
53    pub fn reserve_regions<'a, I>(&mut self, regions: I)
54    where
55        Self: 'a,
56        I: Iterator<Item= &'a Self> + Clone,
57    {
58        self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum());
59        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
60    }
61
62
63
64    /// Copies an element in to the region.
65    ///
66    /// The element can be read by indexing
67    pub fn copy(&mut self, item: &T) {
68        // TODO: Some types `T` should just be cloned.
69        // E.g. types that are `Copy` or vecs of ZSTs.
70        unsafe {
71            self.local.push(self.inner.copy(item));
72        }
73    }
74    /// Empties the collection.
75    pub fn clear(&mut self) {
76        unsafe {
77            // Unsafety justified in that setting the length to zero exposes
78            // no invalid data.
79            self.local.set_len(0);
80            self.inner.clear();
81        }
82    }
83    /// Retain elements that pass a predicate, from a specified offset.
84    ///
85    /// This method may or may not reclaim memory in the inner region.
86    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
87        let mut write_position = index;
88        for position in index..self.local.len() {
89            if predicate(&self[position]) {
90                // TODO: compact the inner region and update pointers.
91                self.local.swap(position, write_position);
92                write_position += 1;
93            }
94        }
95        unsafe {
96            // Unsafety justified in that `write_position` is no greater than
97            // `self.local.len()` and so this exposes no invalid data.
98            self.local.set_len(write_position);
99        }
100    }
101
102    /// Unsafe access to `local` data. The slices stor data that is backed by a region
103    /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice.
104    ///
105    /// # Safety
106    /// Elements within `local` can be reordered, but not mutated, removed and/or dropped.
107    pub unsafe fn local(&mut self) -> &mut [T] {
108        &mut self.local[..]
109    }
110
111    /// Estimate the memory capacity in bytes.
112    #[inline]
113    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
114        let size_of = std::mem::size_of::<T>();
115        callback(self.local.len() * size_of, self.local.capacity() * size_of);
116        self.inner.heap_size(callback);
117    }
118
119    /// Estimate the consumed memory capacity in bytes, summing both used and total capacity.
120    #[inline]
121    pub fn summed_heap_size(&self) -> (usize, usize) {
122        let (mut length, mut capacity) = (0, 0);
123        self.heap_size(|len, cap| {
124            length += len;
125            capacity += cap
126        });
127        (length, capacity)
128    }
129
130    /// The length in items.
131    #[inline]
132    pub fn len(&self) -> usize {
133        self.local.len()
134    }
135
136    /// Returns `true` if the stack is empty.
137    pub fn is_empty(&self) -> bool {
138        self.local.is_empty()
139    }
140
141    /// The capacity of the local vector.
142    #[inline]
143    pub fn capacity(&self) -> usize {
144        self.local.capacity()
145    }
146
147    /// Reserve space for `additional` elements.
148    #[inline]
149    pub fn reserve(&mut self, additional: usize) {
150        self.local.reserve(additional)
151    }
152}
153
154impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
155    /// Copies a destructured tuple `(A, B)` into this column stack.
156    ///
157    /// This serves situations where a tuple should be constructed from its constituents but
158    /// not all elements are available as owned data.
159    ///
160    /// The element can be read by indexing
161    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
162        unsafe {
163            self.local.push(self.inner.copy_destructured(t1, t2));
164        }
165    }
166}
167
168impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
169    /// Copies a destructured tuple `(A, B, C)` into this column stack.
170    ///
171    /// This serves situations where a tuple should be constructed from its constituents but
172    /// not all elements are available as owned data.
173    ///
174    /// The element can be read by indexing
175    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
176        unsafe {
177            self.local.push(self.inner.copy_destructured(r0, r1, r2));
178        }
179    }
180}
181
182impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
183    type Target = [T];
184    #[inline(always)]
185    fn deref(&self) -> &Self::Target {
186        &self.local[..]
187    }
188}
189
190impl<T: Columnation> Drop for TimelyStack<T> {
191    fn drop(&mut self) {
192        self.clear();
193    }
194}
195
196impl<T: Columnation> Default for TimelyStack<T> {
197    fn default() -> Self {
198        Self {
199            local: Vec::new(),
200            inner: T::InnerRegion::default(),
201        }
202    }
203}
204
205impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
206    fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
207        let iter = iter.into_iter();
208        let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
209        for element in iter {
210            c.copy(element);
211        }
212
213        c
214    }
215}
216
217impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
218    fn eq(&self, other: &Self) -> bool {
219        PartialEq::eq(&self[..], &other[..])
220    }
221}
222
223impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
224
225impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        self[..].fmt(f)
228    }
229}
230
231impl<T: Columnation> Clone for TimelyStack<T> {
232    fn clone(&self) -> Self {
233        let mut new: Self = Default::default();
234        for item in &self[..] {
235            new.copy(item);
236        }
237        new
238    }
239
240    fn clone_from(&mut self, source: &Self) {
241        self.clear();
242        for item in &source[..] {
243            self.copy(item);
244        }
245    }
246}
247
248impl<T: Columnation> PushInto<T> for TimelyStack<T> {
249    #[inline]
250    fn push_into(&mut self, item: T) {
251        self.copy(&item);
252    }
253}
254
255impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
256    #[inline]
257    fn push_into(&mut self, item: &T) {
258        self.copy(item);
259    }
260}
261
262
263impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
264    #[inline]
265    fn push_into(&mut self, item: &&T) {
266        self.copy(*item);
267    }
268}
269
270mod container {
271    use std::ops::Deref;
272
273    use columnation::Columnation;
274    use timely::Container;
275    use timely::container::SizableContainer;
276
277    use crate::containers::TimelyStack;
278
279    impl<T: Columnation> Container for TimelyStack<T> {
280        type ItemRef<'a> = &'a T where Self: 'a;
281        type Item<'a> = &'a T where Self: 'a;
282
283        fn len(&self) -> usize {
284            self.local.len()
285        }
286
287        fn is_empty(&self) -> bool {
288            self.local.is_empty()
289        }
290
291        fn clear(&mut self) {
292            TimelyStack::clear(self)
293        }
294
295        type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
296
297        fn iter(&self) -> Self::Iter<'_> {
298            self.deref().iter()
299        }
300
301        type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
302
303        fn drain(&mut self) -> Self::DrainIter<'_> {
304            (*self).iter()
305        }
306    }
307
308    impl<T: Columnation> SizableContainer for TimelyStack<T> {
309        fn at_capacity(&self) -> bool {
310            self.len() == self.capacity()
311        }
312        fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
313            if self.capacity() == 0 {
314                *self = stash.take().unwrap_or_default();
315                self.clear();
316            }
317            let preferred = timely::container::buffer::default_capacity::<T>();
318            if self.capacity() < preferred {
319                self.reserve(preferred - self.capacity());
320            }
321        }
322    }
323}