differential_dataflow/
consolidation.rs

//! Common logic for the consolidation of vectors of records with semigroup weights.
//!
//! Often we find ourselves with collections of records with associated weights (often
//! integers) where we want to reduce the collection to the point that each record occurs
//! at most once, with the accumulated weights. These methods supply that functionality.
//!
//! Importantly, these methods are used internally by differential dataflow, but are made
//! public for the convenience of others. Their precise behavior is driven by the needs of
//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.
12
13use std::cmp::Ordering;
14use std::collections::VecDeque;
15use timely::container::{ContainerBuilder, DrainContainer, PushInto};
16use crate::Data;
17use crate::difference::{IsZero, Semigroup};
18
19/// Sorts and consolidates `vec`.
20///
21/// This method will sort `vec` and then consolidate runs of more than one entry with
22/// identical first elements by accumulating the second elements of the pairs. Should the final
23/// accumulation be zero, the element is discarded.
24#[inline]
25pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
26    consolidate_from(vec, 0);
27}
28
29/// Sorts and consolidate `vec[offset..]`.
30///
31/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
32/// identical first elements by accumulating the second elements of the pairs. Should the final
33/// accumulation be zero, the element is discarded.
34#[inline]
35pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
36    let length = consolidate_slice(&mut vec[offset..]);
37    vec.truncate(offset + length);
38}
39
40/// Sorts and consolidates a slice, returning the valid prefix length.
41#[inline]
42pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
43    if slice.len() > 1 {
44        consolidate_slice_slow(slice)
45    }
46    else {
47        slice.iter().filter(|x| !x.1.is_zero()).count()
48    }
49}
50
51/// Part of `consolidate_slice` that handles slices of length greater than 1.
52fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
53    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
54    // In a world where there are not many results, we may never even need to call in to merge sort.
55    slice.sort_by(|x,y| x.0.cmp(&y.0));
56
57    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
58    let mut offset = 0;
59    let mut accum = slice[offset].1.clone();
60
61    for index in 1 .. slice.len() {
62        if slice[index].0 == slice[index-1].0 {
63            accum.plus_equals(&slice[index].1);
64        }
65        else {
66            if !accum.is_zero() {
67                slice.swap(offset, index-1);
68                slice[offset].1.clone_from(&accum);
69                offset += 1;
70            }
71            accum.clone_from(&slice[index].1);
72        }
73    }
74    if !accum.is_zero() {
75        slice.swap(offset, slice.len()-1);
76        slice[offset].1 = accum;
77        offset += 1;
78    }
79
80    offset
81}
82
83/// Sorts and consolidates `vec`.
84///
85/// This method will sort `vec` and then consolidate runs of more than one entry with
86/// identical first two elements by accumulating the third elements of the triples. Should the final
87/// accumulation be zero, the element is discarded.
88#[inline]
89pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
90    consolidate_updates_from(vec, 0);
91}
92
93/// Sorts and consolidate `vec[offset..]`.
94///
95/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
96/// identical first two elements by accumulating the third elements of the triples. Should the final
97/// accumulation be zero, the element is discarded.
98#[inline]
99pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
100    let length = consolidate_updates_slice(&mut vec[offset..]);
101    vec.truncate(offset + length);
102}
103
104/// Sorts and consolidates a slice, returning the valid prefix length.
105#[inline]
106pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
107
108    if slice.len() > 1 {
109        consolidate_updates_slice_slow(slice)
110    }
111    else {
112        slice.iter().filter(|x| !x.2.is_zero()).count()
113    }
114}
115
116/// Part of `consolidate_updates_slice` that handles slices of length greater than 1.
117fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
118    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
119    // In a world where there are not many results, we may never even need to call in to merge sort.
120    slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
121
122    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
123    let mut offset = 0;
124    let mut accum = slice[offset].2.clone();
125
126    for index in 1 .. slice.len() {
127        if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
128            accum.plus_equals(&slice[index].2);
129        }
130        else {
131            if !accum.is_zero() {
132                slice.swap(offset, index-1);
133                slice[offset].2.clone_from(&accum);
134                offset += 1;
135            }
136            accum.clone_from(&slice[index].2);
137        }
138    }
139    if !accum.is_zero() {
140        slice.swap(offset, slice.len()-1);
141        slice[offset].2 = accum;
142        offset += 1;
143    }
144
145    offset
146}
147
148
/// A container builder that consolidates data in place into fixed-size containers. Does not
/// maintain FIFO ordering.
///
/// Pushed items are staged in `current`; when it fills up it is consolidated and complete
/// chunks are moved to `outbound`. Containers handed out to callers are parked in `empty` so
/// their allocations can be reused by later flushes.
#[derive(Debug, Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Staging buffer that receives pushed items until it is consolidated and flushed.
    current: C,
    /// Previously handed-out containers, kept for reuse.
    empty: Vec<C>,
    /// Consolidated containers waiting to be handed out, oldest first.
    outbound: VecDeque<C>,
}
157
158impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
159where
160    D: Data,
161    T: Data,
162    R: Semigroup+'static,
163{
164    /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
165    // TODO: Can we replace `multiple` by a bool?
166    #[cold]
167    fn consolidate_and_flush_through(&mut self, multiple: usize) {
168        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
169        consolidate_updates(&mut self.current);
170        let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
171        while drain.peek().is_some() {
172            let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
173            container.clear();
174            container.extend((&mut drain).take(preferred_capacity));
175            self.outbound.push_back(container);
176        }
177    }
178}
179
180impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
181where
182    D: Data,
183    T: Data,
184    R: Semigroup+'static,
185    Vec<(D, T, R)>: PushInto<P>,
186{
187    /// Push an element.
188    ///
189    /// Precondition: `current` is not allocated or has space for at least one element.
190    #[inline]
191    fn push_into(&mut self, item: P) {
192        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
193        if self.current.capacity() < preferred_capacity * 2 {
194            self.current.reserve(preferred_capacity * 2 - self.current.capacity());
195        }
196        self.current.push_into(item);
197        if self.current.len() == self.current.capacity() {
198            // Flush complete containers.
199            self.consolidate_and_flush_through(preferred_capacity);
200        }
201    }
202}
203
204impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
205where
206    D: Data,
207    T: Data,
208    R: Semigroup+'static,
209{
210    type Container = Vec<(D,T,R)>;
211
212    #[inline]
213    fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
214        if let Some(container) = self.outbound.pop_front() {
215            self.empty.push(container);
216            self.empty.last_mut()
217        } else {
218            None
219        }
220    }
221
222    #[inline]
223    fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
224        if !self.current.is_empty() {
225            // Flush all
226            self.consolidate_and_flush_through(1);
227            // Remove all but two elements from the stash of empty to avoid memory leaks. We retain
228            // two to match `current` capacity.
229            self.empty.truncate(2);
230        }
231        self.extract()
232    }
233}
234
235/// Layout of containers and their read items to be consolidated.
236///
237/// This trait specifies behavior to extract keys and diffs from container's read
238/// items. Consolidation accumulates the diffs per key.
239///
240/// The trait requires `Container` to have access to its `Item` GAT.
241pub trait ConsolidateLayout: DrainContainer {
242    /// Key portion of data, essentially everything minus the diff
243    type Key<'a>: Eq where Self: 'a;
244
245    /// GAT diff type.
246    type Diff<'a>;
247
248    /// Owned diff type.
249    type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
250
251    /// Converts a reference diff into an owned diff.
252    fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;
253
254    /// Deconstruct an item into key and diff. Must be cheap.
255    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
256
257    /// Push an element to a compatible container.
258    ///
259    /// This function is odd to have, so let's explain why it exists. Ideally, the container
260    /// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
261    /// might never be in a position where this is true: Vectors can push any `T`, which would
262    /// collide with a specific implementation for pushing tuples of mixes GATs and owned types.
263    ///
264    /// For this reason, we expose a function here that takes a GAT key and an owned diff, and
265    /// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
266    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
267
268    /// Compare two items by key to sort containers.
269    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
270
271    /// Returns the number of items in the container.
272    fn len(&self) -> usize;
273
274    /// Clear the container. Afterwards, `len()` should return 0.
275    fn clear(&mut self);
276
277    /// Consolidate the supplied container.
278    fn consolidate_into(&mut self, target: &mut Self) {
279        // Sort input data
280        let mut permutation = Vec::with_capacity(self.len());
281        permutation.extend(self.drain());
282        permutation.sort_by(|a, b| Self::cmp(a, b));
283
284        // Iterate over the data, accumulating diffs for like keys.
285        let mut iter = permutation.drain(..);
286        if let Some(item) = iter.next() {
287
288            let (k, d) = Self::into_parts(item);
289            let mut prev_key = k;
290            let mut prev_diff = Self::owned_diff(d);
291
292            for item in iter {
293                let (next_key, next_diff) = Self::into_parts(item);
294                if next_key == prev_key {
295                    prev_diff.plus_equals(&next_diff);
296                }
297                else {
298                    if !prev_diff.is_zero() {
299                        target.push_with_diff(prev_key, prev_diff);
300                    }
301                    prev_key = next_key;
302                    prev_diff = Self::owned_diff(next_diff);
303                }
304            }
305
306            if !prev_diff.is_zero() {
307                target.push_with_diff(prev_key, prev_diff);
308            }
309        }
310    }
311}
312
313impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
314where
315    D: Ord + Clone + 'static,
316    T: Ord + Clone + 'static,
317    R: Semigroup + Clone + 'static,
318{
319    type Key<'a> = (D, T) where Self: 'a;
320    type Diff<'a> = R where Self: 'a;
321    type DiffOwned = R;
322
323    fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }
324
325    fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
326        ((data, time), diff)
327    }
328
329    fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
330        (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
331    }
332
333    fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
334        self.push((data, time, diff));
335    }
336
337    #[inline] fn len(&self) -> usize { Vec::len(self) }
338    #[inline] fn clear(&mut self) { Vec::clear(self) }
339
340    /// Consolidate the supplied container.
341    fn consolidate_into(&mut self, target: &mut Self) {
342        consolidate_updates(self);
343        std::mem::swap(self, target);
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consolidate() {
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }

    #[test]
    fn test_consolidate_from_offset() {
        // Only `data[1..]` is sorted and consolidated; the prefix stays as-is even though it
        // is out of order relative to the suffix.
        let mut data = vec![("z", 1), ("b", 1), ("a", 1), ("b", -1)];
        consolidate_from(&mut data, 1);
        assert_eq!(data, vec![("z", 1), ("a", 1)]);
    }

    #[test]
    fn test_consolidate_updates() {
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    #[test]
    fn test_consolidate_updates_from_offset() {
        let mut data = vec![("z", 9, 1), ("b", 1, 1), ("a", 1, 1), ("b", 1, -1)];
        consolidate_updates_from(&mut data, 1);
        assert_eq!(data, vec![("z", 9, 1), ("a", 1, 1)]);
    }

    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);

        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        // Every distinct update must come out exactly once: no drops, no duplicates.
        assert_eq!(collected.len(), 1024);
        // The output happens to be sorted, but it's not guaranteed.
        collected.sort();
        for i in 0..1024 {
            assert_eq!((i, 0, 1), collected[i]);
        }
    }

    #[test]
    fn test_consolidate_into() {
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.sort();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}