1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22pub mod builder_input;
23pub mod consolidate;
24pub mod merge_batcher;
25
26use std::hash::Hash;
27
28use columnar::Borrow;
29use columnar::bytes::indexed;
30use columnar::common::IterOwn;
31use columnar::{Clear, FromBytes, Index, Len};
32use columnar::{Columnar, Ref};
33use differential_dataflow::Hashable;
34use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
35use timely::Accountable;
36use timely::bytes::arc::Bytes;
37use timely::container::{DrainContainer, PushInto, SizableContainer};
38use timely::dataflow::channels::ContainerBytes;
39
40use crate::columnation::ColInternalMerger;
41
42pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<ColInternalMerger<(K, V), T, R>>;
49pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
51
52pub type Col2ValPagedBatcher<K, V, T, R> = merge_batcher::ColumnMergeBatcher<(K, V), T, R>;
64
65pub enum Column<C: Columnar> {
71 Typed(C::Container),
73 Bytes(Bytes),
75 Align(Vec<u64>),
82}
83
84impl<C: Columnar> Column<C> {
85 #[inline]
92 pub fn clear(&mut self) {
93 match self {
94 Column::Typed(t) => t.clear(),
95 Column::Bytes(_) | Column::Align(_) => *self = Default::default(),
96 }
97 }
98
99 #[inline]
101 pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
102 match self {
103 Column::Typed(t) => t.borrow(),
104 Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
105 &mut indexed::decode(bytemuck::cast_slice(b)),
106 ),
107 Column::Align(a) => {
108 <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
109 }
110 }
111 }
112}
113
114impl<C: Columnar> Default for Column<C> {
115 fn default() -> Self {
116 Self::Typed(Default::default())
117 }
118}
119
120impl<C: Columnar> Clone for Column<C>
121where
122 C::Container: Clone,
123{
124 fn clone(&self) -> Self {
125 match self {
126 Column::Typed(t) => Column::Typed(t.clone()),
129 Column::Bytes(b) => {
130 assert_eq!(b.len() % 8, 0);
131 Self::Align(bytemuck::allocation::pod_collect_to_vec(b))
132 }
133 Column::Align(a) => Column::Align(a.clone()),
134 }
135 }
136}
137
138impl<C: Columnar> Accountable for Column<C> {
139 #[inline]
140 fn record_count(&self) -> i64 {
141 self.borrow().len().try_into().expect("Must fit")
142 }
143}
144impl<C: Columnar> DrainContainer for Column<C> {
145 type Item<'a> = Ref<'a, C>;
146 type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
147 #[inline]
148 fn drain(&mut self) -> Self::DrainIter<'_> {
149 self.borrow().into_index_iter()
150 }
151}
152
153impl<C: Columnar, T> PushInto<T> for Column<C>
154where
155 C::Container: columnar::Push<T>,
156{
157 #[inline]
158 fn push_into(&mut self, item: T) {
159 use columnar::Push;
160 match self {
161 Column::Typed(t) => t.push(item),
162 Column::Align(_) | Column::Bytes(_) => {
163 unimplemented!("Pushing into Column::Bytes without first clearing");
166 }
167 }
168 }
169}
170
171const SHIP_WORDS: usize = 1 << 18;
176
177#[inline]
183pub(crate) fn at_serialized_capacity<'a, A>(borrow: &A) -> bool
184where
185 A: columnar::AsBytes<'a>,
186{
187 let words = indexed::length_in_words(borrow);
188 let round = (words + (SHIP_WORDS - 1)) & !(SHIP_WORDS - 1);
189 round - words < round / 10
190}
191
192impl<C: Columnar> SizableContainer for Column<C> {
193 fn at_capacity(&self) -> bool {
194 match self {
202 Column::Typed(c) => at_serialized_capacity(&c.borrow()),
203 Column::Bytes(_) | Column::Align(_) => true,
204 }
205 }
206
207 fn ensure_capacity(&mut self, _stash: &mut Option<Self>) {
208 }
215}
216
217impl<C: Columnar> ContainerBytes for Column<C> {
218 #[inline]
219 fn from_bytes(bytes: Bytes) -> Self {
220 assert_eq!(bytes.len() % 8, 0);
226 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
227 Self::Bytes(bytes)
228 } else {
229 Self::Align(bytemuck::allocation::pod_collect_to_vec(&bytes[..]))
232 }
233 }
234
235 #[inline]
236 fn length_in_bytes(&self) -> usize {
237 match self {
238 Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
239 Column::Bytes(b) => b.len(),
240 Column::Align(a) => 8 * a.len(),
241 }
242 }
243
244 #[inline]
245 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
246 match self {
247 Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
248 Column::Bytes(b) => writer.write_all(b).unwrap(),
249 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
250 }
251 }
252}
253
254#[inline(always)]
258pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
259where
260 K: Columnar,
261 for<'a> Ref<'a, K>: Hash,
262 V: Columnar,
263 D: Columnar,
264 T: Columnar,
265{
266 k.hashed()
267}
268
#[cfg(test)]
mod tests {
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Asserts that the given column holds exactly the records `1, 2, 3`, in order.
    macro_rules! assert_one_two_three {
        ($column:expr) => {
            assert_eq!(
                $column.borrow().into_index_iter().collect::<Vec<_>>(),
                vec![&1, &2, &3]
            )
        };
    }

