1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22pub mod consolidate;
23
24use std::hash::Hash;
25
26use columnar::Borrow;
27use columnar::bytes::indexed;
28use columnar::common::IterOwn;
29use columnar::{Columnar, Ref};
30use columnar::{FromBytes, Index, Len};
31use differential_dataflow::Hashable;
32use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
33use timely::Accountable;
34use timely::bytes::arc::Bytes;
35use timely::container::{DrainContainer, PushInto, SizableContainer};
36use timely::dataflow::channels::ContainerBytes;
37
38use crate::columnation::{ColInternalMerger, ColumnationStack};
39
/// A [`MergeBatcher`] for key-value updates of the form `((K, V), T, R)`.
///
/// Input arrives as [`Column`] containers, is re-chunked by [`batcher::Chunker`] into
/// [`ColumnationStack`]s, and is merged by [`ColInternalMerger`].
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
    Column<((K, V), T, R)>,
    batcher::Chunker<ColumnationStack<((K, V), T, R)>>,
    ColInternalMerger<(K, V), T, R>,
>;
/// A [`Col2ValBatcher`] for key-only updates, i.e. with the unit type `()` as the value.
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
48
/// A container of columnar data for items of type `C`.
///
/// The contents are either held in typed form, or in serialized form as a sequence of
/// bytes that is decoded on access through [`Column::borrow`].
pub enum Column<C: Columnar> {
    /// Typed contents, stored in `C`'s columnar container. This is the only variant that
    /// accepts pushes.
    Typed(C::Container),
    /// Serialized contents held in a shared [`Bytes`] allocation. `borrow` reinterprets the
    /// bytes as `u64` words, so the allocation must be suitably aligned and a multiple of
    /// eight bytes long.
    Bytes(Bytes),
    /// Serialized contents held in an owned allocation of `u64` words. Used when the data
    /// is copied out of a `Bytes` allocation, e.g. on clone or when the bytes are not
    /// aligned to `u64`.
    Align(Vec<u64>),
}
67
68impl<C: Columnar> Column<C> {
69 #[inline]
71 pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
72 match self {
73 Column::Typed(t) => t.borrow(),
74 Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
75 &mut indexed::decode(bytemuck::cast_slice(b)),
76 ),
77 Column::Align(a) => {
78 <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
79 }
80 }
81 }
82}
83
84impl<C: Columnar> Default for Column<C> {
85 fn default() -> Self {
86 Self::Typed(Default::default())
87 }
88}
89
90impl<C: Columnar> Clone for Column<C>
91where
92 C::Container: Clone,
93{
94 fn clone(&self) -> Self {
95 match self {
96 Column::Typed(t) => Column::Typed(t.clone()),
99 Column::Bytes(b) => {
100 assert_eq!(b.len() % 8, 0);
101 Self::Align(bytemuck::allocation::pod_collect_to_vec(b))
102 }
103 Column::Align(a) => Column::Align(a.clone()),
104 }
105 }
106}
107
108impl<C: Columnar> Accountable for Column<C> {
109 #[inline]
110 fn record_count(&self) -> i64 {
111 self.borrow().len().try_into().expect("Must fit")
112 }
113}
114impl<C: Columnar> DrainContainer for Column<C> {
115 type Item<'a> = Ref<'a, C>;
116 type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
117 #[inline]
118 fn drain(&mut self) -> Self::DrainIter<'_> {
119 self.borrow().into_index_iter()
120 }
121}
122
123impl<C: Columnar, T> PushInto<T> for Column<C>
124where
125 C::Container: columnar::Push<T>,
126{
127 #[inline]
128 fn push_into(&mut self, item: T) {
129 use columnar::Push;
130 match self {
131 Column::Typed(t) => t.push(item),
132 Column::Align(_) | Column::Bytes(_) => {
133 unimplemented!("Pushing into Column::Bytes without first clearing");
136 }
137 }
138 }
139}
140
/// Granularity, in words, to which serialized sizes are rounded up when deciding whether a
/// container is full; see `at_serialized_capacity`.
///
/// Must be a power of two, because the rounding there is done by masking. With 8-byte
/// words, `1 << 18` words correspond to 2 MiB.
const SHIP_WORDS: usize = 1 << 18;
146
147#[inline]
153pub(crate) fn at_serialized_capacity<'a, A>(borrow: &A) -> bool
154where
155 A: columnar::AsBytes<'a>,
156{
157 let words = indexed::length_in_words(borrow);
158 let round = (words + (SHIP_WORDS - 1)) & !(SHIP_WORDS - 1);
159 round - words < round / 10
160}
161
162impl<C: Columnar> SizableContainer for Column<C> {
163 fn at_capacity(&self) -> bool {
164 match self {
172 Column::Typed(c) => at_serialized_capacity(&c.borrow()),
173 Column::Bytes(_) | Column::Align(_) => true,
174 }
175 }
176
177 fn ensure_capacity(&mut self, _stash: &mut Option<Self>) {
178 }
185}
186
187impl<C: Columnar> ContainerBytes for Column<C> {
188 #[inline]
189 fn from_bytes(bytes: Bytes) -> Self {
190 assert_eq!(bytes.len() % 8, 0);
196 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
197 Self::Bytes(bytes)
198 } else {
199 Self::Align(bytemuck::allocation::pod_collect_to_vec(&bytes[..]))
202 }
203 }
204
205 #[inline]
206 fn length_in_bytes(&self) -> usize {
207 match self {
208 Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
209 Column::Bytes(b) => b.len(),
210 Column::Align(a) => 8 * a.len(),
211 }
212 }
213
214 #[inline]
215 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
216 match self {
217 Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
218 Column::Bytes(b) => writer.write_all(b).unwrap(),
219 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
220 }
221 }
222}
223
224#[inline(always)]
228pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
229where
230 K: Columnar,
231 for<'a> Ref<'a, K>: Hash,
232 V: Columnar,
233 D: Columnar,
234 T: Columnar,
235{
236 k.hashed()
237}
238
#[cfg(test)]
mod tests {
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Returns the expected serialized form of a `Column<i32>` containing `1, 2, 3`
    /// (pinned by `test_column_known_bytes`).
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        // Two little-endian u64 header words. Presumably byte offsets: 16 is where the
        // payload starts and 28 where it ends -- TODO confirm against the `indexed` format.
        raw.extend(16_u64.to_le_bytes());
        raw.extend(28_u64.to_le_bytes());
        // The three i32 values, little-endian.
        raw.extend(1_i32.to_le_bytes());
        raw.extend(2_i32.to_le_bytes());
        raw.extend(3_i32.to_le_bytes());
        // Padding so the total length (32 bytes) is a multiple of eight.
        raw.extend([0, 0, 0, 0]);
        raw
    }

    /// Cloning preserves the contents for every variant of `Column`.
    #[mz_ore::test]
    fn test_column_clone() {
        // Typed variant.
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let column_typed: Column<i32> = Column::Typed(columns);
        let column_typed2 = column_typed.clone();

        assert_eq!(
            column_typed2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Bytes variant; the clone copies the data into an aligned allocation.
        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column_bytes: Column<i32> = Column::Bytes(bytes);
        let column_bytes2 = column_bytes.clone();

        assert_eq!(
            column_bytes2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Align variant: copy the raw bytes into a `Vec<u64>` of the same byte length.
        let raw = raw_columnar_bytes();
        let mut region: Vec<u64> = vec![0; raw.len() / 8];
        let region_bytes = bytemuck::cast_slice_mut(&mut region[..]);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let column_align: Column<i32> = Column::Align(region);
        let column_align2 = column_align.clone();

        assert_eq!(
            column_align2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }

    /// Serializing a typed column produces exactly the bytes of `raw_columnar_bytes`.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        column.push_into(2);
        column.push_into(3);
        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input as `Bytes` and copies unaligned input into `Align`;
    /// both decode to the same contents.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Aligned case: over-allocate by 8 bytes, then skip forward to the first
        // u64-aligned address before placing the payload.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let aligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Unaligned case: skip one byte past the aligned address so the payload is
        // guaranteed to be misaligned for u64.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + 1);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let unaligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }
}