timely/progress/change_batch.rs
//! A collection of updates of the form `(T, i64)`.

use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

/// A collection of updates of the form `(T, i64)`.
///
/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
/// the representation and removing elements whose `i64` field accumulates to zero.
///
/// The implementation is designed to be as lazy as possible, simply appending to a list of updates
/// until they are required. This means that several seemingly simple operations may be expensive, in
/// that they may provoke a compaction. I've tried to avoid exposing methods that allow surprisingly
/// expensive operations; all operations should take amortized constant or logarithmic time.
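///
/// # Examples
///
/// A small sketch of the intended behavior: updates to the same item are
/// consolidated, and entries whose counts cancel out are dropped.
///
///```
/// use timely::progress::ChangeBatch;
///
/// let mut batch = ChangeBatch::<usize>::new();
/// batch.update(17, 1);
/// batch.update(17, -1);
/// assert!(batch.is_empty());
///```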
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChangeBatch<T, const X: usize = 2> {
    // A list of updates to which we append.
    updates: SmallVec<[(T, i64); X]>,
    // The length of the prefix of `self.updates` known to be compact.
    clean: usize,
}

impl<T, const X: usize> ChangeBatch<T, X> {

    /// Allocates a new empty `ChangeBatch`.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new();
    /// assert!(batch.is_empty());
    ///```
    pub fn new() -> Self {
        ChangeBatch {
            updates: SmallVec::new(),
            clean: 0,
        }
    }

    /// Allocates a new empty `ChangeBatch` with space for `capacity` updates.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::with_capacity(10);
    /// assert!(batch.is_empty());
    ///```
    pub fn with_capacity(capacity: usize) -> Self {
        ChangeBatch {
            updates: SmallVec::with_capacity(capacity),
            clean: 0,
        }
    }

    /// Returns `true` if the change batch is not guaranteed compact.
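    ///
    /// # Examples
    ///
    /// A small sketch: a freshly pushed update is not yet compacted, so the
    /// batch reports itself as dirty until `compact` is called.
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new();
    /// batch.update(17, 1);
    /// assert!(batch.is_dirty());
    /// batch.compact();
    /// assert!(!batch.is_dirty());
    ///```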
    pub fn is_dirty(&self) -> bool {
        self.updates.len() > self.clean
    }

    /// Expose the internal vector of updates.
    pub fn unstable_internal_updates(&self) -> &SmallVec<[(T, i64); X]> { &self.updates }

    /// Expose the internal value of `clean`.
    pub fn unstable_internal_clean(&self) -> usize { self.clean }

    /// Clears the map.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.clear();
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn clear(&mut self) {
        self.updates.clear();
        self.clean = 0;
    }
}

impl<T, const X: usize> ChangeBatch<T, X>
where
    T: Ord,
{

    /// Allocates a new `ChangeBatch` with a single entry.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// assert!(!batch.is_empty());
    ///```
    pub fn new_from(key: T, val: i64) -> Self {
        let mut result = ChangeBatch::new();
        result.update(key, val);
        result
    }

    /// Adds a new update, for `item` with `value`.
    ///
    /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
    /// half the length of the list, which would keep the total footprint within reasonable bounds
    /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
    /// is worth paying without some experimentation.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new();
    /// batch.update(17, 1);
    /// assert!(!batch.is_empty());
    ///```
    #[inline]
    pub fn update(&mut self, item: T, value: i64) {
        self.updates.push((item, value));
        self.maintain_bounds();
    }

    /// Performs a sequence of updates described by `iterator`.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.extend(vec![(17, -1)].into_iter());
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn extend<I: Iterator<Item=(T, i64)>>(&mut self, iterator: I) {
        self.updates.extend(iterator);
        self.maintain_bounds();
    }

    /// Extracts the internal `SmallVec<[(T, i64); X]>` of updates, consuming `self`.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let batch = ChangeBatch::<usize>::new_from(17, 1);
    /// assert_eq!(batch.into_inner().to_vec(), vec![(17, 1)]);
    ///```
    pub fn into_inner(mut self) -> SmallVec<[(T, i64); X]> {
        self.compact();
        self.updates
    }

    /// Iterates over the contents of the map.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// { // scope allows borrow of `batch` to drop.
    ///     let mut iter = batch.iter();
    ///     assert_eq!(iter.next(), Some(&(17, 1)));
    ///     assert_eq!(iter.next(), None);
    /// }
    /// assert!(!batch.is_empty());
    ///```
    #[inline]
    pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
        self.compact();
        self.updates.iter()
    }

    /// Drains the set of updates.
    ///
    /// This operation first compacts the set of updates so that the drained results
    /// have at most one occurrence of each item.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// { // scope allows borrow of `batch` to drop.
    ///     let mut iter = batch.drain();
    ///     assert_eq!(iter.next(), Some((17, 1)));
    ///     assert_eq!(iter.next(), None);
    /// }
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn drain(&mut self) -> smallvec::Drain<[(T, i64); X]> {
        self.compact();
        self.clean = 0;
        self.updates.drain(..)
    }

    /// Returns `true` iff all keys have value zero.
    ///
    /// This method requires mutable access to `self` because it may need to compact the representation
    /// to determine if the batch of updates is indeed empty. We could also implement a weaker form of
    /// `is_empty` which just checks the length of `self.updates`, and which could confirm the absence of
    /// any updates, but could report false negatives if there are updates which would cancel.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.update(17, -1);
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        if self.clean > self.updates.len() / 2 {
            false
        }
        else {
            self.compact();
            self.updates.is_empty()
        }
    }

    /// Number of compacted updates.
    ///
    /// This method requires mutable access to `self` because it may need to compact the
    /// representation to determine the number of actual updates.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.update(17, -1);
    /// batch.update(14, -1);
    /// assert_eq!(batch.len(), 1);
    ///```
    #[inline]
    pub fn len(&mut self) -> usize {
        self.compact();
        self.updates.len()
    }

    /// Drains `self` into `other`.
    ///
    /// This method has a similar effect to calling `other.extend(self.drain())`, but has the
    /// opportunity to optimize this to a `::std::mem::swap(self, other)` when `other` is empty.
    /// As many uses of this method are to propagate updates, this optimization can be quite
    /// handy.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch1 = ChangeBatch::<usize>::new_from(17, 1);
    /// let mut batch2 = ChangeBatch::new();
    /// batch1.drain_into(&mut batch2);
    /// assert!(batch1.is_empty());
    /// assert!(!batch2.is_empty());
    ///```
    #[inline]
    pub fn drain_into(&mut self, other: &mut ChangeBatch<T, X>) where T: Clone {
        if other.updates.is_empty() {
            ::std::mem::swap(self, other);
        }
        else {
            other.extend(self.updates.drain(..));
            self.clean = 0;
        }
    }

    /// Compact the internal representation.
    ///
    /// This method sorts `self.updates` and consolidates elements with equal item, discarding
    /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
    /// elements is non-zero.
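    ///
    /// # Examples
    ///
    /// A small sketch: duplicate items are consolidated and zero-valued entries
    /// are discarded, here inspected through `unstable_internal_updates`.
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new();
    /// batch.update(17, 1);
    /// batch.update(17, -1);
    /// batch.update(14, 2);
    /// batch.compact();
    /// assert!(!batch.is_dirty());
    /// assert_eq!(batch.unstable_internal_updates().to_vec(), vec![(14, 2)]);
    ///```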
    #[inline]
    pub fn compact(&mut self) {
        if self.clean < self.updates.len() && self.updates.len() > 1 {
            self.updates.sort_by(|x,y| x.0.cmp(&y.0));
            for i in 0 .. self.updates.len() - 1 {
                if self.updates[i].0 == self.updates[i+1].0 {
                    self.updates[i+1].1 += self.updates[i].1;
                    self.updates[i].1 = 0;
                }
            }

            self.updates.retain(|x| x.1 != 0);
        }
        self.clean = self.updates.len();
    }

    /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
    /// This function tries to minimize work by only compacting if enough work has accumulated.
    fn maintain_bounds(&mut self) {
        // if we have more than 32 elements and at least half of them are not clean, compact
        if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
            self.compact()
        }
    }
}

impl<T, const X: usize> Default for ChangeBatch<T, X> {
    fn default() -> Self {
        Self::new()
    }
}