1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
//! A collection of updates of the form `(T, i64)`.

use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

/// A collection of updates of the form `(T, i64)`.
///
/// A `ChangeBatch` accumulates updates of the form `(T, i64)`, where it is capable of consolidating
/// the representation and removing elements whose `i64` field accumulates to zero.
///
/// The implementation is designed to be as lazy as possible, simply appending to a list of updates
/// until they are required. This means that several seemingly simple operations may be expensive, in
/// that they may provoke a compaction. The API tries to avoid exposing methods that allow surprisingly
/// expensive operations; all operations should take an amortized constant or logarithmic time.
///
/// The const parameter `X` (default `2`) sizes the inline array of the backing `SmallVec`.
///
/// Note that the derived `PartialEq`/`Eq` compare the raw `updates` list and the `clean` counter,
/// not the consolidated contents: two batches describing the same accumulated changes may compare
/// unequal if they are in different states of compaction.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChangeBatch<T, const X: usize = 2> {
    // A list of updates to which we append. Entries past index `clean` have not been
    // consolidated yet and may repeat items or cancel earlier entries.
    updates: SmallVec<[(T, i64); X]>,
    // The length of the prefix of `self.updates` known to be compact.
    // The batch is considered "dirty" whenever `updates.len()` exceeds this value.
    clean: usize,
}

impl<T, const X: usize> ChangeBatch<T, X> {

    /// Allocates a new empty `ChangeBatch`.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new();
    /// assert!(batch.is_empty());
    ///```
    pub fn new() -> Self {
        ChangeBatch {
            updates: SmallVec::new(),
            clean: 0,
        }
    }

    /// Allocates a new empty `ChangeBatch` with space for `capacity` updates.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::with_capacity(10);
    /// assert!(batch.is_empty());
    ///```
    pub fn with_capacity(capacity: usize) -> Self {
        ChangeBatch {
            updates: SmallVec::with_capacity(capacity),
            clean: 0,
        }
    }

    /// Reports whether some updates lie beyond the prefix known to be compact.
    ///
    /// Returns `true` when the update list is longer than the `clean` prefix, i.e. when the
    /// batch is not guaranteed to be compact.
    pub fn is_dirty(&self) -> bool {
        self.clean < self.updates.len()
    }

    /// Borrows the raw internal list of updates, without compacting it first.
    pub fn unstable_internal_updates(&self) -> &SmallVec<[(T, i64); X]> {
        let raw = &self.updates;
        raw
    }

    /// Returns the current value of the internal `clean` counter (the length of the prefix of
    /// updates known to be compact).
    pub fn unstable_internal_clean(&self) -> usize {
        let clean_prefix = self.clean;
        clean_prefix
    }

    /// Removes every update from the batch.
    ///
    /// The clean prefix is reset to zero along with the update list, so the batch is left in
    /// the same logical state as a freshly created one.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.clear();
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn clear(&mut self) {
        // Nothing remains, so no prefix can be considered clean.
        self.clean = 0;
        self.updates.clear();
    }
}

