1use std::collections::VecDeque;
19use std::marker::PhantomData;
20
21use crate::columnation::ColumnationStack;
22use columnar::Container as _;
23use columnar::Push as _;
24use columnar::{Clear, Columnar, Index, Len};
25use columnation::Columnation;
26use differential_dataflow::difference::Semigroup;
27use differential_dataflow::trace::implementations::merge_batcher::Merger;
28use timely::Accountable;
29use timely::Container;
30use timely::PartialOrder;
31use timely::container::{ContainerBuilder, PushInto, SizableContainer};
32use timely::progress::frontier::{Antichain, AntichainRef};
33
34use crate::columnar::Column;
35
/// Container builder that turns pushed input into finished chunks of type `C`.
///
/// Finished chunks are queued in `ready` and handed out one at a time through the
/// [`ContainerBuilder`] implementation.
#[derive(Default)]
pub struct Chunker<C> {
    /// Scratch chunk: `push_into` fills it before moving it into `ready`, and
    /// `extract` parks the most recently dequeued chunk here so it can lend out a
    /// mutable reference to it.
    target: C,
    /// Finished chunks awaiting extraction, oldest at the front.
    ready: VecDeque<C>,
}
47
48impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
49 type Container = C;
50
51 fn extract(&mut self) -> Option<&mut Self::Container> {
52 if let Some(ready) = self.ready.pop_front() {
53 self.target = ready;
54 Some(&mut self.target)
55 } else {
56 None
57 }
58 }
59
60 fn finish(&mut self) -> Option<&mut Self::Container> {
61 self.extract()
62 }
63}
64
65impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for Chunker<ColumnationStack<(D, T, R)>>
66where
67 D: Columnar + Columnation,
68 for<'b> columnar::Ref<'b, D>: Ord + Copy,
69 T: Columnar + Columnation,
70 for<'b> columnar::Ref<'b, T>: Ord + Copy,
71 R: Columnar + Columnation + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
72 for<'b> columnar::Ref<'b, R>: Ord,
73{
74 fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
75 let borrowed = container.borrow();
78 let mut permutation = Vec::with_capacity(borrowed.len());
79 Extend::extend(&mut permutation, borrowed.into_index_iter());
80 permutation.sort();
81
82 self.target.clear();
83 let mut iter = permutation.drain(..);
85 if let Some((data, time, diff)) = iter.next() {
86 let mut owned_data = D::into_owned(data);
87 let mut owned_time = T::into_owned(time);
88
89 let mut prev_data = data;
90 let mut prev_time = time;
91 let mut prev_diff = <R as Columnar>::into_owned(diff);
92
93 for (data, time, diff) in iter {
94 if (&prev_data, &prev_time) == (&data, &time) {
95 prev_diff.plus_equals(&diff);
96 } else {
97 if !prev_diff.is_zero() {
98 D::copy_from(&mut owned_data, prev_data);
99 T::copy_from(&mut owned_time, prev_time);
100 let tuple = (owned_data, owned_time, prev_diff);
101 self.target.push_into(&tuple);
102 (owned_data, owned_time, prev_diff) = tuple;
103 }
104 prev_data = data;
105 prev_time = time;
106 R::copy_from(&mut prev_diff, diff);
107 }
108 }
109
110 if !prev_diff.is_zero() {
111 D::copy_from(&mut owned_data, prev_data);
112 T::copy_from(&mut owned_time, prev_time);
113 let tuple = (owned_data, owned_time, prev_diff);
114 self.target.push_into(&tuple);
115 }
116 }
117
118 if !self.target.is_empty() {
119 self.ready.push_back(std::mem::take(&mut self.target));
120 }
121 }
122}
123
/// Container builder that produces [`Column<U>`] chunks.
///
/// Finished chunks are queued in `ready` and handed out through the
/// [`ContainerBuilder`] implementation.
pub struct ColumnChunker<U: Columnar> {
    /// Scratch column: `push_into` fills it before moving it into `ready`, and
    /// `extract` parks the most recently dequeued chunk here so it can lend out a
    /// mutable reference to it.
    target: Column<U>,
    /// Finished chunks awaiting extraction, oldest at the front.
    ready: VecDeque<Column<U>>,
}
137
138impl<U: Columnar> Default for ColumnChunker<U> {
143 fn default() -> Self {
144 Self {
145 target: Column::default(),
146 ready: VecDeque::new(),
147 }
148 }
149}
150
151impl<U: Columnar> ContainerBuilder for ColumnChunker<U>
152where
153 U::Container: Clone + 'static,
154{
155 type Container = Column<U>;
156
157 fn extract(&mut self) -> Option<&mut Self::Container> {
158 if let Some(ready) = self.ready.pop_front() {
159 self.target = ready;
160 Some(&mut self.target)
161 } else {
162 None
163 }
164 }
165
166 fn finish(&mut self) -> Option<&mut Self::Container> {
167 self.extract()
168 }
169}
170
171impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for ColumnChunker<(D, T, R)>
172where
173 D: Columnar,
174 for<'b> columnar::Ref<'b, D>: Copy + Ord,
175 T: Columnar,
176 for<'b> columnar::Ref<'b, T>: Copy + Ord,
177 R: Columnar + Default + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
178 for<'b> columnar::Ref<'b, R>: Ord,
179 for<'b> <D as Columnar>::Container: columnar::Push<columnar::Ref<'b, D>>,
180 for<'b> <T as Columnar>::Container: columnar::Push<columnar::Ref<'b, T>>,
181 for<'b> <R as Columnar>::Container: columnar::Push<&'b R>,
182{
183 fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
184 match &mut self.target {
189 Column::Typed(c) => c.clear(),
190 Column::Bytes(_) | Column::Align(_) => {
191 self.target = Column::Typed(Default::default());
192 }
193 }
194
195 let borrowed = container.borrow();
197 let mut permutation = Vec::with_capacity(borrowed.len());
198 Extend::extend(&mut permutation, borrowed.into_index_iter());
199 permutation.sort();
200
201 {
207 let Column::Typed(target_c) = &mut self.target else {
208 unreachable!("target reset to Typed above");
209 };
210 let (target_d, target_t, target_r) = target_c;
211
212 let mut iter = permutation.drain(..);
213 if let Some((data, time, diff)) = iter.next() {
214 let mut prev_data = data;
215 let mut prev_time = time;
216 let mut prev_diff = <R as Columnar>::into_owned(diff);
217
218 for (data, time, diff) in iter {
219 if (&prev_data, &prev_time) == (&data, &time) {
220 prev_diff.plus_equals(&diff);
221 } else {
222 if !prev_diff.is_zero() {
223 target_d.push(prev_data);
224 target_t.push(prev_time);
225 target_r.push(&prev_diff);
226 }
227 prev_data = data;
228 prev_time = time;
229 R::copy_from(&mut prev_diff, diff);
230 }
231 }
232
233 if !prev_diff.is_zero() {
234 target_d.push(prev_data);
235 target_t.push(prev_time);
236 target_r.push(&prev_diff);
237 }
238 }
239 }
240
241 if self.target.borrow().len() > 0 {
242 let chunk = std::mem::replace(&mut self.target, Column::Typed(Default::default()));
243 self.ready.push_back(chunk);
244 }
245 }
246}
247
/// Advances `*lower` past the leading run of indices in `*lower..upper` for which
/// `cmp` holds.
///
/// `cmp` is expected to be monotone over the range (true for a prefix, then false).
/// On return `*lower` is the first index at which `cmp` is false, or `upper` if it
/// held throughout. If `*lower >= upper`, or `cmp(*lower)` is already false,
/// `*lower` is left unchanged.
///
/// The search doubles its stride while probes succeed, then halves it to close in
/// on the last index that satisfies `cmp`.
fn gallop(upper: usize, lower: &mut usize, mut cmp: impl FnMut(usize) -> bool) {
    // Nothing to skip: empty range, or the current element already fails.
    if *lower >= upper || !cmp(*lower) {
        return;
    }

    // Exponential phase: jump ahead in doubling strides while probes succeed.
    let mut step = 1;
    loop {
        let probe = *lower + step;
        if probe >= upper || !cmp(probe) {
            break;
        }
        *lower = probe;
        step <<= 1;
    }

    // Refinement phase: halve the stride, advancing whenever the probe succeeds.
    step >>= 1;
    while step > 0 {
        let probe = *lower + step;
        if probe < upper && cmp(probe) {
            *lower = probe;
        }
        step >>= 1;
    }

    // `*lower` is now the last index satisfying `cmp`; step just past it.
    *lower += 1;
}
293
/// Stateless [`Merger`] over chains of `Column<(D, T, R)>` chunks.
///
/// The type parameters are the data, time, and diff types of the updates.
pub struct ColumnMerger<D, T, R> {
    /// Ties the otherwise unused type parameters to the struct.
    _marker: PhantomData<(D, T, R)>,
}
301
302impl<D, T, R> Default for ColumnMerger<D, T, R> {
303 fn default() -> Self {
304 Self {
305 _marker: PhantomData,
306 }
307 }
308}
309
/// Merge and extract primitives on `(data, time, diff)` columns, used by
/// [`ColumnMerger`].
impl<D, T, R> Column<(D, T, R)>
where
    D: Columnar + Default,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + PartialOrder,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
    for<'a> <D as Columnar>::Container: columnar::Push<&'a D>,
    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
    for<'a> <T as Columnar>::Container: columnar::Push<&'a T>,
    for<'a> <R as Columnar>::Container: columnar::Push<columnar::Ref<'a, R>>,
    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
{
    /// Appends merged updates from `others` to `self`, advancing `positions`.
    ///
    /// `positions[i]` is the index of the first unconsumed update in `others[i]` and
    /// is updated in place. Behavior depends on the number of inputs:
    ///
    /// * 0 inputs: does nothing.
    /// * 1 input: appends everything from `positions[0]` to the end. If `self` is
    ///   empty and `positions[0] == 0`, the two columns are swapped instead of copied.
    /// * 2 inputs: merges by `(data, time)`, summing the diffs of equal keys and
    ///   dropping zero sums, until one input is exhausted or the periodic capacity
    ///   check fires.
    ///
    /// Presumably each input is sorted by `(data, time)` and consolidated; the
    /// galloping and the equal-key handling rely on that. TODO confirm with callers.
    ///
    /// Returns `true` only in the two-input case, when the loop stopped early because
    /// the output reached the ship threshold. In every other case it returns `false`.
    ///
    /// # Panics
    ///
    /// Panics if more than two inputs are passed, or if `self` is not
    /// `Column::Typed` on a path that appends to it.
    #[must_use]
    pub fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) -> bool {
        match others.len() {
            0 => false,
            1 => {
                let other = &mut others[0];
                let pos = &mut positions[0];
                if self.is_empty() && *pos == 0 {
                    // Nothing accumulated and nothing consumed: take the whole input
                    // by swapping. `other` becomes the old, empty `self` and `*pos`
                    // stays 0, so the input reads as fully consumed.
                    std::mem::swap(self, other);
                    return false;
                }
                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };
                // Bulk-copy the unconsumed tail and mark the input as consumed.
                let src_c = other.borrow();
                self_c.extend_from_self(src_c, *pos..other.borrow().len());
                *pos = other.borrow().len();
                false
            }
            2 => {
                let (left, right) = others.split_at(1);
                let (left_pos, right_pos) = positions.split_at_mut(1);
                let left_borrow = left[0].borrow();
                let right_borrow = right[0].borrow();

                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };

                // Per-component views of both inputs: data, time, diff.
                let l_d = left_borrow.0;
                let l_t = left_borrow.1;
                let l_r = left_borrow.2;
                let r_d = right_borrow.0;
                let r_t = right_borrow.1;
                let r_r = right_borrow.2;
                let upper_l = l_d.len();
                let upper_r = r_d.len();

                // Per-component output containers.
                let (sd, st, sr) = self_c;

                // Reserve output space up front only when the combined input is at
                // most this many records.
                // NOTE(review): presumably this bounds the up-front allocation for
                // very large inputs; confirm the intent.
                const RESERVE_RECORD_THRESHOLD: usize = 1_000_000;
                if upper_l + upper_r <= RESERVE_RECORD_THRESHOLD {
                    use columnar::Container as _;
                    let inputs = [left_borrow, right_borrow];
                    sd.reserve_for(inputs.iter().map(|b| b.0));
                    st.reserve_for(inputs.iter().map(|b| b.1));
                    sr.reserve_for(inputs.iter().map(|b| b.2));
                }

                // Scratch diff, reused for every equal-key sum.
                let mut stash = R::default();

                // Asks `at_serialized_capacity` about the output built so far.
                // NOTE(review): presumably true once the output is large enough to
                // ship; confirm against `crate::columnar`.
                let at_ship_threshold =
                    |sd: &D::Container, st: &T::Container, sr: &R::Container| {
                        use columnar::Borrow as _;
                        crate::columnar::at_serialized_capacity(&(
                            sd.borrow(),
                            st.borrow(),
                            sr.borrow(),
                        ))
                    };
                // The threshold is evaluated once every 1024 loop iterations.
                const THRESHOLD_PERIOD_MASK: u32 = 1023;
                let mut iter: u32 = 0;
                let mut yielded = false;

                while left_pos[0] < upper_l && right_pos[0] < upper_r {
                    let d1 = l_d.get(left_pos[0]);
                    let t1 = l_t.get(left_pos[0]);
                    let d2 = r_d.get(right_pos[0]);
                    let t2 = r_t.get(right_pos[0]);
                    match (d1, t1).cmp(&(d2, t2)) {
                        std::cmp::Ordering::Less => {
                            // The left element sorts first: emit it.
                            sd.push(d1);
                            st.push(t1);
                            sr.push(l_r.get(left_pos[0]));
                            left_pos[0] += 1;
                            // If the next left element also precedes the right head,
                            // gallop to the end of that run and copy it in bulk.
                            if left_pos[0] < upper_l
                                && (l_d.get(left_pos[0]), l_t.get(left_pos[0])) < (d2, t2)
                            {
                                let start = left_pos[0];
                                gallop(upper_l, &mut left_pos[0], |i| {
                                    (l_d.get(i), l_t.get(i)) < (d2, t2)
                                });
                                sd.extend_from_self(l_d, start..left_pos[0]);
                                st.extend_from_self(l_t, start..left_pos[0]);
                                sr.extend_from_self(l_r, start..left_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Greater => {
                            // Mirror image of the `Less` arm, for the right input.
                            sd.push(d2);
                            st.push(t2);
                            sr.push(r_r.get(right_pos[0]));
                            right_pos[0] += 1;
                            if right_pos[0] < upper_r
                                && (r_d.get(right_pos[0]), r_t.get(right_pos[0])) < (d1, t1)
                            {
                                let start = right_pos[0];
                                gallop(upper_r, &mut right_pos[0], |i| {
                                    (r_d.get(i), r_t.get(i)) < (d1, t1)
                                });
                                sd.extend_from_self(r_d, start..right_pos[0]);
                                st.extend_from_self(r_t, start..right_pos[0]);
                                sr.extend_from_self(r_r, start..right_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Equal => {
                            // Same `(data, time)` on both sides: sum the diffs and
                            // emit a single row unless they cancel.
                            let r1 = l_r.get(left_pos[0]);
                            let r2 = r_r.get(right_pos[0]);
                            R::copy_from(&mut stash, r1);
                            stash.plus_equals(&r2);
                            if !stash.is_zero() {
                                sd.push(d1);
                                st.push(t1);
                                sr.push(&stash);
                            }
                            left_pos[0] += 1;
                            right_pos[0] += 1;
                        }
                    }

                    iter = iter.wrapping_add(1);
                    // Periodic capacity check. A bulk copy above may add many rows
                    // in the iterations between two checks.
                    if iter & THRESHOLD_PERIOD_MASK == 0 && at_ship_threshold(sd, st, sr) {
                        yielded = true;
                        break;
                    }
                }
                yielded
            }
            n => unreachable!("merge_from called with {n} inputs; expected 0, 1, or 2"),
        }
    }

    /// Partitions updates of `self`, starting at `*position`, into `keep` and `ship`.
    ///
    /// An update whose time satisfies `upper.less_equal(time)` is appended to `keep`
    /// and its time is inserted into `frontier`. Every other update is appended to
    /// `ship`. `*position` is advanced past each processed update.
    ///
    /// Processing stops at the end of `self`, or as soon as `at_serialized_capacity`
    /// reports true for either output. Callers must therefore re-invoke until
    /// `*position` reaches the length of `self`.
    ///
    /// # Panics
    ///
    /// Panics if `keep` or `ship` is not `Column::Typed`.
    pub fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        keep: &mut Self,
        ship: &mut Self,
    ) {
        let Column::Typed(keep_c) = keep else {
            unreachable!("merger chunks are always Column::Typed");
        };
        let Column::Typed(ship_c) = ship else {
            unreachable!("merger chunks are always Column::Typed");
        };

        let self_view = self.borrow();
        let len = self_view.len();

        // Owned scratch time, reused for every frontier comparison.
        let mut owned_t = T::default();
        use columnar::Borrow as _;
        while *position < len
            && !crate::columnar::at_serialized_capacity(&keep_c.borrow())
            && !crate::columnar::at_serialized_capacity(&ship_c.borrow())
        {
            let (_, time, _) = self_view.get(*position);
            T::copy_from(&mut owned_t, time);
            if upper.less_equal(&owned_t) {
                // Not yet complete relative to `upper`: record the time and keep it.
                frontier.insert_with(&owned_t, |t| t.clone());
                keep_c.extend_from_self(self_view, *position..*position + 1);
            } else {
                ship_c.extend_from_self(self_view, *position..*position + 1);
            }
            *position += 1;
        }
    }
}
601
/// [`Merger`] implementation that merges and partitions chains of
/// `Column<(D, T, R)>` chunks.
impl<D, T, R> Merger for ColumnMerger<D, T, R>
where
    D: Columnar + Default + 'static,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + Ord + PartialOrder + 'static,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>> + 'static,
    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
    for<'a> <D as Columnar>::Container: columnar::Push<&'a D>,
    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
    for<'a> <T as Columnar>::Container: columnar::Push<&'a T>,
    for<'a> <R as Columnar>::Container: columnar::Push<columnar::Ref<'a, R>>,
    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
{
    type Time = T;
    type Chunk = Column<(D, T, R)>;

    /// Merges the chunk chains `list1` and `list2` into `output`.
    ///
    /// `stash` is a pool of reusable chunks: fresh output chunks are taken from it
    /// and fully consumed input chunks are returned to it.
    ///
    /// Presumably each chain is sorted by `(data, time)` across its chunks and
    /// contains no empty chunks. The loop below stops as soon as either head is
    /// exhausted or empty. TODO confirm with callers.
    fn merge(
        &mut self,
        list1: Vec<Self::Chunk>,
        list2: Vec<Self::Chunk>,
        output: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    ) {
        let mut list1 = list1.into_iter();
        let mut list2 = list2.into_iter();

        // Current chunk of each chain. An exhausted chain is represented by a
        // default chunk.
        let mut heads = [
            list1.next().unwrap_or_default(),
            list2.next().unwrap_or_default(),
        ];
        // Index of the first unconsumed update in each head.
        let mut positions = [0usize, 0usize];

        // Output chunk currently being filled.
        let mut result = empty_chunk(stash);

        loop {
            // Stop once either side has nothing left in its head. The remainder is
            // handled by `drain_side` below.
            let upper_l = heads[0].borrow().len();
            let upper_r = heads[1].borrow().len();
            if positions[0] >= upper_l || positions[1] >= upper_r {
                break;
            }

            // Passthrough: an untouched left chunk whose last key is strictly less
            // than the right side's current key is forwarded whole, without copying.
            // Equal keys are excluded so they still go through the merge and have
            // their diffs summed.
            let lhs_passthrough = positions[0] == 0 && upper_l > 0 && {
                let lhs = heads[0].borrow();
                let rhs = heads[1].borrow();
                let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1));
                let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1]));
                last_l < cur_r
            };
            if lhs_passthrough {
                // Flush pending output first so chunks are emitted in order.
                if !result.is_empty() {
                    output.push(std::mem::take(&mut result));
                    result = empty_chunk(stash);
                }
                let head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                output.push(head);
                positions[0] = 0;
                continue;
            }

            // Same passthrough test with the roles of the two sides swapped.
            let rhs_passthrough = positions[1] == 0 && upper_r > 0 && {
                let lhs = heads[0].borrow();
                let rhs = heads[1].borrow();
                let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1));
                let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0]));
                last_r < cur_l
            };
            if rhs_passthrough {
                if !result.is_empty() {
                    output.push(std::mem::take(&mut result));
                    result = empty_chunk(stash);
                }
                let head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                output.push(head);
                positions[1] = 0;
                continue;
            }

            // Element-wise merge of the two heads into `result`.
            let yielded = result.merge_from(&mut heads, &mut positions);

            // Replace any head that was fully consumed and recycle its storage.
            if positions[0] >= heads[0].borrow().len() {
                let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                recycle_chunk(old, stash);
                positions[0] = 0;
            }
            if positions[1] >= heads[1].borrow().len() {
                let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                recycle_chunk(old, stash);
                positions[1] = 0;
            }
            // Ship the output chunk when the merge stopped at the threshold or the
            // chunk reports it is full.
            if yielded || result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = empty_chunk(stash);
            }
        }

        // At least one side is exhausted. Forward what remains of each side in turn.
        drain_side(
            &mut heads[0],
            &mut positions[0],
            &mut list1,
            &mut result,
            output,
            stash,
        );
        drain_side(
            &mut heads[1],
            &mut positions[1],
            &mut list2,
            &mut result,
            output,
            stash,
        );
        if !result.is_empty() {
            output.push(result);
        }
    }

    /// Splits the chunks of `merged` into `kept` and `ship` relative to `upper`.
    ///
    /// Updates whose time satisfies `upper.less_equal(time)` go to `kept` and their
    /// times are inserted into `frontier`. All other updates go to `ship`. Output
    /// chunks are taken from `stash`, and consumed input chunks are returned to it.
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        ship: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    ) {
        let mut keep = empty_chunk(stash);
        let mut ready = empty_chunk(stash);

        for mut buffer in merged {
            let mut position = 0;
            let len = buffer.borrow().len();
            // The chunk-level `extract` returns early when an output fills up, so
            // keep calling it until the whole buffer has been consumed.
            // NOTE(review): progress here assumes `at_capacity` is true whenever
            // `at_serialized_capacity` stopped the inner loop; confirm that the two
            // predicates agree.
            while position < len {
                buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
                if keep.at_capacity() {
                    kept.push(std::mem::take(&mut keep));
                    keep = empty_chunk(stash);
                }
                if ready.at_capacity() {
                    ship.push(std::mem::take(&mut ready));
                    ready = empty_chunk(stash);
                }
            }
            recycle_chunk(buffer, stash);
        }
        // Flush partially filled outputs.
        if !keep.is_empty() {
            kept.push(keep);
        }
        if !ready.is_empty() {
            ship.push(ready);
        }
    }

    /// Reports accounting figures for `chunk`.
    ///
    /// Returns `(records, bytes, bytes, 1)`, with `length_in_bytes()` used for both
    /// byte figures.
    /// NOTE(review): the tuple presumably means (records, size, capacity,
    /// allocations) as defined by the `Merger` trait; confirm.
    ///
    /// # Panics
    ///
    /// Panics if `record_count()` does not fit in a `usize`.
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
        use timely::dataflow::channels::ContainerBytes;
        let records = usize::try_from(chunk.record_count()).expect("record_count is non-negative");
        let bytes = chunk.length_in_bytes();
        (records, bytes, bytes, 1)
    }
}
803
804#[inline]
807fn empty_chunk<C: Columnar>(stash: &mut Vec<Column<C>>) -> Column<C> {
808 stash.pop().unwrap_or_default()
809}
810
811#[inline]
820fn recycle_chunk<C: Columnar>(mut chunk: Column<C>, stash: &mut Vec<Column<C>>) {
821 if let Column::Typed(c) = &mut chunk {
822 c.clear();
823 stash.push(chunk);
824 }
825}
826
827fn drain_side<D, T, R>(
833 head: &mut Column<(D, T, R)>,
834 pos: &mut usize,
835 list: &mut std::vec::IntoIter<Column<(D, T, R)>>,
836 result: &mut Column<(D, T, R)>,
837 output: &mut Vec<Column<(D, T, R)>>,
838 stash: &mut Vec<Column<(D, T, R)>>,
839) where
840 D: Columnar + Default,
841 for<'a> columnar::Ref<'a, D>: Copy + Ord,
842 T: Columnar + Default + Clone + PartialOrder,
843 for<'a> columnar::Ref<'a, T>: Copy + Ord,
844 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
845 for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
846 for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
847 for<'a> <D as Columnar>::Container: columnar::Push<&'a D>,
848 for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
849 for<'a> <T as Columnar>::Container: columnar::Push<&'a T>,
850 for<'a> <R as Columnar>::Container: columnar::Push<columnar::Ref<'a, R>>,
851 for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
852{
853 if *pos < head.borrow().len() {
854 let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
857 }
858 if !result.is_empty() {
859 output.push(std::mem::take(result));
860 *result = empty_chunk(stash);
861 }
862 Extend::extend(output, list);
863}
864
/// Unit tests for `ColumnChunker` consolidation and `ColumnMerger::merge`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushes `inputs` through a fresh `ColumnChunker` as a single column and returns
    /// every row of every extracted chunk, in order, as owned tuples.
    fn run_chunker<D, T, R>(inputs: &[(D, T, R)]) -> Vec<(D, T, R)>
    where
        D: Columnar + Clone,
        for<'a> columnar::Ref<'a, D>: Copy + Ord,
        T: Columnar + Clone,
        for<'a> columnar::Ref<'a, T>: Copy + Ord,
        R: Columnar + Clone + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
        for<'a> columnar::Ref<'a, R>: Ord,
        <(D, T, R) as Columnar>::Container: Clone,
        for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
        <(D, T, R) as Columnar>::Container: columnar::Push<(D, T, R)>,
        for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
        for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
        for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
    {
        let mut input: Column<(D, T, R)> = Default::default();
        for tuple in inputs.iter().cloned() {
            input.push_into(tuple);
        }

        let mut chunker: ColumnChunker<(D, T, R)> = Default::default();
        chunker.push_into(&mut input);

        let mut out = Vec::new();
        while let Some(chunk) = chunker.extract() {
            for (d, t, r) in chunk.borrow().into_index_iter() {
                out.push((D::into_owned(d), T::into_owned(t), R::into_owned(r)));
            }
        }
        out
    }

    /// Pushing an empty column must not queue a chunk.
    #[mz_ore::test]
    fn empty_input_yields_no_chunk() {
        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        let mut input: Column<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut input);
        assert!(chunker.extract().is_none());
        assert!(chunker.finish().is_none());
    }

    /// Rows come out sorted regardless of input order.
    #[mz_ore::test]
    fn unsorted_input_is_sorted() {
        let out = run_chunker(&[(3u64, 0u64, 1i64), (1u64, 0u64, 1i64), (2u64, 0u64, 1i64)]);
        assert_eq!(out, vec![(1, 0, 1), (2, 0, 1), (3, 0, 1)]);
    }

    /// Rows sharing `(data, time)` collapse into one row with the summed diff.
    #[mz_ore::test]
    fn duplicate_keys_consolidate() {
        let out = run_chunker(&[(1u64, 0u64, 1i64), (1u64, 0u64, 2i64), (1u64, 0u64, -1i64)]);
        assert_eq!(out, vec![(1, 0, 2)]);
    }

    /// A key whose diffs cancel produces no output row.
    #[mz_ore::test]
    fn diffs_summing_to_zero_are_dropped() {
        let out = run_chunker(&[(1u64, 0u64, 1i64), (1u64, 0u64, -1i64)]);
        assert!(out.is_empty());
    }

    /// Mix of cancellation, distinct times, and distinct data in one input.
    #[mz_ore::test]
    fn mixed_consolidation() {
        let out = run_chunker(&[
            // (1, 0) sums to 1 + 2 - 3 = 0 and is dropped.
            (1u64, 0u64, 1i64),
            (2u64, 0u64, 1i64),
            (1u64, 0u64, 2i64),
            (1u64, 1u64, 5i64),
            (1u64, 0u64, -3i64),
        ]);
        assert_eq!(out, vec![(1, 1, 5), (2, 0, 1)]);
    }

    /// Consolidation works when the data component is itself a tuple.
    #[mz_ore::test]
    fn key_val_tuple_data() {
        let out = run_chunker(&[
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 11u64), 0u64, 1i64),
            ((2u64, 10u64), 0u64, 1i64),
        ]);
        assert_eq!(
            out,
            vec![((1, 10), 0, 2), ((1, 11), 0, 1), ((2, 10), 0, 1),]
        );
    }

    /// A second push after an extract must not leak rows from the first push.
    #[mz_ore::test]
    fn buffer_reuse_across_calls() {
        let mut input1: Column<(u64, u64, i64)> = Default::default();
        input1.push_into((1u64, 0u64, 1i64));
        input1.push_into((2u64, 0u64, 1i64));

        let mut input2: Column<(u64, u64, i64)> = Default::default();
        input2.push_into((3u64, 0u64, 1i64));
        input2.push_into((1u64, 0u64, 1i64));

        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut input1);

        {
            // Extract and discard. This parks the first chunk in the chunker's
            // `target`, which the next push then has to reset.
            let _ = chunker.extract().expect("first chunk");
        }

        chunker.push_into(&mut input2);

        let chunk = chunker.extract().expect("second chunk");
        let collected: Vec<_> = chunk
            .borrow()
            .into_index_iter()
            .map(|(d, t, r)| (u64::into_owned(d), u64::into_owned(t), i64::into_owned(r)))
            .collect();
        assert_eq!(collected, vec![(1, 0, 1), (3, 0, 1)]);
    }

    /// Builds a column from `rows`, pushed in the order given.
    fn col(rows: &[((u64, u64), u64, i64)]) -> Column<((u64, u64), u64, i64)> {
        let mut c: Column<((u64, u64), u64, i64)> = Default::default();
        for &t in rows {
            c.push_into(t);
        }
        c
    }

    /// Flattens `chunks` into owned tuples, preserving chunk and row order.
    fn collect_chunks(chunks: &[Column<((u64, u64), u64, i64)>]) -> Vec<((u64, u64), u64, i64)> {
        chunks
            .iter()
            .flat_map(|c| {
                c.borrow().into_index_iter().map(|((k, v), t, r)| {
                    (
                        (u64::into_owned(k), u64::into_owned(v)),
                        u64::into_owned(t),
                        i64::into_owned(r),
                    )
                })
            })
            .collect()
    }

    /// Every key in chain 1 precedes every key in chain 2, so the output is chain 1
    /// followed by chain 2.
    #[mz_ore::test]
    fn merger_disjoint_chains_passthrough() {
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
            col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
            col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..4u64)
            .map(|d| ((d, 0u64), 0u64, 1i64))
            .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64)))
            .collect();
        assert_eq!(collected, expected);
    }

    /// Keys alternate between the chains, so no chunk qualifies for passthrough and
    /// the element-wise merge must interleave them.
    #[mz_ore::test]
    fn merger_interleaved_chains() {
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]),
            col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]),
            col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
        assert_eq!(collected, expected);
    }

    /// The last key of chain 1 equals the first key of chain 2. Passthrough must not
    /// fire, because the two rows have to be consolidated into one.
    #[mz_ore::test]
    fn merger_passthrough_respects_equal_boundary() {
        let chain1 = vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])];
        let chain2 = vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        assert_eq!(
            collected,
            vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]
        );
    }
}
1095
/// Property tests for `Column::merge_from` and `Column::extract`, checked against a
/// straightforward `Vec`-based reference model.
#[cfg(test)]
mod proptests {
    use super::*;
    use mz_ore::cast::CastFrom;
    use proptest::prelude::*;
    use timely::progress::frontier::Antichain;

