mz_timely_util/columnar/
builder.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! A container builder for columns.
17
18use std::collections::VecDeque;
19
20use columnar::bytes::{EncodeDecode, Indexed};
21use columnar::{Clear, Columnar, Len, Push};
22use timely::container::PushInto;
23use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
24
25use crate::columnar::Column;
26
27/// A container builder for `Column<C>`.
28pub struct ColumnBuilder<C: Columnar> {
29    /// Container that we're writing to.
30    current: C::Container,
31    /// Finished container that we presented to callers of extract/finish.
32    ///
33    /// We don't recycle the column because for extract, it's not typed, and after calls
34    /// to finish it'll be `None`.
35    finished: Option<Column<C>>,
36    /// Completed containers pending to be sent.
37    pending: VecDeque<Column<C>>,
38}
39
40impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C>
41where
42    C::Container: Push<T>,
43{
44    #[inline]
45    fn push_into(&mut self, item: T) {
46        self.current.push(item);
47        // If there is less than 10% slop with 2MB backing allocations, mint a container.
48        use columnar::Container;
49        let words = Indexed::length_in_words(&self.current.borrow());
50        let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
51        if round - words < round / 10 {
52            /// Move the contents from `current` to an aligned allocation, and push it to `pending`.
53            /// The contents must fit in `round` words (u64).
54            #[cold]
55            fn outlined_align<C>(
56                current: &mut C::Container,
57                round: usize,
58                pending: &mut VecDeque<Column<C>>,
59            ) where
60                C: Columnar,
61            {
62                let mut alloc = crate::containers::alloc_aligned_zeroed(round);
63                let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
64                Indexed::write(writer, &current.borrow()).unwrap();
65                pending.push_back(Column::Align(alloc));
66                current.clear();
67            }
68
69            outlined_align(&mut self.current, round, &mut self.pending);
70        }
71    }
72}
73
74impl<C: Columnar> Default for ColumnBuilder<C> {
75    #[inline]
76    fn default() -> Self {
77        ColumnBuilder {
78            current: Default::default(),
79            finished: None,
80            pending: Default::default(),
81        }
82    }
83}
84
85impl<C: Columnar> ContainerBuilder for ColumnBuilder<C>
86where
87    C::Container: Clone,
88{
89    type Container = Column<C>;
90
91    #[inline]
92    fn extract(&mut self) -> Option<&mut Self::Container> {
93        if let Some(container) = self.pending.pop_front() {
94            self.finished = Some(container);
95            self.finished.as_mut()
96        } else {
97            None
98        }
99    }
100
101    #[inline]
102    fn finish(&mut self) -> Option<&mut Self::Container> {
103        if !self.current.is_empty() {
104            self.pending
105                .push_back(Column::Typed(std::mem::take(&mut self.current)));
106        }
107        self.finished = self.pending.pop_front();
108        self.finished.as_mut()
109    }
110}
111
112impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone {}