1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22pub mod consolidate;
23
24use std::hash::Hash;
25
26use columnar::Borrow;
27use columnar::bytes::indexed;
28use columnar::common::IterOwn;
29use columnar::{Clear, FromBytes, Index, Len};
30use columnar::{Columnar, Ref};
31use differential_dataflow::Hashable;
32use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
33use timely::Accountable;
34use timely::bytes::arc::Bytes;
35use timely::container::{DrainContainer, PushInto, SizableContainer};
36use timely::dataflow::channels::ContainerBytes;
37
38use crate::columnation::{ColInternalMerger, ColumnationStack};
39
40pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
42 Column<((K, V), T, R)>,
43 batcher::Chunker<ColumnationStack<((K, V), T, R)>>,
44 ColInternalMerger<(K, V), T, R>,
45>;
46pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
48
49pub enum Column<C: Columnar> {
55 Typed(C::Container),
57 Bytes(Bytes),
59 Align(Vec<u64>),
66}
67
68impl<C: Columnar> Column<C> {
69 #[inline]
76 pub fn clear(&mut self) {
77 match self {
78 Column::Typed(t) => t.clear(),
79 Column::Bytes(_) | Column::Align(_) => *self = Default::default(),
80 }
81 }
82
83 #[inline]
85 pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
86 match self {
87 Column::Typed(t) => t.borrow(),
88 Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
89 &mut indexed::decode(bytemuck::cast_slice(b)),
90 ),
91 Column::Align(a) => {
92 <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
93 }
94 }
95 }
96}
97
98impl<C: Columnar> Default for Column<C> {
99 fn default() -> Self {
100 Self::Typed(Default::default())
101 }
102}
103
104impl<C: Columnar> Clone for Column<C>
105where
106 C::Container: Clone,
107{
108 fn clone(&self) -> Self {
109 match self {
110 Column::Typed(t) => Column::Typed(t.clone()),
113 Column::Bytes(b) => {
114 assert_eq!(b.len() % 8, 0);
115 Self::Align(bytemuck::allocation::pod_collect_to_vec(b))
116 }
117 Column::Align(a) => Column::Align(a.clone()),
118 }
119 }
120}
121
122impl<C: Columnar> Accountable for Column<C> {
123 #[inline]
124 fn record_count(&self) -> i64 {
125 self.borrow().len().try_into().expect("Must fit")
126 }
127}
128impl<C: Columnar> DrainContainer for Column<C> {
129 type Item<'a> = Ref<'a, C>;
130 type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
131 #[inline]
132 fn drain(&mut self) -> Self::DrainIter<'_> {
133 self.borrow().into_index_iter()
134 }
135}
136
137impl<C: Columnar, T> PushInto<T> for Column<C>
138where
139 C::Container: columnar::Push<T>,
140{
141 #[inline]
142 fn push_into(&mut self, item: T) {
143 use columnar::Push;
144 match self {
145 Column::Typed(t) => t.push(item),
146 Column::Align(_) | Column::Bytes(_) => {
147 unimplemented!("Pushing into Column::Bytes without first clearing");
150 }
151 }
152 }
153}
154
155const SHIP_WORDS: usize = 1 << 18;
160
161#[inline]
167pub(crate) fn at_serialized_capacity<'a, A>(borrow: &A) -> bool
168where
169 A: columnar::AsBytes<'a>,
170{
171 let words = indexed::length_in_words(borrow);
172 let round = (words + (SHIP_WORDS - 1)) & !(SHIP_WORDS - 1);
173 round - words < round / 10
174}
175
176impl<C: Columnar> SizableContainer for Column<C> {
177 fn at_capacity(&self) -> bool {
178 match self {
186 Column::Typed(c) => at_serialized_capacity(&c.borrow()),
187 Column::Bytes(_) | Column::Align(_) => true,
188 }
189 }
190
191 fn ensure_capacity(&mut self, _stash: &mut Option<Self>) {
192 }
199}
200
201impl<C: Columnar> ContainerBytes for Column<C> {
202 #[inline]
203 fn from_bytes(bytes: Bytes) -> Self {
204 assert_eq!(bytes.len() % 8, 0);
210 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
211 Self::Bytes(bytes)
212 } else {
213 Self::Align(bytemuck::allocation::pod_collect_to_vec(&bytes[..]))
216 }
217 }
218
219 #[inline]
220 fn length_in_bytes(&self) -> usize {
221 match self {
222 Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
223 Column::Bytes(b) => b.len(),
224 Column::Align(a) => 8 * a.len(),
225 }
226 }
227
228 #[inline]
229 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
230 match self {
231 Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
232 Column::Bytes(b) => writer.write_all(b).unwrap(),
233 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
234 }
235 }
236}
237
238#[inline(always)]
242pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
243where
244 K: Columnar,
245 for<'a> Ref<'a, K>: Hash,
246 V: Columnar,
247 D: Columnar,
248 T: Columnar,
249{
250 k.hashed()
251}
252
#[cfg(test)]
mod tests {
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Returns the serialized bytes of a `Column<i32>` holding `[1, 2, 3]`.
    ///
    /// `test_column_known_bytes` checks that `into_bytes` produces exactly these bytes.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        // Two little-endian u64 header words.
        // NOTE(review): presumably the byte offsets ending the header (16) and the single i32
        // data slice (16 + 12 = 28); confirm against columnar's `indexed` encoding.
        raw.extend(16_u64.to_le_bytes());
        raw.extend(28_u64.to_le_bytes());
        // The three i32 values, little-endian.
        raw.extend(1_i32.to_le_bytes());
        raw.extend(2_i32.to_le_bytes());
        raw.extend(3_i32.to_le_bytes());
        // Padding so the total length (32 bytes) is a multiple of 8, as `from_bytes` requires.
        raw.extend([0, 0, 0, 0]);
        raw
    }

    /// Cloning each representation yields a column that reads back `[1, 2, 3]`.
    #[mz_ore::test]
    fn test_column_clone() {
        // Typed representation.
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let column_typed: Column<i32> = Column::Typed(columns);
        let column_typed2 = column_typed.clone();

        assert_eq!(
            column_typed2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Bytes representation. Cloning copies into `Column::Align`, so only the clone is
        // borrowed. The Vec<u8>-backed original may not be u64-aligned, which `borrow` needs.
        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column_bytes: Column<i32> = Column::Bytes(bytes);
        let column_bytes2 = column_bytes.clone();

        assert_eq!(
            column_bytes2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Align representation: copy the raw bytes into a u64-backed buffer.
        let raw = raw_columnar_bytes();
        let mut region: Vec<u64> = vec![0; raw.len() / 8];
        let region_bytes = bytemuck::cast_slice_mut(&mut region[..]);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let column_align: Column<i32> = Column::Align(region);
        let column_align2 = column_align.clone();

        assert_eq!(
            column_align2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }

    /// Serializing a typed column of `[1, 2, 3]` produces exactly `raw_columnar_bytes()`.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        column.push_into(2);
        column.push_into(3);
        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    /// `from_bytes` keeps u64-aligned input as `Column::Bytes` and copies misaligned input
    /// into `Column::Align`; both read back `[1, 2, 3]`.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Allocate 8 spare bytes so the payload can start at a chosen offset. `align` is the
        // number of leading bytes to skip to reach u64 alignment.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        // Discard the prefix, then copy in the payload and split off exactly its length.
        let _ = bytes_mut.extract_to(align);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let aligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Same setup, but skip one byte past the aligned offset to force misalignment.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + 1);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let unaligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }
}