    /// Update type used throughout: `((key, value), time, diff)`.
    type Tuple = ((u64, u64), u64, i64);

    /// Reference consolidation: sort, sum the diffs of equal `(data, time)` rows,
    /// and drop rows whose diff is zero.
    fn consolidate(mut v: Vec<Tuple>) -> Vec<Tuple> {
        v.sort();
        let mut out: Vec<Tuple> = Vec::new();
        for (d, t, r) in v {
            if let Some(last) = out.last_mut() {
                if last.0 == d && last.1 == t {
                    last.2 += r;
                    continue;
                }
            }
            out.push((d, t, r));
        }
        out.retain(|x| x.2 != 0);
        out
    }

    /// Strategy producing sorted, consolidated update lists of fewer than 30 raw
    /// rows, drawn from a small value range so that key collisions are common.
    fn arb_consolidated() -> impl Strategy<Value = Vec<Tuple>> {
        prop::collection::vec(((0u64..5, 0u64..5), 0u64..3, -3i64..=3i64), 0..30)
            .prop_map(consolidate)
    }

    /// Builds a column holding `v`, in order.
    fn build_column(v: &[Tuple]) -> Column<Tuple> {
        let mut col: Column<Tuple> = Default::default();
        for tup in v {
            col.push_into(*tup);
        }
        col
    }