impl<T, const X: usize> ChangeBatch<T, X>
where
    T: Ord,
{
    
    /// Allocates a new `ChangeBatch` with a single entry.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// assert!(!batch.is_empty());
    ///```
    pub fn new_from(key: T, val: i64) -> Self {
        let mut result = ChangeBatch::new();
        result.update(key, val);
        result
    }

    /// Records a change of `value` for `item`.
    ///
    /// The update is appended to the internal list without being consolidated right away.
    /// Afterwards `maintain_bounds` is consulted, which may trigger a compaction once enough
    /// unconsolidated updates have piled up, keeping the footprint bounded under an arbitrary
    /// number of updates.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new();
    /// batch.update(17, 1);
    /// assert!(!batch.is_empty());
    ///```
    #[inline]
    pub fn update(&mut self, item: T, value: i64) {
        let entry = (item, value);
        self.updates.push(entry);
        self.maintain_bounds();
    }

    /// Performs a sequence of updates described by `iterator`.
    ///
    /// Accepts anything that implements `IntoIterator<Item = (T, i64)>`, so both iterators and
    /// collections (such as a `Vec` of updates) can be passed directly. All updates are appended
    /// first; `maintain_bounds` is consulted once at the end and may trigger a compaction.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.extend(vec![(17, -1)]);
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn extend<I: IntoIterator<Item=(T, i64)>>(&mut self, iterator: I) {
        self.updates.extend(iterator);
        self.maintain_bounds();
    }

    /// Extracts the `Vec<(T, i64)>` from the map, consuming it.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let batch = ChangeBatch::<usize>::new_from(17, 1);
    /// assert_eq!(batch.into_inner().to_vec(), vec![(17, 1)]);
    ///```
    pub fn into_inner(mut self) -> SmallVec<[(T, i64); X]> {
        self.compact();
        self.updates
    }

    /// Iterates over the compacted contents of the batch.
    ///
    /// This method requires mutable access to `self` because it compacts the representation
    /// before handing out the iterator. The returned iterator borrows from `self`, which is
    /// made explicit by the `'_` lifetime in the return type.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// {   // scope allows borrow of `batch` to drop.
    ///     let mut iter = batch.iter();
    ///     assert_eq!(iter.next(), Some(&(17, 1)));
    ///     assert_eq!(iter.next(), None);
    /// }
    /// assert!(!batch.is_empty());
    ///```
    #[inline]
    pub fn iter(&mut self) -> ::std::slice::Iter<'_, (T, i64)> {
        self.compact();
        self.updates.iter()
    }

    /// Drains the set of updates.
    ///
    /// This operation first compacts the set of updates so that the drained results
    /// have at most one occurrence of each item. The returned iterator borrows from `self`,
    /// which is made explicit by the `'_` lifetime in the return type.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// {   // scope allows borrow of `batch` to drop.
    ///     let mut iter = batch.drain();
    ///     assert_eq!(iter.next(), Some((17, 1)));
    ///     assert_eq!(iter.next(), None);
    /// }
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn drain(&mut self) -> smallvec::Drain<'_, [(T, i64); X]> {
        self.compact();
        // The full range is drained below, so no prefix remains clean afterwards.
        self.clean = 0;
        self.updates.drain(..)
    }

    /// Returns `true` iff every item accumulates to zero.
    ///
    /// Mutable access to `self` is needed because answering the question may require compacting
    /// the representation: pending updates might cancel one another or cancel clean entries.
    /// A cheaper check that only looks at the length of `self.updates` could confirm emptiness,
    /// but would report false negatives whenever the stored updates cancel.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.update(17, -1);
    /// assert!(batch.is_empty());
    ///```
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        // Shortcut: when the clean prefix makes up more than half of the list, the pending
        // updates are fewer than the clean entries and so cannot cancel all of them
        // (assuming clean entries are distinct and non-zero). No compaction is needed.
        if self.clean > self.updates.len() / 2 {
            return false;
        }

        self.compact();
        self.updates.is_empty()
    }

    /// Returns the number of updates that remain after compaction.
    ///
    /// Mutable access to `self` is needed because the representation is compacted first, so
    /// that repeated or cancelling updates are not counted.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch = ChangeBatch::<usize>::new_from(17, 1);
    /// batch.update(17, -1);
    /// batch.update(14, -1);
    /// assert_eq!(batch.len(), 1);
    ///```
    #[inline]
    pub fn len(&mut self) -> usize {
        self.compact();
        let consolidated = &self.updates;
        consolidated.len()
    }

    /// Drains `self` into `other`.
    ///
    /// This method has a similar effect to calling `other.extend(self.drain())`, but has the
    /// opportunity to optimize this to a `::std::mem::swap(self, other)` when `other` is empty.
    /// As many uses of this method are to propagate updates, this optimization can be quite
    /// handy.
    ///
    /// Unlike `drain`, the contents of `self` are not compacted before being moved; they are
    /// appended to `other` as they are. No elements are cloned, so `T` needs no `Clone` bound.
    ///
    /// # Examples
    ///
    ///```
    /// use timely::progress::ChangeBatch;
    ///
    /// let mut batch1 = ChangeBatch::<usize>::new_from(17, 1);
    /// let mut batch2 = ChangeBatch::new();
    /// batch1.drain_into(&mut batch2);
    /// assert!(batch1.is_empty());
    /// assert!(!batch2.is_empty());
    ///```
    #[inline]
    pub fn drain_into(&mut self, other: &mut ChangeBatch<T, X>) {
        if other.updates.is_empty() {
            // `other` holds nothing: exchanging the two batches moves everything at once,
            // including the `clean` counters.
            ::std::mem::swap(self, other);
        }
        else {
            other.extend(self.updates.drain(..));
            // `self.updates` is now empty, so no prefix remains clean.
            self.clean = 0;
        }
    }

    /// Compact the internal representation.
    ///
    /// This method sorts `self.updates` and consolidates elements with equal item, discarding
    /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
    /// elements is non-zero.
    ///
    /// A batch holding a single dirty update is handled as well: no sorting is needed, but the
    /// update is still discarded if its value is zero, so that `is_empty`, `len`, `iter` and
    /// `drain` never observe a zero-valued entry.
    ///
    /// After this call the whole of `self.updates` is marked clean.
    #[inline]
    pub fn compact(&mut self) {
        if self.clean < self.updates.len() {
            if self.updates.len() > 1 {
                self.updates.sort_by(|x, y| x.0.cmp(&y.0));
                // Fold each run of equal items into its last element, zeroing the others so
                // that the `retain` below removes them.
                for i in 0..self.updates.len() - 1 {
                    if self.updates[i].0 == self.updates[i + 1].0 {
                        self.updates[i + 1].1 += self.updates[i].1;
                        self.updates[i].1 = 0;
                    }
                }
            }

            // Runs even for a single element, which may itself carry a zero value.
            self.updates.retain(|x| x.1 != 0);
        }
        self.clean = self.updates.len();
    }

    /// Keeps the amount of pending (non-compacted) updates in check relative to the clean
    /// (compacted) prefix.
    ///
    /// To avoid needless work, a compaction is only triggered once enough updates have
    /// accumulated.
    fn maintain_bounds(&mut self) {
        let total = self.updates.len();
        // Compact only when the list holds more than 32 entries and the clean prefix covers
        // at most half of it, i.e. at least half of the entries are pending.
        let clean_is_minority = self.clean <= total / 2;
        if total > 32 && clean_is_minority {
            self.compact();
        }
    }
}

impl<T, const X: usize> Default for ChangeBatch<T, X> {
    fn default() -> Self {
        Self::new()
    }
}