differential_dataflow/
containers.rs1use std::iter::FromIterator;
4
5pub use columnation::*;
6use timely::container::PushInto;
7
8pub struct TimelyStack<T: Columnation> {
18 local: Vec<T>,
19 inner: T::InnerRegion,
20}
21
22impl<T: Columnation> TimelyStack<T> {
23 pub fn with_capacity(capacity: usize) -> Self {
28 Self {
29 local: Vec::with_capacity(capacity),
30 inner: T::InnerRegion::default(),
31 }
32 }
33
34 #[inline(always)]
39 pub fn reserve_items<'a, I>(&mut self, items: I)
40 where
41 I: Iterator<Item= &'a T> + Clone,
42 T: 'a,
43 {
44 self.local.reserve(items.clone().count());
45 self.inner.reserve_items(items);
46 }
47
48 #[inline(always)]
53 pub fn reserve_regions<'a, I>(&mut self, regions: I)
54 where
55 Self: 'a,
56 I: Iterator<Item= &'a Self> + Clone,
57 {
58 self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum());
59 self.inner.reserve_regions(regions.map(|cs| &cs.inner));
60 }
61
62
63
64 pub fn copy(&mut self, item: &T) {
68 unsafe {
71 self.local.push(self.inner.copy(item));
72 }
73 }
74 pub fn clear(&mut self) {
76 unsafe {
77 self.local.set_len(0);
80 self.inner.clear();
81 }
82 }
83 pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
87 let mut write_position = index;
88 for position in index..self.local.len() {
89 if predicate(&self[position]) {
90 self.local.swap(position, write_position);
92 write_position += 1;
93 }
94 }
95 unsafe {
96 self.local.set_len(write_position);
99 }
100 }
101
102 pub unsafe fn local(&mut self) -> &mut [T] {
108 &mut self.local[..]
109 }
110
111 #[inline]
113 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
114 let size_of = std::mem::size_of::<T>();
115 callback(self.local.len() * size_of, self.local.capacity() * size_of);
116 self.inner.heap_size(callback);
117 }
118
119 #[inline]
121 pub fn summed_heap_size(&self) -> (usize, usize) {
122 let (mut length, mut capacity) = (0, 0);
123 self.heap_size(|len, cap| {
124 length += len;
125 capacity += cap
126 });
127 (length, capacity)
128 }
129
130 #[inline]
132 pub fn len(&self) -> usize {
133 self.local.len()
134 }
135
136 pub fn is_empty(&self) -> bool {
138 self.local.is_empty()
139 }
140
141 #[inline]
143 pub fn capacity(&self) -> usize {
144 self.local.capacity()
145 }
146
147 #[inline]
149 pub fn reserve(&mut self, additional: usize) {
150 self.local.reserve(additional)
151 }
152}
153
154impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
155 pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
162 unsafe {
163 self.local.push(self.inner.copy_destructured(t1, t2));
164 }
165 }
166}
167
168impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
169 pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
176 unsafe {
177 self.local.push(self.inner.copy_destructured(r0, r1, r2));
178 }
179 }
180}
181
182impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
183 type Target = [T];
184 #[inline(always)]
185 fn deref(&self) -> &Self::Target {
186 &self.local[..]
187 }
188}
189
190impl<T: Columnation> Drop for TimelyStack<T> {
191 fn drop(&mut self) {
192 self.clear();
193 }
194}
195
196impl<T: Columnation> Default for TimelyStack<T> {
197 fn default() -> Self {
198 Self {
199 local: Vec::new(),
200 inner: T::InnerRegion::default(),
201 }
202 }
203}
204
205impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
206 fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
207 let iter = iter.into_iter();
208 let mut c = TimelyStack::<A>::with_capacity(iter.size_hint().0);
209 for element in iter {
210 c.copy(element);
211 }
212
213 c
214 }
215}
216
217impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
218 fn eq(&self, other: &Self) -> bool {
219 PartialEq::eq(&self[..], &other[..])
220 }
221}
222
223impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
224
225impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 self[..].fmt(f)
228 }
229}
230
231impl<T: Columnation> Clone for TimelyStack<T> {
232 fn clone(&self) -> Self {
233 let mut new: Self = Default::default();
234 for item in &self[..] {
235 new.copy(item);
236 }
237 new
238 }
239
240 fn clone_from(&mut self, source: &Self) {
241 self.clear();
242 for item in &source[..] {
243 self.copy(item);
244 }
245 }
246}
247
248impl<T: Columnation> PushInto<T> for TimelyStack<T> {
249 #[inline]
250 fn push_into(&mut self, item: T) {
251 self.copy(&item);
252 }
253}
254
255impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
256 #[inline]
257 fn push_into(&mut self, item: &T) {
258 self.copy(item);
259 }
260}
261
262
263impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
264 #[inline]
265 fn push_into(&mut self, item: &&T) {
266 self.copy(*item);
267 }
268}
269
270mod container {
271 use std::ops::Deref;
272
273 use columnation::Columnation;
274 use timely::Container;
275 use timely::container::SizableContainer;
276
277 use crate::containers::TimelyStack;
278
279 impl<T: Columnation> Container for TimelyStack<T> {
280 type ItemRef<'a> = &'a T where Self: 'a;
281 type Item<'a> = &'a T where Self: 'a;
282
283 fn len(&self) -> usize {
284 self.local.len()
285 }
286
287 fn is_empty(&self) -> bool {
288 self.local.is_empty()
289 }
290
291 fn clear(&mut self) {
292 TimelyStack::clear(self)
293 }
294
295 type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
296
297 fn iter(&self) -> Self::Iter<'_> {
298 self.deref().iter()
299 }
300
301 type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
302
303 fn drain(&mut self) -> Self::DrainIter<'_> {
304 (*self).iter()
305 }
306 }
307
308 impl<T: Columnation> SizableContainer for TimelyStack<T> {
309 fn at_capacity(&self) -> bool {
310 self.len() == self.capacity()
311 }
312 fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
313 if self.capacity() == 0 {
314 *self = stash.take().unwrap_or_default();
315 self.clear();
316 }
317 let preferred = timely::container::buffer::default_capacity::<T>();
318 if self.capacity() < preferred {
319 self.reserve(preferred - self.capacity());
320 }
321 }
322 }
323}