    /// Reads a column back into owned tuples, in order.
    fn collect_column(col: &Column<Tuple>) -> Vec<Tuple> {
        col.borrow()
            .into_index_iter()
            .map(|((k, v), t, r)| {
                (
                    (u64::into_owned(k), u64::into_owned(v)),
                    u64::into_owned(t),
                    i64::into_owned(r),
                )
            })
            .collect()
    }

    /// Fully merges `left` and `right` with `merge_from`: one two-input call, then a
    /// one-input call to append the tail of whichever side was not exhausted.
    fn drive_merge(left: Column<Tuple>, right: Column<Tuple>) -> Column<Tuple> {
        let mut self_col: Column<Tuple> = Default::default();
        let mut others = [left, right];
        let mut positions = [0usize, 0];
        let _ = self_col.merge_from(&mut others, &mut positions);

        let [left_done, right_done] = others;
        let [left_pos, right_pos] = positions;

        if left_pos < left_done.borrow().len() {
            let mut tail = [left_done];
            let mut p = [left_pos];
            let _ = self_col.merge_from(&mut tail, &mut p);
        } else if right_pos < right_done.borrow().len() {
            let mut tail = [right_done];
            let mut p = [right_pos];
            let _ = self_col.merge_from(&mut tail, &mut p);
        }

        self_col
    }

