differential_dataflow/
consolidation.rs

1//! Common logic for the consolidation of vectors of Semigroups.
2//!
3//! Often we find ourselves with collections of records with associated weights (often
4//! integers) where we want to reduce the collection to the point that each record occurs
5//! at most once, with the accumulated weights. These methods supply that functionality.
6//!
7//! Importantly, these methods are used internally by differential dataflow, but are made
8//! public for the convenience of others. Their precise behavior is driven by the needs of
9//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
10//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
11//! specific behavior you require.
12
13use std::cmp::Ordering;
14use std::collections::VecDeque;
15use timely::Container;
16use timely::container::{ContainerBuilder, PushInto};
17use crate::{IntoOwned, Data};
18use crate::difference::{IsZero, Semigroup};
19
20/// Sorts and consolidates `vec`.
21///
22/// This method will sort `vec` and then consolidate runs of more than one entry with
23/// identical first elements by accumulating the second elements of the pairs. Should the final
24/// accumulation be zero, the element is discarded.
25pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
26    consolidate_from(vec, 0);
27}
28
29/// Sorts and consolidate `vec[offset..]`.
30///
31/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
32/// identical first elements by accumulating the second elements of the pairs. Should the final
33/// accumulation be zero, the element is discarded.
34pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
35    let length = consolidate_slice(&mut vec[offset..]);
36    vec.truncate(offset + length);
37}
38
39/// Sorts and consolidates a slice, returning the valid prefix length.
40pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
41
42    if slice.len() > 1 {
43
44        // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
45        // In a world where there are not many results, we may never even need to call in to merge sort.
46        slice.sort_by(|x,y| x.0.cmp(&y.0));
47
48        // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
49        let mut offset = 0;
50        let mut accum = slice[offset].1.clone();
51
52        for index in 1 .. slice.len() {
53            if slice[index].0 == slice[index-1].0 {
54                accum.plus_equals(&slice[index].1);
55            }
56            else {
57                if !accum.is_zero() {
58                    slice.swap(offset, index-1);
59                    slice[offset].1.clone_from(&accum);
60                    offset += 1;
61                }
62                accum.clone_from(&slice[index].1);
63            }
64        }
65        if !accum.is_zero() {
66            slice.swap(offset, slice.len()-1);
67            slice[offset].1 = accum;
68            offset += 1;
69        }
70
71        offset
72    }
73    else {
74        slice.iter().filter(|x| !x.1.is_zero()).count()
75    }
76}
77
78/// Sorts and consolidates `vec`.
79///
80/// This method will sort `vec` and then consolidate runs of more than one entry with
81/// identical first two elements by accumulating the third elements of the triples. Should the final
82/// accumulation be zero, the element is discarded.
83pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
84    consolidate_updates_from(vec, 0);
85}
86
87/// Sorts and consolidate `vec[offset..]`.
88///
89/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
90/// identical first two elements by accumulating the third elements of the triples. Should the final
91/// accumulation be zero, the element is discarded.
92pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
93    let length = consolidate_updates_slice(&mut vec[offset..]);
94    vec.truncate(offset + length);
95}
96
97/// Sorts and consolidates a slice, returning the valid prefix length.
98pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
99
100    if slice.len() > 1 {
101
102        // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
103        // In a world where there are not many results, we may never even need to call in to merge sort.
104        slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
105
106        // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
107        let mut offset = 0;
108        let mut accum = slice[offset].2.clone();
109
110        for index in 1 .. slice.len() {
111            if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
112                accum.plus_equals(&slice[index].2);
113            }
114            else {
115                if !accum.is_zero() {
116                    slice.swap(offset, index-1);
117                    slice[offset].2.clone_from(&accum);
118                    offset += 1;
119                }
120                accum.clone_from(&slice[index].2);
121            }
122        }
123        if !accum.is_zero() {
124            slice.swap(offset, slice.len()-1);
125            slice[offset].2 = accum;
126            offset += 1;
127        }
128
129        offset
130    }
131    else {
132        slice.iter().filter(|x| !x.2.is_zero()).count()
133    }
134}
135
136
/// A container builder that consolidates data in place and emits it in containers of at
/// most a preferred size. Does not maintain FIFO ordering.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C>{
    /// Staging buffer that incoming data is pushed into and consolidated in place.
    current: C,
    /// Containers previously handed out, kept so their allocations can be reused.
    empty: Vec<C>,
    /// Completed containers waiting to be handed out, oldest first.
    outbound: VecDeque<C>,
}
145
146impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
147where
148    D: Data,
149    T: Data,
150    R: Semigroup+'static,
151{
152    /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
153    // TODO: Can we replace `multiple` by a bool?
154    #[cold]
155    fn consolidate_and_flush_through(&mut self, multiple: usize) {
156        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
157        consolidate_updates(&mut self.current);
158        let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
159        while drain.peek().is_some() {
160            let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
161            container.clear();
162            container.extend((&mut drain).take(preferred_capacity));
163            self.outbound.push_back(container);
164        }
165    }
166}
167
168impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
169where
170    D: Data,
171    T: Data,
172    R: Semigroup+'static,
173    Vec<(D, T, R)>: PushInto<P>,
174{
175    /// Push an element.
176    ///
177    /// Precondition: `current` is not allocated or has space for at least one element.
178    #[inline]
179    fn push_into(&mut self, item: P) {
180        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
181        if self.current.capacity() < preferred_capacity * 2 {
182            self.current.reserve(preferred_capacity * 2 - self.current.capacity());
183        }
184        self.current.push_into(item);
185        if self.current.len() == self.current.capacity() {
186            // Flush complete containers.
187            self.consolidate_and_flush_through(preferred_capacity);
188        }
189    }
190}
191
192impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
193where
194    D: Data,
195    T: Data,
196    R: Semigroup+'static,
197{
198    type Container = Vec<(D,T,R)>;
199
200    #[inline]
201    fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
202        if let Some(container) = self.outbound.pop_front() {
203            self.empty.push(container);
204            self.empty.last_mut()
205        } else {
206            None
207        }
208    }
209
210    #[inline]
211    fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
212        if !self.current.is_empty() {
213            // Flush all
214            self.consolidate_and_flush_through(1);
215            // Remove all but two elements from the stash of empty to avoid memory leaks. We retain
216            // two to match `current` capacity.
217            self.empty.truncate(2);
218        }
219        self.extract()
220    }
221}
222
223/// Layout of containers and their read items to be consolidated.
224///
225/// This trait specifies behavior to extract keys and diffs from container's read
226/// items. Consolidation accumulates the diffs per key.
227///
228/// The trait requires `Container` to have access to its `Item` GAT.
229pub trait ConsolidateLayout: Container {
230    /// Key portion of data, essentially everything minus the diff
231    type Key<'a>: Eq where Self: 'a;
232
233    /// GAT diff type.
234    type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;
235
236    /// Owned diff type.
237    type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
238
239    /// Deconstruct an item into key and diff. Must be cheap.
240    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
241
242    /// Push an element to a compatible container.
243    ///
244    /// This function is odd to have, so let's explain why it exists. Ideally, the container
245    /// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
246    /// might never be in a position where this is true: Vectors can push any `T`, which would
247    /// collide with a specific implementation for pushing tuples of mixes GATs and owned types.
248    ///
249    /// For this reason, we expose a function here that takes a GAT key and an owned diff, and
250    /// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
251    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
252
253    /// Compare two items by key to sort containers.
254    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
255
256    /// Consolidate the supplied container.
257    fn consolidate_into(&mut self, target: &mut Self) {
258        // Sort input data
259        let mut permutation = Vec::with_capacity(self.len());
260        permutation.extend(self.drain());
261        permutation.sort_by(|a, b| Self::cmp(a, b));
262
263        // Iterate over the data, accumulating diffs for like keys.
264        let mut iter = permutation.drain(..);
265        if let Some(item) = iter.next() {
266
267            let (k, d) = Self::into_parts(item);
268            let mut prev_key = k;
269            let mut prev_diff = d.into_owned();
270
271            for item in iter {
272                let (next_key, next_diff) = Self::into_parts(item);
273                if next_key == prev_key {
274                    prev_diff.plus_equals(&next_diff);
275                }
276                else {
277                    if !prev_diff.is_zero() {
278                        target.push_with_diff(prev_key, prev_diff);
279                    }
280                    prev_key = next_key;
281                    prev_diff = next_diff.into_owned();
282                }
283            }
284
285            if !prev_diff.is_zero() {
286                target.push_with_diff(prev_key, prev_diff);
287            }
288        }
289    }
290}
291
292impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
293where
294    D: Ord + Clone + 'static,
295    T: Ord + Clone + 'static,
296    for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
297{
298    type Key<'a> = (D, T) where Self: 'a;
299    type Diff<'a> = R where Self: 'a;
300    type DiffOwned = R;
301
302    fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
303        ((data, time), diff)
304    }
305
306    fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
307        (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
308    }
309
310    fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
311        self.push((data, time, diff));
312    }
313
314    /// Consolidate the supplied container.
315    fn consolidate_into(&mut self, target: &mut Self) {
316        consolidate_updates(self);
317        std::mem::swap(self, target);
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consolidate() {
        let cases = vec![
            (vec![("a", -1), ("b", -2), ("a", 1)], vec![("b", -2)]),
            (vec![("a", -1), ("b", 0), ("a", 1)], vec![]),
            (vec![("a", 0)], vec![]),
            (vec![("a", 0), ("b", 0)], vec![]),
            (vec![("a", 1), ("b", 1)], vec![("a", 1), ("b", 1)]),
        ];

        for (input, expected) in cases {
            let mut actual = input;
            consolidate(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_consolidate_updates() {
        let cases = vec![
            (vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)], vec![("b", 1, -2)]),
            (vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)], vec![]),
            (vec![("a", 1, 0)], vec![]),
            (vec![("a", 1, 0), ("b", 1, 0)], vec![]),
            (vec![("a", 1, 1), ("b", 2, 1)], vec![("a", 1, 1), ("b", 2, 1)]),
        ];

        for (input, expected) in cases {
            let mut actual = input;
            consolidate_updates(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_consolidating_container_builder() {
        let mut builder = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();

        // Updates whose diffs are all zero consolidate away completely.
        (0..1024).for_each(|_| builder.push_into((0, 0, 0)));
        assert_eq!(builder.extract(), None);
        assert_eq!(builder.finish(), None);

        for key in 0..1024 {
            builder.push_into((key, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = builder.finish() {
            collected.append(container);
        }
        // The output happens to be sorted, but it's not guaranteed.
        collected.sort();
        let expected: Vec<(usize, usize, usize)> = (0..1024).map(|key| (key, 0, 1)).collect();
        assert_eq!(&collected[..1024], &expected[..]);
    }

    #[test]
    fn test_consolidate_into() {
        let mut updates = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        updates.sort();
        updates.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    /// Replaces the contents of `buffer` with `LEN` updates: four per key, whose diffs are
    /// -2, -1, 0 and 1.
    fn refill(buffer: &mut Vec<(usize, i32, isize)>) {
        buffer.clear();
        buffer.extend((0..LEN).map(|i| (i / 4, 1, (i % 4) as isize - 2)));
    }

    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            refill(&mut data);
            refill(&mut data2);
            target.clear();
            data.sort_by(|a, b| a.0.cmp(&b.0));

            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            refill(&mut data);
            data.sort_by(|a, b| a.0.cmp(&b.0));

            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}
468}