mz_timely_util/columnar/
builder.rs1use std::collections::VecDeque;
19
20use columnar::bytes::{EncodeDecode, Indexed};
21use columnar::{Clear, Columnar, Len, Push};
22use timely::container::PushInto;
23use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
24
25use crate::columnar::Column;
26
27pub struct ColumnBuilder<C: Columnar> {
29 current: C::Container,
31 finished: Option<Column<C>>,
36 pending: VecDeque<Column<C>>,
38}
39
40impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C>
41where
42 C::Container: Push<T>,
43{
44 #[inline]
45 fn push_into(&mut self, item: T) {
46 self.current.push(item);
47 use columnar::Container;
49 let words = Indexed::length_in_words(&self.current.borrow());
50 let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
51 if round - words < round / 10 {
52 #[cold]
55 fn outlined_align<C>(
56 current: &mut C::Container,
57 round: usize,
58 pending: &mut VecDeque<Column<C>>,
59 ) where
60 C: Columnar,
61 {
62 let mut alloc = crate::containers::alloc_aligned_zeroed(round);
63 let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
64 Indexed::write(writer, ¤t.borrow()).unwrap();
65 pending.push_back(Column::Align(alloc));
66 current.clear();
67 }
68
69 outlined_align(&mut self.current, round, &mut self.pending);
70 }
71 }
72}
73
74impl<C: Columnar> Default for ColumnBuilder<C> {
75 #[inline]
76 fn default() -> Self {
77 ColumnBuilder {
78 current: Default::default(),
79 finished: None,
80 pending: Default::default(),
81 }
82 }
83}
84
85impl<C: Columnar> ContainerBuilder for ColumnBuilder<C>
86where
87 C::Container: Clone,
88{
89 type Container = Column<C>;
90
91 #[inline]
92 fn extract(&mut self) -> Option<&mut Self::Container> {
93 if let Some(container) = self.pending.pop_front() {
94 self.finished = Some(container);
95 self.finished.as_mut()
96 } else {
97 None
98 }
99 }
100
101 #[inline]
102 fn finish(&mut self) -> Option<&mut Self::Container> {
103 if !self.current.is_empty() {
104 self.pending
105 .push_back(Column::Typed(std::mem::take(&mut self.current)));
106 }
107 self.finished = self.pending.pop_front();
108 self.finished.as_mut()
109 }
110}
111
112impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone {}