mz_timely_util/columnar/
batcher.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types for consolidating, merging, and extracting columnar update collections.
17
18use std::collections::VecDeque;
19
20use columnar::Columnar;
21use differential_dataflow::difference::Semigroup;
22use timely::Container;
23use timely::container::{ContainerBuilder, PushInto};
24
25use crate::columnar::Column;
26
/// A chunker to transform input data into sorted columns.
///
/// Every column pushed into the chunker is sorted and consolidated into
/// `target`; a non-empty result is then moved to the back of `ready`, from
/// where `extract` and `finish` hand containers out in FIFO order.
#[derive(Default)]
pub struct Chunker<C> {
    /// Buffer into which we'll consolidate.
    ///
    /// Also the buffer where we'll stage responses to `extract` and `finish`.
    /// When these calls return, the buffer is available for reuse.
    ///
    /// `extract` overwrites this field with the container it hands out, and the
    /// next push clears it before consolidating, so its contents only remain
    /// valid until the chunker is used again.
    target: C,
    /// Consolidated buffers ready to go.
    ///
    /// Pushes append at the back and `extract` pops from the front, so
    /// containers are yielded in the order in which they were produced.
    ready: VecDeque<C>,
}
38
39impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
40    type Container = C;
41
42    fn extract(&mut self) -> Option<&mut Self::Container> {
43        if let Some(ready) = self.ready.pop_front() {
44            self.target = ready;
45            Some(&mut self.target)
46        } else {
47            None
48        }
49    }
50
51    fn finish(&mut self) -> Option<&mut Self::Container> {
52        self.extract()
53    }
54}
55
56impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker<C2>
57where
58    D: Columnar,
59    for<'b> columnar::Ref<'b, D>: Ord + Copy,
60    T: Columnar,
61    for<'b> columnar::Ref<'b, T>: Ord + Copy,
62    R: Columnar + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
63    for<'b> columnar::Ref<'b, R>: Ord,
64    C2: Container + for<'b> PushInto<&'b (D, T, R)>,
65{
66    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
67        // Sort input data
68        // TODO: consider `Vec<usize>` that we retain, containing indexes.
69        let mut permutation = Vec::with_capacity(container.len());
70        permutation.extend(container.drain());
71        permutation.sort();
72
73        self.target.clear();
74        // Iterate over the data, accumulating diffs for like keys.
75        let mut iter = permutation.drain(..);
76        if let Some((data, time, diff)) = iter.next() {
77            let mut owned_data = D::into_owned(data);
78            let mut owned_time = T::into_owned(time);
79
80            let mut prev_data = data;
81            let mut prev_time = time;
82            let mut prev_diff = <R as Columnar>::into_owned(diff);
83
84            for (data, time, diff) in iter {
85                if (&prev_data, &prev_time) == (&data, &time) {
86                    prev_diff.plus_equals(&diff);
87                } else {
88                    if !prev_diff.is_zero() {
89                        D::copy_from(&mut owned_data, prev_data);
90                        T::copy_from(&mut owned_time, prev_time);
91                        let tuple = (owned_data, owned_time, prev_diff);
92                        self.target.push_into(&tuple);
93                        (owned_data, owned_time, prev_diff) = tuple;
94                    }
95                    prev_data = data;
96                    prev_time = time;
97                    R::copy_from(&mut prev_diff, diff);
98                }
99            }
100
101            if !prev_diff.is_zero() {
102                D::copy_from(&mut owned_data, prev_data);
103                T::copy_from(&mut owned_time, prev_time);
104                let tuple = (owned_data, owned_time, prev_diff);
105                self.target.push_into(&tuple);
106            }
107        }
108
109        if !self.target.is_empty() {
110            self.ready.push_back(std::mem::take(&mut self.target));
111        }
112    }
113}