differential_dataflow/
consolidation.rs

//! Common logic for the consolidation of vectors of Semigroups.
//!
//! Often we find ourselves with collections of records with associated weights (often
//! integers) where we want to reduce the collection to the point that each record occurs
//! at most once, with the accumulated weights. These methods supply that functionality.
//!
//! Importantly, these methods are used internally by differential dataflow, but are made
//! public for the convenience of others. Their precise behavior is driven by the needs of
//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.
12
13use std::cmp::Ordering;
14use std::collections::VecDeque;
15use timely::Container;
16use timely::container::{ContainerBuilder, PushInto};
17use crate::Data;
18use crate::difference::{IsZero, Semigroup};
19
20/// Sorts and consolidates `vec`.
21///
22/// This method will sort `vec` and then consolidate runs of more than one entry with
23/// identical first elements by accumulating the second elements of the pairs. Should the final
24/// accumulation be zero, the element is discarded.
25#[inline]
26pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
27    consolidate_from(vec, 0);
28}
29
30/// Sorts and consolidate `vec[offset..]`.
31///
32/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
33/// identical first elements by accumulating the second elements of the pairs. Should the final
34/// accumulation be zero, the element is discarded.
35#[inline]
36pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
37    let length = consolidate_slice(&mut vec[offset..]);
38    vec.truncate(offset + length);
39}
40
41/// Sorts and consolidates a slice, returning the valid prefix length.
42#[inline]
43pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
44    if slice.len() > 1 {
45        consolidate_slice_slow(slice)
46    }
47    else {
48        slice.iter().filter(|x| !x.1.is_zero()).count()
49    }
50}
51
52/// Part of `consolidate_slice` that handles slices of length greater than 1.
53fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
54    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
55    // In a world where there are not many results, we may never even need to call in to merge sort.
56    slice.sort_by(|x,y| x.0.cmp(&y.0));
57
58    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
59    let mut offset = 0;
60    let mut accum = slice[offset].1.clone();
61
62    for index in 1 .. slice.len() {
63        if slice[index].0 == slice[index-1].0 {
64            accum.plus_equals(&slice[index].1);
65        }
66        else {
67            if !accum.is_zero() {
68                slice.swap(offset, index-1);
69                slice[offset].1.clone_from(&accum);
70                offset += 1;
71            }
72            accum.clone_from(&slice[index].1);
73        }
74    }
75    if !accum.is_zero() {
76        slice.swap(offset, slice.len()-1);
77        slice[offset].1 = accum;
78        offset += 1;
79    }
80
81    offset
82}
83
84/// Sorts and consolidates `vec`.
85///
86/// This method will sort `vec` and then consolidate runs of more than one entry with
87/// identical first two elements by accumulating the third elements of the triples. Should the final
88/// accumulation be zero, the element is discarded.
89#[inline]
90pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
91    consolidate_updates_from(vec, 0);
92}
93
94/// Sorts and consolidate `vec[offset..]`.
95///
96/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
97/// identical first two elements by accumulating the third elements of the triples. Should the final
98/// accumulation be zero, the element is discarded.
99#[inline]
100pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
101    let length = consolidate_updates_slice(&mut vec[offset..]);
102    vec.truncate(offset + length);
103}
104
105/// Sorts and consolidates a slice, returning the valid prefix length.
106#[inline]
107pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
108
109    if slice.len() > 1 {
110        consolidate_updates_slice_slow(slice)
111    }
112    else {
113        slice.iter().filter(|x| !x.2.is_zero()).count()
114    }
115}
116
117/// Part of `consolidate_updates_slice` that handles slices of length greater than 1.
118fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
119    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
120    // In a world where there are not many results, we may never even need to call in to merge sort.
121    slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
122
123    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
124    let mut offset = 0;
125    let mut accum = slice[offset].2.clone();
126
127    for index in 1 .. slice.len() {
128        if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
129            accum.plus_equals(&slice[index].2);
130        }
131        else {
132            if !accum.is_zero() {
133                slice.swap(offset, index-1);
134                slice[offset].2.clone_from(&accum);
135                offset += 1;
136            }
137            accum.clone_from(&slice[index].2);
138        }
139    }
140    if !accum.is_zero() {
141        slice.swap(offset, slice.len()-1);
142        slice[offset].2 = accum;
143        offset += 1;
144    }
145
146    offset
147}
148
149
/// A container builder that consolidates data in-place into fixed-sized containers. Does not
/// maintain FIFO ordering.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Buffer that receives pushed items until it is consolidated and flushed.
    current: C,
    /// Stash of containers that were already handed out, kept so their allocations can be reused.
    empty: Vec<C>,
    /// Completed containers waiting to be handed out, oldest first.
    outbound: VecDeque<C>,
}
158
159impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
160where
161    D: Data,
162    T: Data,
163    R: Semigroup+'static,
164{
165    /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
166    // TODO: Can we replace `multiple` by a bool?
167    #[cold]
168    fn consolidate_and_flush_through(&mut self, multiple: usize) {
169        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
170        consolidate_updates(&mut self.current);
171        let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
172        while drain.peek().is_some() {
173            let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
174            container.clear();
175            container.extend((&mut drain).take(preferred_capacity));
176            self.outbound.push_back(container);
177        }
178    }
179}
180
181impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
182where
183    D: Data,
184    T: Data,
185    R: Semigroup+'static,
186    Vec<(D, T, R)>: PushInto<P>,
187{
188    /// Push an element.
189    ///
190    /// Precondition: `current` is not allocated or has space for at least one element.
191    #[inline]
192    fn push_into(&mut self, item: P) {
193        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
194        if self.current.capacity() < preferred_capacity * 2 {
195            self.current.reserve(preferred_capacity * 2 - self.current.capacity());
196        }
197        self.current.push_into(item);
198        if self.current.len() == self.current.capacity() {
199            // Flush complete containers.
200            self.consolidate_and_flush_through(preferred_capacity);
201        }
202    }
203}
204
205impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
206where
207    D: Data,
208    T: Data,
209    R: Semigroup+'static,
210{
211    type Container = Vec<(D,T,R)>;
212
213    #[inline]
214    fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
215        if let Some(container) = self.outbound.pop_front() {
216            self.empty.push(container);
217            self.empty.last_mut()
218        } else {
219            None
220        }
221    }
222
223    #[inline]
224    fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
225        if !self.current.is_empty() {
226            // Flush all
227            self.consolidate_and_flush_through(1);
228            // Remove all but two elements from the stash of empty to avoid memory leaks. We retain
229            // two to match `current` capacity.
230            self.empty.truncate(2);
231        }
232        self.extract()
233    }
234}
235
236/// Layout of containers and their read items to be consolidated.
237///
238/// This trait specifies behavior to extract keys and diffs from container's read
239/// items. Consolidation accumulates the diffs per key.
240///
241/// The trait requires `Container` to have access to its `Item` GAT.
242pub trait ConsolidateLayout: Container {
243    /// Key portion of data, essentially everything minus the diff
244    type Key<'a>: Eq where Self: 'a;
245
246    /// GAT diff type.
247    type Diff<'a>;
248
249    /// Owned diff type.
250    type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
251
252    /// Converts a reference diff into an owned diff.
253    fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;
254
255    /// Deconstruct an item into key and diff. Must be cheap.
256    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
257
258    /// Push an element to a compatible container.
259    ///
260    /// This function is odd to have, so let's explain why it exists. Ideally, the container
261    /// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
262    /// might never be in a position where this is true: Vectors can push any `T`, which would
263    /// collide with a specific implementation for pushing tuples of mixes GATs and owned types.
264    ///
265    /// For this reason, we expose a function here that takes a GAT key and an owned diff, and
266    /// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
267    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
268
269    /// Compare two items by key to sort containers.
270    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
271
272    /// Consolidate the supplied container.
273    fn consolidate_into(&mut self, target: &mut Self) {
274        // Sort input data
275        let mut permutation = Vec::with_capacity(self.len());
276        permutation.extend(self.drain());
277        permutation.sort_by(|a, b| Self::cmp(a, b));
278
279        // Iterate over the data, accumulating diffs for like keys.
280        let mut iter = permutation.drain(..);
281        if let Some(item) = iter.next() {
282
283            let (k, d) = Self::into_parts(item);
284            let mut prev_key = k;
285            let mut prev_diff = Self::owned_diff(d);
286
287            for item in iter {
288                let (next_key, next_diff) = Self::into_parts(item);
289                if next_key == prev_key {
290                    prev_diff.plus_equals(&next_diff);
291                }
292                else {
293                    if !prev_diff.is_zero() {
294                        target.push_with_diff(prev_key, prev_diff);
295                    }
296                    prev_key = next_key;
297                    prev_diff = Self::owned_diff(next_diff);
298                }
299            }
300
301            if !prev_diff.is_zero() {
302                target.push_with_diff(prev_key, prev_diff);
303            }
304        }
305    }
306}
307
308impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
309where
310    D: Ord + Clone + 'static,
311    T: Ord + Clone + 'static,
312    R: Semigroup + Clone + 'static,
313{
314    type Key<'a> = (D, T) where Self: 'a;
315    type Diff<'a> = R where Self: 'a;
316    type DiffOwned = R;
317
318    fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }
319
320    fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
321        ((data, time), diff)
322    }
323
324    fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
325        (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
326    }
327
328    fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
329        self.push((data, time, diff));
330    }
331
332    /// Consolidate the supplied container.
333    fn consolidate_into(&mut self, target: &mut Self) {
334        consolidate_updates(self);
335        std::mem::swap(self, target);
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// `consolidate` merges equal records and drops zero accumulations.
    #[test]
    fn test_consolidate() {
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }

    /// `consolidate_updates` merges equal `(data, time)` pairs and drops zero accumulations.
    #[test]
    fn test_consolidate_updates() {
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    /// The builder emits nothing for all-zero input and every record for distinct input.
    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);

        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        // The output happens to be sorted, but it's not guaranteed.
        collected.sort();
        // Check the length first so that missing records fail an assertion instead of panicking
        // on an out-of-bounds index, and so that extra records do not go unnoticed.
        assert_eq!(collected.len(), 1024);
        for (i, update) in collected.iter().enumerate() {
            assert_eq!((i, 0, 1), *update);
        }
    }

    /// `consolidate_into` moves the consolidated updates into the target.
    #[test]
    fn test_consolidate_into() {
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.sort();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    // Release builds use a workload large enough for the timings below to be meaningful.
    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    // Debug builds use a tiny workload so the tests stay fast.
    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    /// Times `consolidate_into` and checks that it agrees with `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on the same workload for comparison.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}
486}