    /// The expected serialized form of a `Column<i32>` holding `1, 2, 3`: two `u64`
    /// header words, the three little-endian `i32` values, and four zero bytes that pad
    /// the total length to a multiple of eight.
    fn raw_columnar_bytes() -> Vec<u8> {
        let header = [16_u64, 28_u64];
        let values = [1_i32, 2_i32, 3_i32];

        let mut raw = Vec::new();
        for word in header {
            raw.extend(word.to_le_bytes());
        }
        for value in values {
            raw.extend(value.to_le_bytes());
        }
        raw.extend([0, 0, 0, 0]);
        raw
    }

    /// Builds a column via `from_bytes` from a copy of `raw` that starts `shift` bytes
    /// past a `u64`-aligned address.
    fn column_from_shifted_bytes(raw: &[u8], shift: usize) -> Column<i32> {
        // Eight spare bytes leave room to skip ahead to the requested offset.
        let buf = vec![0_u8; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + shift);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        Column::from_bytes(bytes_mut.extract_to(raw.len()))
    }

    /// Cloning preserves the contents for all three variants.
    #[mz_ore::test]
    fn test_column_clone() {
        // Typed variant.
        let container = Columnar::as_columns([1, 2, 3].iter());
        let typed: Column<i32> = Column::Typed(container);
        let typed_clone = typed.clone();
        assert_one_two_three!(typed_clone);

        // Bytes variant.
        let frozen = BytesMut::from(raw_columnar_bytes()).freeze();
        let from_bytes: Column<i32> = Column::Bytes(frozen);
        let bytes_clone = from_bytes.clone();
        assert_one_two_three!(bytes_clone);

        // Align variant: copy the raw bytes into a `u64` allocation.
        let raw = raw_columnar_bytes();
        let mut words: Vec<u64> = vec![0; raw.len() / 8];
        let word_bytes: &mut [u8] = bytemuck::cast_slice_mut(&mut words[..]);
        word_bytes[..raw.len()].copy_from_slice(&raw);
        let aligned: Column<i32> = Column::Align(words);
        let align_clone = aligned.clone();
        assert_one_two_three!(align_clone);
    }

    /// Serializing a typed column yields the known byte layout.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        for value in [1, 2, 3] {
            column.push_into(value);
        }

        let mut serialized = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut serialized));
        assert_eq!(serialized, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input as `Bytes` and relocates unaligned input.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Input that starts on a `u64` boundary is used in place.
        let column = column_from_shifted_bytes(&raw, 0);
        assert!(matches!(column, Column::Bytes(_)));
        assert_one_two_three!(column);

        // Input that starts one byte past a `u64` boundary is copied.
        let column = column_from_shifted_bytes(&raw, 1);
        assert!(matches!(column, Column::Align(_)));
        assert_one_two_three!(column);
    }
}