Skip to main content

mz_timely_util/columnar/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! A container builder for columns.
17
18use std::collections::VecDeque;
19
20use columnar::bytes::indexed;
21use columnar::{Clear, Columnar, Len, Push};
22use timely::container::PushInto;
23use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
24
25use crate::columnar::Column;
26
27/// A container builder for `Column<C>`.
28pub struct ColumnBuilder<C: Columnar> {
29    /// Container that we're writing to.
30    current: C::Container,
31    /// Finished container that we presented to callers of extract/finish.
32    ///
33    /// We don't recycle the column because for extract, it's not typed, and after calls
34    /// to finish it'll be `None`.
35    finished: Option<Column<C>>,
36    /// Completed containers pending to be sent.
37    pending: VecDeque<Column<C>>,
38}
39
40impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C>
41where
42    C::Container: Push<T>,
43{
44    #[inline]
45    fn push_into(&mut self, item: T) {
46        self.current.push(item);
47        // If there is less than 10% slop with 2MB backing allocations, mint a container.
48        use columnar::Borrow;
49        let words = indexed::length_in_words(&self.current.borrow());
50        let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
51        if round - words < round / 10 {
52            /// Move the contents from `current` to a `Vec<u64>` allocation built via
53            /// `indexed::encode` (so no zero-init pre-pass), and push it to `pending`.
54            #[cold]
55            fn outlined_align<C>(
56                current: &mut C::Container,
57                words: usize,
58                pending: &mut VecDeque<Column<C>>,
59            ) where
60                C: Columnar,
61            {
62                let mut alloc: Vec<u64> = Vec::with_capacity(words);
63                indexed::encode(&mut alloc, &current.borrow());
64                pending.push_back(Column::Align(alloc));
65                current.clear();
66            }
67
68            outlined_align(&mut self.current, words, &mut self.pending);
69        }
70    }
71}
72
73impl<C: Columnar> Default for ColumnBuilder<C> {
74    #[inline]
75    fn default() -> Self {
76        ColumnBuilder {
77            current: Default::default(),
78            finished: None,
79            pending: Default::default(),
80        }
81    }
82}
83
84impl<C: Columnar> ContainerBuilder for ColumnBuilder<C>
85where
86    C::Container: Clone,
87{
88    type Container = Column<C>;
89
90    #[inline]
91    fn extract(&mut self) -> Option<&mut Self::Container> {
92        if let Some(container) = self.pending.pop_front() {
93            self.finished = Some(container);
94            self.finished.as_mut()
95        } else {
96            None
97        }
98    }
99
100    #[inline]
101    fn finish(&mut self) -> Option<&mut Self::Container> {
102        if !self.current.is_empty() {
103            self.pending
104                .push_back(Column::Typed(std::mem::take(&mut self.current)));
105        }
106        self.finished = self.pending.pop_front();
107        self.finished.as_mut()
108    }
109}
110
111impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone {}