mz_timely_util/columnar/batcher.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types for consolidating, merging, and extracting columnar update collections.
17
18use std::collections::VecDeque;
19
20use columnar::{Columnar, Index, Len};
21use columnation::Columnation;
22use differential_dataflow::containers::TimelyStack;
23use differential_dataflow::difference::Semigroup;
24use timely::Container;
25use timely::container::{ContainerBuilder, PushInto};
26
27use crate::columnar::Column;
28
/// Chunker that turns incoming update batches into sorted, consolidated containers.
///
/// Output is staged in `target`; every finished container waits in `ready` until it is
/// handed out, one at a time and oldest first.
#[derive(Default)]
pub struct Chunker<C> {
    /// Scratch container that receives consolidated output.
    ///
    /// `extract` and `finish` move the next ready container into this slot and lend it
    /// out by mutable reference; later calls overwrite or clear it, so the caller must be
    /// done with it before the next call.
    target: C,
    /// FIFO of finished containers waiting to be extracted.
    ready: VecDeque<C>,
}
40
41impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
42    type Container = C;
43
44    fn extract(&mut self) -> Option<&mut Self::Container> {
45        if let Some(ready) = self.ready.pop_front() {
46            self.target = ready;
47            Some(&mut self.target)
48        } else {
49            None
50        }
51    }
52
53    fn finish(&mut self) -> Option<&mut Self::Container> {
54        self.extract()
55    }
56}
57
58impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for Chunker<TimelyStack<(D, T, R)>>
59where
60    D: Columnar + Columnation,
61    for<'b> columnar::Ref<'b, D>: Ord + Copy,
62    T: Columnar + Columnation,
63    for<'b> columnar::Ref<'b, T>: Ord + Copy,
64    R: Columnar + Columnation + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
65    for<'b> columnar::Ref<'b, R>: Ord,
66    // C2: Container + for<'b> PushInto<&'b (D, T, R)>,
67{
68    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
69        // Sort input data
70        // TODO: consider `Vec<usize>` that we retain, containing indexes.
71        let borrowed = container.borrow();
72        let mut permutation = Vec::with_capacity(borrowed.len());
73        permutation.extend(borrowed.into_index_iter());
74        permutation.sort();
75
76        self.target.clear();
77        // Iterate over the data, accumulating diffs for like keys.
78        let mut iter = permutation.drain(..);
79        if let Some((data, time, diff)) = iter.next() {
80            let mut owned_data = D::into_owned(data);
81            let mut owned_time = T::into_owned(time);
82
83            let mut prev_data = data;
84            let mut prev_time = time;
85            let mut prev_diff = <R as Columnar>::into_owned(diff);
86
87            for (data, time, diff) in iter {
88                if (&prev_data, &prev_time) == (&data, &time) {
89                    prev_diff.plus_equals(&diff);
90                } else {
91                    if !prev_diff.is_zero() {
92                        D::copy_from(&mut owned_data, prev_data);
93                        T::copy_from(&mut owned_time, prev_time);
94                        let tuple = (owned_data, owned_time, prev_diff);
95                        self.target.push_into(&tuple);
96                        (owned_data, owned_time, prev_diff) = tuple;
97                    }
98                    prev_data = data;
99                    prev_time = time;
100                    R::copy_from(&mut prev_diff, diff);
101                }
102            }
103
104            if !prev_diff.is_zero() {
105                D::copy_from(&mut owned_data, prev_data);
106                T::copy_from(&mut owned_time, prev_time);
107                let tuple = (owned_data, owned_time, prev_diff);
108                self.target.push_into(&tuple);
109            }
110        }
111
112        if !self.target.is_empty() {
113            self.ready.push_back(std::mem::take(&mut self.target));
114        }
115    }
116}