1use std::hash::Hash;
19
20use columnar::Columnar;
21use differential_dataflow::Hashable;
22use differential_dataflow::containers::TimelyStack;
23use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
24
25pub mod stack;
26
27pub(crate) use alloc::alloc_aligned_zeroed;
28pub use alloc::{enable_columnar_lgalloc, set_enable_columnar_lgalloc};
29pub use builder::ColumnBuilder;
30pub use container::Column;
31pub use provided_builder::ProvidedBuilder;
32
33mod alloc {
34 use mz_ore::region::Region;
35
36 #[inline]
39 pub(crate) fn alloc_aligned_zeroed<T: bytemuck::AnyBitPattern>(len: usize) -> Region<T> {
40 if enable_columnar_lgalloc() {
41 Region::new_auto_zeroed(len)
42 } else {
43 Region::new_heap_zeroed(len)
44 }
45 }
46
47 thread_local! {
48 static ENABLE_COLUMNAR_LGALLOC: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
49 }
50
51 #[inline]
53 pub fn enable_columnar_lgalloc() -> bool {
54 ENABLE_COLUMNAR_LGALLOC.get()
55 }
56
57 pub fn set_enable_columnar_lgalloc(enabled: bool) {
59 ENABLE_COLUMNAR_LGALLOC.set(enabled);
60 }
61}
62
63mod container {
64 use columnar::Columnar;
65 use columnar::Container as _;
66 use columnar::bytes::{EncodeDecode, Sequence};
67 use columnar::common::IterOwn;
68 use columnar::{Clear, FromBytes, Index, Len};
69 use mz_ore::region::Region;
70 use timely::Container;
71 use timely::bytes::arc::Bytes;
72 use timely::container::PushInto;
73 use timely::dataflow::channels::ContainerBytes;
74
75 pub enum Column<C: Columnar> {
81 Typed(C::Container),
83 Bytes(Bytes),
85 Align(Region<u64>),
90 }
91
92 impl<C: Columnar> Column<C> {
93 fn borrow(&self) -> <C::Container as columnar::Container<C>>::Borrowed<'_> {
95 match self {
96 Column::Typed(t) => t.borrow(),
97 Column::Bytes(b) => {
98 <<C::Container as columnar::Container<C>>::Borrowed<'_>>::from_bytes(
99 &mut Sequence::decode(bytemuck::cast_slice(b)),
100 )
101 }
102 Column::Align(a) => {
103 <<C::Container as columnar::Container<C>>::Borrowed<'_>>::from_bytes(
104 &mut Sequence::decode(a),
105 )
106 }
107 }
108 }
109 }
110
111 impl<C: Columnar> Default for Column<C> {
112 fn default() -> Self {
113 Self::Typed(Default::default())
114 }
115 }
116
117 impl<C: Columnar> Clone for Column<C>
118 where
119 C::Container: Clone,
120 {
121 fn clone(&self) -> Self {
122 match self {
123 Column::Typed(t) => Column::Typed(t.clone()),
126 Column::Bytes(b) => {
127 assert_eq!(b.len() % 8, 0);
128 let mut alloc: Region<u64> = super::alloc_aligned_zeroed(b.len() / 8);
129 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
130 alloc_bytes[..b.len()].copy_from_slice(b);
131 Self::Align(alloc)
132 }
133 Column::Align(a) => {
134 let mut alloc = super::alloc_aligned_zeroed(a.len());
135 alloc[..a.len()].copy_from_slice(a);
136 Column::Align(alloc)
137 }
138 }
139 }
140 }
141
142 impl<C: Columnar> Container for Column<C> {
143 type ItemRef<'a> = C::Ref<'a>;
144 type Item<'a> = C::Ref<'a>;
145
146 fn len(&self) -> usize {
147 self.borrow().len()
148 }
149
150 fn clear(&mut self) {
152 match self {
153 Column::Typed(t) => t.clear(),
154 Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
155 }
156 }
157
158 type Iter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
159
160 fn iter(&self) -> Self::Iter<'_> {
161 self.borrow().into_index_iter()
162 }
163
164 type DrainIter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
165
166 fn drain(&mut self) -> Self::DrainIter<'_> {
167 self.borrow().into_index_iter()
168 }
169 }
170
171 impl<C: Columnar, T> PushInto<T> for Column<C>
172 where
173 C::Container: columnar::Push<T>,
174 {
175 #[inline]
176 fn push_into(&mut self, item: T) {
177 use columnar::Push;
178 match self {
179 Column::Typed(t) => t.push(item),
180 Column::Align(_) | Column::Bytes(_) => {
181 unimplemented!("Pushing into Column::Bytes without first clearing");
184 }
185 }
186 }
187 }
188
189 impl<C: Columnar> ContainerBytes for Column<C> {
190 fn from_bytes(bytes: Bytes) -> Self {
191 assert_eq!(bytes.len() % 8, 0);
197 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
198 Self::Bytes(bytes)
199 } else {
200 let mut alloc: Region<u64> = super::alloc_aligned_zeroed(bytes.len() / 8);
202 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
203 alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
204 Self::Align(alloc)
205 }
206 }
207
208 fn length_in_bytes(&self) -> usize {
209 match self {
210 Column::Typed(t) => Sequence::length_in_bytes(&t.borrow()),
211 Column::Bytes(b) => b.len(),
212 Column::Align(a) => 8 * a.len(),
213 }
214 }
215
216 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
217 match self {
218 Column::Typed(t) => Sequence::write(writer, &t.borrow()).unwrap(),
219 Column::Bytes(b) => writer.write_all(b).unwrap(),
220 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
221 }
222 }
223 }
224}
225
226mod builder {
227 use std::collections::VecDeque;
228
229 use columnar::bytes::{EncodeDecode, Sequence};
230 use columnar::{Clear, Columnar, Len, Push};
231 use timely::container::PushInto;
232 use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
233
234 use crate::containers::Column;
235
236 pub struct ColumnBuilder<C: Columnar> {
238 current: C::Container,
240 finished: Option<Column<C>>,
245 pending: VecDeque<Column<C>>,
247 }
248
249 impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C>
250 where
251 C::Container: Push<T>,
252 {
253 #[inline]
254 fn push_into(&mut self, item: T) {
255 self.current.push(item);
256 use columnar::Container;
258 let words = Sequence::length_in_words(&self.current.borrow());
259 let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
260 if round - words < round / 10 {
261 #[cold]
264 fn outlined_align<C>(
265 current: &mut C::Container,
266 round: usize,
267 pending: &mut VecDeque<Column<C>>,
268 ) where
269 C: Columnar,
270 {
271 let mut alloc = super::alloc_aligned_zeroed(round);
272 let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
273 Sequence::write(writer, ¤t.borrow()).unwrap();
274 pending.push_back(Column::Align(alloc));
275 current.clear();
276 }
277
278 outlined_align(&mut self.current, round, &mut self.pending);
279 }
280 }
281 }
282
283 impl<C: Columnar> Default for ColumnBuilder<C> {
284 fn default() -> Self {
285 ColumnBuilder {
286 current: Default::default(),
287 finished: None,
288 pending: Default::default(),
289 }
290 }
291 }
292
293 impl<C: Columnar> ContainerBuilder for ColumnBuilder<C>
294 where
295 C::Container: Clone,
296 {
297 type Container = Column<C>;
298
299 #[inline]
300 fn extract(&mut self) -> Option<&mut Self::Container> {
301 if let Some(container) = self.pending.pop_front() {
302 self.finished = Some(container);
303 self.finished.as_mut()
304 } else {
305 None
306 }
307 }
308
309 #[inline]
310 fn finish(&mut self) -> Option<&mut Self::Container> {
311 if !self.current.is_empty() {
312 self.pending
313 .push_back(Column::Typed(std::mem::take(&mut self.current)));
314 }
315 self.finished = self.pending.pop_front();
316 self.finished.as_mut()
317 }
318 }
319
320 impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone {}
321}
322
323pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
325 Column<((K, V), T, R)>,
326 batcher::Chunker<TimelyStack<((K, V), T, R)>>,
327 ColMerger<(K, V), T, R>,
328>;
329pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
330
331#[inline(always)]
335pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &<((K, V), T, D) as Columnar>::Ref<'_>) -> u64
336where
337 K: Columnar,
338 for<'a> K::Ref<'a>: Hash,
339 V: Columnar,
340 D: Columnar,
341 T: Columnar,
342{
343 k.hashed()
344}
345
346pub mod batcher {
348 use std::collections::VecDeque;
349
350 use columnar::Columnar;
351 use differential_dataflow::difference::Semigroup;
352 use timely::Container;
353 use timely::container::{ContainerBuilder, PushInto};
354
355 use crate::containers::Column;
356
357 #[derive(Default)]
358 pub struct Chunker<C> {
359 target: C,
364 ready: VecDeque<C>,
366 }
367
368 impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
369 type Container = C;
370
371 fn extract(&mut self) -> Option<&mut Self::Container> {
372 if let Some(ready) = self.ready.pop_front() {
373 self.target = ready;
374 Some(&mut self.target)
375 } else {
376 None
377 }
378 }
379
380 fn finish(&mut self) -> Option<&mut Self::Container> {
381 self.extract()
382 }
383 }
384
385 impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker<C2>
386 where
387 D: Columnar,
388 for<'b> D::Ref<'b>: Ord + Copy,
389 T: Columnar,
390 for<'b> T::Ref<'b>: Ord + Copy,
391 R: Columnar + Semigroup + for<'b> Semigroup<R::Ref<'b>>,
392 for<'b> R::Ref<'b>: Ord,
393 C2: Container + for<'b> PushInto<&'b (D, T, R)>,
394 {
395 fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
396 let mut permutation = Vec::with_capacity(container.len());
399 permutation.extend(container.drain());
400 permutation.sort();
401
402 self.target.clear();
403 let mut iter = permutation.drain(..);
405 if let Some((data, time, diff)) = iter.next() {
406 let mut owned_data = D::into_owned(data);
407 let mut owned_time = T::into_owned(time);
408
409 let mut prev_data = data;
410 let mut prev_time = time;
411 let mut prev_diff = <R as Columnar>::into_owned(diff);
412
413 for (data, time, diff) in iter {
414 if (&prev_data, &prev_time) == (&data, &time) {
415 prev_diff.plus_equals(&diff);
416 } else {
417 if !prev_diff.is_zero() {
418 D::copy_from(&mut owned_data, prev_data);
419 T::copy_from(&mut owned_time, prev_time);
420 let tuple = (owned_data, owned_time, prev_diff);
421 self.target.push_into(&tuple);
422 (owned_data, owned_time, prev_diff) = tuple;
423 }
424 prev_data = data;
425 prev_time = time;
426 R::copy_from(&mut prev_diff, diff);
427 }
428 }
429
430 if !prev_diff.is_zero() {
431 D::copy_from(&mut owned_data, prev_data);
432 T::copy_from(&mut owned_time, prev_time);
433 let tuple = (owned_data, owned_time, prev_diff);
434 self.target.push_into(&tuple);
435 }
436 }
437
438 if !self.target.is_empty() {
439 self.ready.push_back(std::mem::take(&mut self.target));
440 }
441 }
442 }
443}
444
445mod provided_builder {
446 use timely::Container;
447 use timely::container::ContainerBuilder;
448
449 pub struct ProvidedBuilder<C> {
454 _marker: std::marker::PhantomData<C>,
455 }
456
457 impl<C> Default for ProvidedBuilder<C> {
458 fn default() -> Self {
459 Self {
460 _marker: std::marker::PhantomData,
461 }
462 }
463 }
464
465 impl<C: Container + Clone + 'static> ContainerBuilder for ProvidedBuilder<C> {
466 type Container = C;
467
468 #[inline(always)]
469 fn extract(&mut self) -> Option<&mut Self::Container> {
470 None
471 }
472
473 #[inline(always)]
474 fn finish(&mut self) -> Option<&mut Self::Container> {
475 None
476 }
477 }
478}
479
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::Container;
    use timely::bytes::arc::BytesMut;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Returns 24 bytes that the tests below decode as the `i32` column `[1, 2, 3]`:
    /// a little-endian `u64` holding 12 (the byte length of three `i32`s), the three
    /// values, and four zero bytes that bring the total to a multiple of 8.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        raw.extend(12_u64.to_le_bytes());
        for value in [1_i32, 2, 3] {
            raw.extend(value.to_le_bytes());
        }
        raw.extend([0_u8; 4]);
        raw
    }

    /// Copies `raw` into a fresh buffer so that it starts `shift` bytes past an 8-byte
    /// boundary, and returns the bytes covering exactly that copy.
    fn bytes_at_offset(raw: &[u8], shift: usize) -> timely::bytes::arc::Bytes {
        let buf = vec![0_u8; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        // Discard the prefix so the remainder starts at the requested offset.
        let _ = bytes_mut.extract_to(align + shift);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        bytes_mut.extract_to(raw.len())
    }

    #[mz_ore::test]
    fn test_column_clone() {
        // Typed columns clone to typed columns with the same contents.
        {
            let columns = Columnar::as_columns([1, 2, 3].iter());
            let original: Column<i32> = Column::Typed(columns);
            let cloned = original.clone();
            assert_eq!(cloned.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
        }

        // Byte-backed columns clone with the same contents.
        {
            let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
            let original: Column<i32> = Column::Bytes(bytes);
            let cloned = original.clone();
            assert_eq!(cloned.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
        }

        // Region-backed columns clone with the same contents.
        {
            let raw = raw_columnar_bytes();
            let mut region: Region<u64> = alloc_aligned_zeroed(raw.len() / 8);
            let region_bytes = bytemuck::cast_slice_mut(&mut region);
            region_bytes[..raw.len()].copy_from_slice(&raw);
            let original: Column<i32> = Column::Align(region);
            let cloned = original.clone();
            assert_eq!(cloned.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
        }
    }

    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // An 8-byte-aligned buffer is kept as `Column::Bytes`.
        let aligned_bytes = bytes_at_offset(&raw, 0);
        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);

        // A buffer one byte past the boundary is relocated into `Column::Align`.
        let unaligned_bytes = bytes_at_offset(&raw, 1);
        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
    }
}
550}