    proptest! {
        // Merging two consolidated inputs must equal consolidating their union.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_equals_consolidated_union(
            a in arb_consolidated(),
            b in arb_consolidated(),
        ) {
            let merged = drive_merge(build_column(&a), build_column(&b));

            let mut union = a.clone();
            Extend::extend(&mut union, b.iter().copied());
            let expected = consolidate(union);

            prop_assert_eq!(collect_column(&merged), expected);
        }

        // One-input `merge_from` on a non-empty `self` appends exactly the rows from
        // the start position onward and advances the position to the end.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_one_input_drains_tail(
            data in arb_consolidated(),
            pos_frac in 0u32..=100,
        ) {
            let len = data.len();
            // Dividing by 101 keeps the start position strictly below `len`.
            let start_pos = if len == 0 { 0 } else {
                (usize::cast_from(pos_frac) * len) / 101
            };

            let mut self_col: Column<Tuple> = Default::default();
            // A sentinel row makes `self` non-empty, which forces the copy path
            // instead of the swap shortcut.
            let sentinel: Tuple = ((u64::MAX, u64::MAX), 0, 1);
            self_col.push_into(sentinel);

            let mut others = [build_column(&data)];
            let mut positions = [start_pos];
            let _ = self_col.merge_from(&mut others, &mut positions);

            let mut expected = vec![sentinel];
            Extend::extend(&mut expected, data[start_pos..].iter().copied());

            prop_assert_eq!(collect_column(&self_col), expected);
            prop_assert_eq!(positions[0], len);
        }

        // One-input `merge_from` into an empty `self` at position 0 takes the swap
        // shortcut; `self` must end up holding the whole input.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_empty_self_swap(data in arb_consolidated()) {
            let mut self_col: Column<Tuple> = Default::default();
            let mut others = [build_column(&data)];
            let mut positions = [0usize];
            let _ = self_col.merge_from(&mut others, &mut positions);

            prop_assert_eq!(collect_column(&self_col), data);
        }

        // `extract` must split rows by `upper`, lose nothing, and record a frontier
        // that covers every kept time.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_partitions_by_frontier(
            data in arb_consolidated(),
            upper_time in 0u64..=4,
        ) {
            let mut self_col = build_column(&data);
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            self_col.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            // Inputs are small, so a single call is expected to consume everything.
            prop_assert_eq!(position, data.len());

            let kept = collect_column(&keep);
            let shipped = collect_column(&ship);

            for (_, t, _) in &kept {
                prop_assert!(
                    upper.borrow().less_equal(t),
                    "kept time {} should satisfy upper.less_equal", t,
                );
            }
            for (_, t, _) in &shipped {
                prop_assert!(
                    !upper.borrow().less_equal(t),
                    "shipped time {} should NOT satisfy upper.less_equal", t,
                );
            }

            // Kept and shipped rows together must be exactly the input rows.
            let mut union = kept.clone();
            Extend::extend(&mut union, shipped.iter().copied());
            union.sort();
            let mut expected_sorted = data.clone();
            expected_sorted.sort();
            prop_assert_eq!(union, expected_sorted);

            for (_, t, _) in &kept {
                prop_assert!(
                    frontier.less_equal(t),
                    "frontier should dominate kept time {}", t,
                );
            }
        }

        // `extract` on an empty column leaves every output untouched.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_empty_input(upper_time in 0u64..=4) {
            let mut self_col: Column<Tuple> = Default::default();
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            self_col.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            prop_assert_eq!(position, 0);
            prop_assert!(collect_column(&keep).is_empty());
            prop_assert!(collect_column(&ship).is_empty());
            prop_assert!(frontier.elements().is_empty());
        }
    }
}