timely/progress/
change_batch.rs

1//! A collection of updates of the form `(T, i64)`.
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5
6/// A collection of updates of the form `(T, i64)`.
7///
8/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
9/// the representation and removing elements whose `i64` field accumulates to zero.
10///
11/// The implementation is designed to be as lazy as possible, simply appending to a list of updates
12/// until they are required. This means that several seemingly simple operations may be expensive, in
13/// that they may provoke a compaction. I've tried to prevent exposing methods that allow surprisingly
14/// expensive operations; all operations should take an amortized constant or logarithmic time.
15#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
16pub struct ChangeBatch<T, const X: usize = 2> {
17    // A list of updates to which we append.
18    updates: SmallVec<[(T, i64); X]>,
19    // The length of the prefix of `self.updates` known to be compact.
20    clean: usize,
21}
22
23impl<T, const X: usize> ChangeBatch<T, X> {
24
25    /// Allocates a new empty `ChangeBatch`.
26    ///
27    /// # Examples
28    ///
29    ///```
30    /// use timely::progress::ChangeBatch;
31    ///
32    /// let mut batch = ChangeBatch::<usize>::new();
33    /// assert!(batch.is_empty());
34    ///```
35    pub fn new() -> Self {
36        ChangeBatch {
37            updates: SmallVec::new(),
38            clean: 0,
39        }
40    }
41
42    /// Allocates a new empty `ChangeBatch` with space for `capacity` updates.
43    ///
44    /// # Examples
45    ///
46    ///```
47    /// use timely::progress::ChangeBatch;
48    ///
49    /// let mut batch = ChangeBatch::<usize>::with_capacity(10);
50    /// assert!(batch.is_empty());
51    ///```
52    pub fn with_capacity(capacity: usize) -> Self {
53        ChangeBatch {
54            updates: SmallVec::with_capacity(capacity),
55            clean: 0,
56        }
57    }
58
59    /// Returns `true` if the change batch is not guaranteed compact.
60    pub fn is_dirty(&self) -> bool {
61        self.updates.len() > self.clean
62    }
63
64    /// Expose the internal vector of updates.
65    pub fn unstable_internal_updates(&self) -> &SmallVec<[(T, i64); X]> { &self.updates }
66
67    /// Expose the internal value of `clean`.
68    pub fn unstable_internal_clean(&self) -> usize { self.clean }
69
70    /// Clears the map.
71    ///
72    /// # Examples
73    ///
74    ///```
75    /// use timely::progress::ChangeBatch;
76    ///
77    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
78    /// batch.clear();
79    /// assert!(batch.is_empty());
80    ///```
81    #[inline]
82    pub fn clear(&mut self) {
83        self.updates.clear();
84        self.clean = 0;
85    }
86}
87
88impl<T, const X: usize> ChangeBatch<T, X>
89where
90    T: Ord,
91{
92    
93    /// Allocates a new `ChangeBatch` with a single entry.
94    ///
95    /// # Examples
96    ///
97    ///```
98    /// use timely::progress::ChangeBatch;
99    ///
100    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
101    /// assert!(!batch.is_empty());
102    ///```
103    pub fn new_from(key: T, val: i64) -> Self {
104        let mut result = ChangeBatch::new();
105        result.update(key, val);
106        result
107    }
108
109    /// Adds a new update, for `item` with `value`.
110    ///
111    /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
112    /// half the length of the list, which would keep the total footprint within reasonable bounds
113    /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
114    /// is worth paying without some experimentation.
115    ///
116    /// # Examples
117    ///
118    ///```
119    /// use timely::progress::ChangeBatch;
120    ///
121    /// let mut batch = ChangeBatch::<usize>::new();
122    /// batch.update(17, 1);
123    /// assert!(!batch.is_empty());
124    ///```
125    #[inline]
126    pub fn update(&mut self, item: T, value: i64) {
127        self.updates.push((item, value));
128        self.maintain_bounds();
129    }
130
131    /// Performs a sequence of updates described by `iterator`.
132    ///
133    /// # Examples
134    ///
135    ///```
136    /// use timely::progress::ChangeBatch;
137    ///
138    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
139    /// batch.extend(vec![(17, -1)].into_iter());
140    /// assert!(batch.is_empty());
141    ///```
142    #[inline]
143    pub fn extend<I: Iterator<Item=(T, i64)>>(&mut self, iterator: I) {
144        self.updates.extend(iterator);
145        self.maintain_bounds();
146    }
147
148    /// Extracts the `Vec<(T, i64)>` from the map, consuming it.
149    ///
150    /// # Examples
151    ///
152    ///```
153    /// use timely::progress::ChangeBatch;
154    ///
155    /// let batch = ChangeBatch::<usize>::new_from(17, 1);
156    /// assert_eq!(batch.into_inner().to_vec(), vec![(17, 1)]);
157    ///```
158    pub fn into_inner(mut self) -> SmallVec<[(T, i64); X]> {
159        self.compact();
160        self.updates
161    }
162
163    /// Iterates over the contents of the map.
164    ///
165    /// # Examples
166    ///
167    ///```
168    /// use timely::progress::ChangeBatch;
169    ///
170    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
171    /// {   // scope allows borrow of `batch` to drop.
172    ///     let mut iter = batch.iter();
173    ///     assert_eq!(iter.next(), Some(&(17, 1)));
174    ///     assert_eq!(iter.next(), None);
175    /// }
176    /// assert!(!batch.is_empty());
177    ///```
178    #[inline]
179    pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
180        self.compact();
181        self.updates.iter()
182    }
183
184    /// Drains the set of updates.
185    ///
186    /// This operation first compacts the set of updates so that the drained results
187    /// have at most one occurrence of each item.
188    ///
189    /// # Examples
190    ///
191    ///```
192    /// use timely::progress::ChangeBatch;
193    ///
194    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
195    /// {   // scope allows borrow of `batch` to drop.
196    ///     let mut iter = batch.drain();
197    ///     assert_eq!(iter.next(), Some((17, 1)));
198    ///     assert_eq!(iter.next(), None);
199    /// }
200    /// assert!(batch.is_empty());
201    ///```
202    #[inline]
203    pub fn drain(&mut self) -> smallvec::Drain<[(T, i64); X]> {
204        self.compact();
205        self.clean = 0;
206        self.updates.drain(..)
207    }
208
209    /// Returns `true` iff all keys have value zero.
210    ///
211    /// This method requires mutable access to `self` because it may need to compact the representation
212    /// to determine if the batch of updates is indeed empty. We could also implement a weaker form of
213    /// `is_empty` which just checks the length of `self.updates`, and which could confirm the absence of
214    /// any updates, but could report false negatives if there are updates which would cancel.
215    ///
216    /// # Examples
217    ///
218    ///```
219    /// use timely::progress::ChangeBatch;
220    ///
221    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
222    /// batch.update(17, -1);
223    /// assert!(batch.is_empty());
224    ///```
225    #[inline]
226    pub fn is_empty(&mut self) -> bool {
227        if self.clean > self.updates.len() / 2 {
228            false
229        }
230        else {
231            self.compact();
232            self.updates.is_empty()
233        }
234    }
235
236    /// Number of compacted updates.
237    ///
238    /// This method requires mutable access to `self` because it may need to compact the
239    /// representation to determine the number of actual updates.
240    ///
241    /// # Examples
242    ///
243    ///```
244    /// use timely::progress::ChangeBatch;
245    ///
246    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
247    /// batch.update(17, -1);
248    /// batch.update(14, -1);
249    /// assert_eq!(batch.len(), 1);
250    ///```
251    #[inline]
252    pub fn len(&mut self) -> usize {
253        self.compact();
254        self.updates.len()
255    }
256
257    /// Drains `self` into `other`.
258    ///
259    /// This method has similar a effect to calling `other.extend(self.drain())`, but has the
260    /// opportunity to optimize this to a `::std::mem::swap(self, other)` when `other` is empty.
261    /// As many uses of this method are to propagate updates, this optimization can be quite
262    /// handy.
263    ///
264    /// # Examples
265    ///
266    ///```
267    /// use timely::progress::ChangeBatch;
268    ///
269    /// let mut batch1 = ChangeBatch::<usize>::new_from(17, 1);
270    /// let mut batch2 = ChangeBatch::new();
271    /// batch1.drain_into(&mut batch2);
272    /// assert!(batch1.is_empty());
273    /// assert!(!batch2.is_empty());
274    ///```
275    #[inline]
276    pub fn drain_into(&mut self, other: &mut ChangeBatch<T, X>) where T: Clone {
277        if other.updates.is_empty() {
278            ::std::mem::swap(self, other);
279        }
280        else {
281            other.extend(self.updates.drain(..));
282            self.clean = 0;
283        }
284    }
285
286    /// Compact the internal representation.
287    ///
288    /// This method sort `self.updates` and consolidates elements with equal item, discarding
289    /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
290    /// elements is non-zero.
291    #[inline]
292    pub fn compact(&mut self) {
293        if self.clean < self.updates.len() && self.updates.len() > 1 {
294            self.updates.sort_by(|x,y| x.0.cmp(&y.0));
295            for i in 0 .. self.updates.len() - 1 {
296                if self.updates[i].0 == self.updates[i+1].0 {
297                    self.updates[i+1].1 += self.updates[i].1;
298                    self.updates[i].1 = 0;
299                }
300            }
301
302            self.updates.retain(|x| x.1 != 0);
303        }
304        self.clean = self.updates.len();
305    }
306
307    /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
308    /// This function tries to minimize work by only compacting if enough work has accumulated.
309    fn maintain_bounds(&mut self) {
310        // if we have more than 32 elements and at least half of them are not clean, compact
311        if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
312            self.compact()
313        }
314    }
315}
316
317impl<T, const X: usize> Default for ChangeBatch<T, X> {
318    fn default() -> Self {
319        Self::new()
320    }
321}