1use std::collections::VecDeque;
32
33use columnar::{Columnar, Index, Len};
34use differential_dataflow::difference::Semigroup;
35use differential_dataflow::logging::{BatcherEvent, Logger};
36use differential_dataflow::trace::{Batcher, Description};
37use timely::Accountable;
38use timely::PartialOrder;
39use timely::container::{PushInto, SizableContainer};
40use timely::dataflow::channels::ContainerBytes;
41use timely::progress::Timestamp;
42use timely::progress::frontier::{Antichain, AntichainRef};
43
44use crate::column_pager::{self, ColumnPager, PagedColumn};
45use crate::columnar::Column;
46use crate::columnar::batcher::{empty_chunk, recycle_chunk};
47
/// Maximum number of chunks [`recycle_capped`] allows the stash to hold.
const STASH_CAP: usize = 2;

/// Largest chunk, measured by `length_in_bytes()`, that [`recycle_capped`]
/// will hand back for reuse (4 MiB). Bigger chunks are dropped instead.
const MAX_RECYCLE_BYTES: usize = 4 * 1024 * 1024;
71
72fn recycle_capped<C: Columnar>(chunk: Column<C>, stash: &mut Vec<Column<C>>) {
77 if stash.len() < STASH_CAP && chunk.length_in_bytes() <= MAX_RECYCLE_BYTES {
78 recycle_chunk(chunk, stash);
79 }
80}
81
82pub struct ColumnMergeBatcher<D, T, R>
95where
96 D: Columnar,
97 T: Columnar,
98 R: Columnar,
99{
100 chains: Vec<VecDeque<PagedColumn<(D, T, R)>>>,
101 lower: Antichain<T>,
102 frontier: Antichain<T>,
103 stash: Vec<Column<(D, T, R)>>,
111 pager_override: Option<ColumnPager>,
116 logger: Option<Logger>,
117 operator_id: usize,
118}
119
120impl<D, T, R> ColumnMergeBatcher<D, T, R>
121where
122 D: Columnar,
123 T: Columnar,
124 R: Columnar,
125{
126 pub fn set_pager(&mut self, pager: ColumnPager) {
130 self.pager_override = Some(pager);
131 }
132
133 fn pager(&self) -> ColumnPager {
137 self.pager_override
138 .clone()
139 .unwrap_or_else(column_pager::global_pager)
140 }
141
142 fn chain_push(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>) {
145 self.emit_account(&chain, 1);
146 self.chains.push(chain);
147 }
148
149 fn chain_pop(&mut self) -> Option<VecDeque<PagedColumn<(D, T, R)>>> {
160 let chain = self.chains.pop()?;
161 self.emit_account(&chain, -1);
162 Some(chain)
163 }
164
165 fn emit_account(&self, chain: &VecDeque<PagedColumn<(D, T, R)>>, diff: isize) {
168 let Some(logger) = &self.logger else {
169 return;
170 };
171 let (mut records, mut size, mut capacity, mut allocations) =
172 (0isize, 0isize, 0isize, 0isize);
173 for entry in chain {
174 let (r, s, c, a) = account_chunk(entry);
175 records = records.saturating_add_unsigned(r);
176 size = size.saturating_add_unsigned(s);
177 capacity = capacity.saturating_add_unsigned(c);
178 allocations = allocations.saturating_add_unsigned(a);
179 }
180 logger.log(BatcherEvent {
181 operator: self.operator_id,
182 records_diff: records.saturating_mul(diff),
183 size_diff: size.saturating_mul(diff),
184 capacity_diff: capacity.saturating_mul(diff),
185 allocations_diff: allocations.saturating_mul(diff),
186 });
187 }
188}
189
190impl<D, T, R> Drop for ColumnMergeBatcher<D, T, R>
191where
192 D: Columnar,
193 T: Columnar,
194 R: Columnar,
195{
196 fn drop(&mut self) {
197 while self.chain_pop().is_some() {}
200 }
201}
202
203fn account_chunk<C: Columnar>(entry: &PagedColumn<C>) -> (usize, usize, usize, usize) {
212 match entry {
213 PagedColumn::Resident(col, _) => {
214 let records = usize::try_from(col.record_count()).expect("non-negative");
215 let bytes = col.length_in_bytes();
216 (records, bytes, bytes, 1)
217 }
218 PagedColumn::Paged { .. } | PagedColumn::Compressed { .. } => (0, 0, 0, 0),
219 }
220}
221
222impl<D, T, R> Batcher for ColumnMergeBatcher<D, T, R>
223where
224 D: Columnar,
225 for<'a> columnar::Ref<'a, D>: Copy + Ord,
226 T: Columnar + Default + Timestamp + PartialOrder,
227 for<'a> columnar::Ref<'a, T>: Copy + Ord,
228 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
229 for<'a> columnar::Ref<'a, R>: Ord,
230{
231 type Output = Column<(D, T, R)>;
232 type Time = T;
233
234 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
235 Self {
239 chains: Vec::new(),
240 lower: Antichain::from_elem(T::minimum()),
241 frontier: Antichain::new(),
242 stash: Vec::new(),
243 pager_override: None,
244 logger,
245 operator_id,
246 }
247 }
248
249 fn seal(
250 &mut self,
251 upper: Antichain<Self::Time>,
252 ) -> (Vec<Self::Output>, Description<Self::Time>) {
253 let pager = self.pager();
254 while self.chains.len() > 1 {
256 let a = self.chain_pop().unwrap();
257 let b = self.chain_pop().unwrap();
258 let merged = self.merge_by(a, b);
259 self.chain_push(merged);
260 }
261 let merged = self.chain_pop().unwrap_or_default();
262
263 let mut readied: Vec<Column<(D, T, R)>> = Vec::new();
267 let mut kept_chain: VecDeque<PagedColumn<(D, T, R)>> = VecDeque::new();
268 self.frontier.clear();
269 {
270 let pager = &pager;
271 let frontier = &mut self.frontier;
272 let stash = &mut self.stash;
273 extract_chain(
274 FetchIter::new(merged, pager),
275 upper.borrow(),
276 frontier,
277 |paged| readied.push(pager.take(paged)),
278 |paged| kept_chain.push_back(paged),
279 stash,
280 );
281 }
282
283 if !kept_chain.is_empty() {
284 self.chain_push(kept_chain);
285 }
286
287 let description = Description::new(
288 self.lower.clone(),
289 upper.clone(),
290 Antichain::from_elem(T::minimum()),
291 );
292 self.lower = upper;
293
294 self.stash.clear();
301
302 (readied, description)
303 }
304
305 fn frontier(&mut self) -> AntichainRef<'_, Self::Time> {
306 self.frontier.borrow()
307 }
308}
309
310impl<D, T, R> PushInto<Column<(D, T, R)>> for ColumnMergeBatcher<D, T, R>
311where
312 D: Columnar,
313 for<'a> columnar::Ref<'a, D>: Copy + Ord,
314 T: Columnar + Default + Clone + PartialOrder,
315 for<'a> columnar::Ref<'a, T>: Copy + Ord,
316 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
317{
318 fn push_into(&mut self, mut chunk: Column<(D, T, R)>) {
321 let pager = self.pager();
322 let paged = pager.page(&mut chunk);
323 self.insert_chain(VecDeque::from([paged]));
324 }
325}
326
327impl<D, T, R> ColumnMergeBatcher<D, T, R>
328where
329 D: Columnar,
330 for<'a> columnar::Ref<'a, D>: Copy + Ord,
331 T: Columnar + Default + Clone + PartialOrder,
332 for<'a> columnar::Ref<'a, T>: Copy + Ord,
333 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
334{
335 fn insert_chain(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>) {
338 if chain.is_empty() {
339 return;
340 }
341 self.chain_push(chain);
342 while self.chains.len() > 1
343 && self.chains[self.chains.len() - 1].len()
344 >= self.chains[self.chains.len() - 2].len() / 2
345 {
346 let a = self.chain_pop().unwrap();
347 let b = self.chain_pop().unwrap();
348 let merged = self.merge_by(a, b);
349 self.chain_push(merged);
350 }
351 }
352
353 fn merge_by(
357 &mut self,
358 a: VecDeque<PagedColumn<(D, T, R)>>,
359 b: VecDeque<PagedColumn<(D, T, R)>>,
360 ) -> VecDeque<PagedColumn<(D, T, R)>> {
361 let mut output: VecDeque<PagedColumn<(D, T, R)>> = VecDeque::new();
362 let pager = self.pager();
363 let pager = &pager;
364 let stash = &mut self.stash;
365 merge_chains(
366 FetchIter::new(a, pager),
367 FetchIter::new(b, pager),
368 |paged| output.push_back(paged),
369 stash,
370 );
371 output
372 }
373}
374
375pub struct FetchIter<'a, D, T, R>
381where
382 (D, T, R): Columnar,
383{
384 queue: VecDeque<PagedColumn<(D, T, R)>>,
385 pager: &'a ColumnPager,
386}
387
388impl<'a, D, T, R> FetchIter<'a, D, T, R>
389where
390 (D, T, R): Columnar,
391{
392 pub fn new(queue: VecDeque<PagedColumn<(D, T, R)>>, pager: &'a ColumnPager) -> Self {
394 Self { queue, pager }
395 }
396
397 pub fn pager(&self) -> &'a ColumnPager {
402 self.pager
403 }
404
405 pub fn into_paged(self) -> std::collections::vec_deque::IntoIter<PagedColumn<(D, T, R)>> {
410 self.queue.into_iter()
411 }
412}
413
414impl<D, T, R> Iterator for FetchIter<'_, D, T, R>
415where
416 (D, T, R): Columnar,
417{
418 type Item = Column<(D, T, R)>;
419
420 fn next(&mut self) -> Option<Self::Item> {
421 self.queue.pop_front().map(|p| self.pager.take(p))
422 }
423}
424
425pub fn merge_chains<D, T, R, Sink>(
444 list1: FetchIter<'_, D, T, R>,
445 list2: FetchIter<'_, D, T, R>,
446 mut sink: Sink,
447 stash: &mut Vec<Column<(D, T, R)>>,
448) where
449 D: Columnar,
450 for<'a> columnar::Ref<'a, D>: Copy + Ord,
451 T: Columnar + Default + Clone + PartialOrder,
452 for<'a> columnar::Ref<'a, T>: Copy + Ord,
453 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
454 Sink: FnMut(PagedColumn<(D, T, R)>),
455{
456 let pager = list1.pager();
457 let mut list1 = list1;
458 let mut list2 = list2;
459
460 let mut heads = [
461 list1.next().unwrap_or_default(),
462 list2.next().unwrap_or_default(),
463 ];
464 let mut positions = [0usize, 0usize];
465 let mut result: Column<(D, T, R)> = empty_chunk(stash);
466
467 loop {
468 let upper_l = heads[0].borrow().len();
469 let upper_r = heads[1].borrow().len();
470 if positions[0] >= upper_l || positions[1] >= upper_r {
471 break;
472 }
473
474 let lhs_passthrough = positions[0] == 0 && upper_l > 0 && {
476 let lhs = heads[0].borrow();
477 let rhs = heads[1].borrow();
478 let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1));
479 let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1]));
480 last_l < cur_r
481 };
482 if lhs_passthrough {
483 if !result.is_empty() {
484 sink(pager.page(&mut result));
485 if let Some(reuse) = stash.pop() {
486 result = reuse;
487 }
488 }
489 let mut head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
490 sink(pager.page(&mut head));
491 positions[0] = 0;
492 continue;
493 }
494
495 let rhs_passthrough = positions[1] == 0 && upper_r > 0 && {
496 let lhs = heads[0].borrow();
497 let rhs = heads[1].borrow();
498 let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1));
499 let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0]));
500 last_r < cur_l
501 };
502 if rhs_passthrough {
503 if !result.is_empty() {
504 sink(pager.page(&mut result));
505 if let Some(reuse) = stash.pop() {
506 result = reuse;
507 }
508 }
509 let mut head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
510 sink(pager.page(&mut head));
511 positions[1] = 0;
512 continue;
513 }
514
515 let yielded = result.merge_from(&mut heads, &mut positions);
516
517 if positions[0] >= heads[0].borrow().len() {
518 let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
519 recycle_capped(old, stash);
520 positions[0] = 0;
521 }
522 if positions[1] >= heads[1].borrow().len() {
523 let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
524 recycle_capped(old, stash);
525 positions[1] = 0;
526 }
527 if yielded || result.at_capacity() {
528 sink(pager.page(&mut result));
529 if let Some(reuse) = stash.pop() {
535 result = reuse;
536 }
537 }
538 }
539
540 drain_side(
544 &mut heads[0],
545 &mut positions[0],
546 list1,
547 &mut result,
548 &mut sink,
549 pager,
550 stash,
551 );
552 drain_side(
553 &mut heads[1],
554 &mut positions[1],
555 list2,
556 &mut result,
557 &mut sink,
558 pager,
559 stash,
560 );
561
562 if !result.is_empty() {
563 sink(pager.page(&mut result));
564 } else {
565 recycle_capped(result, stash);
569 }
570 let [h0, h1] = heads;
574 recycle_capped(h0, stash);
575 recycle_capped(h1, stash);
576}
577
578fn drain_side<D, T, R, Sink>(
582 head: &mut Column<(D, T, R)>,
583 pos: &mut usize,
584 rest: FetchIter<'_, D, T, R>,
585 result: &mut Column<(D, T, R)>,
586 sink: &mut Sink,
587 pager: &ColumnPager,
588 stash: &mut Vec<Column<(D, T, R)>>,
589) where
590 D: Columnar,
591 for<'a> columnar::Ref<'a, D>: Copy + Ord,
592 T: Columnar + Default + Clone + PartialOrder,
593 for<'a> columnar::Ref<'a, T>: Copy + Ord,
594 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
595 Sink: FnMut(PagedColumn<(D, T, R)>),
596{
597 if *pos < head.borrow().len() {
598 let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
600 }
601 if !result.is_empty() {
602 sink(pager.page(result));
603 if let Some(reuse) = stash.pop() {
604 *result = reuse;
605 }
606 }
607 for paged in rest.into_paged() {
608 sink(paged);
609 }
610}
611
612pub fn extract_chain<D, T, R, SinkShip, SinkKeep>(
621 merged: FetchIter<'_, D, T, R>,
622 upper: AntichainRef<T>,
623 frontier: &mut Antichain<T>,
624 mut ship: SinkShip,
625 mut keep: SinkKeep,
626 stash: &mut Vec<Column<(D, T, R)>>,
627) where
628 D: Columnar,
629 for<'a> columnar::Ref<'a, D>: Copy + Ord,
630 T: Columnar + Default + Clone + PartialOrder,
631 for<'a> columnar::Ref<'a, T>: Copy + Ord,
632 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
633 SinkShip: FnMut(PagedColumn<(D, T, R)>),
634 SinkKeep: FnMut(PagedColumn<(D, T, R)>),
635{
636 let pager = merged.pager();
637 let mut keep_buf: Column<(D, T, R)> = empty_chunk(stash);
638 let mut ship_buf: Column<(D, T, R)> = empty_chunk(stash);
639
640 for mut buffer in merged {
641 let mut position = 0;
642 let len = buffer.borrow().len();
643 while position < len {
644 buffer.extract(&mut position, upper, frontier, &mut keep_buf, &mut ship_buf);
645 if keep_buf.at_capacity() {
646 keep(pager.page(&mut keep_buf));
647 if let Some(reuse) = stash.pop() {
648 keep_buf = reuse;
649 }
650 }
651 if ship_buf.at_capacity() {
652 ship(pager.page(&mut ship_buf));
653 if let Some(reuse) = stash.pop() {
654 ship_buf = reuse;
655 }
656 }
657 }
658 recycle_capped(buffer, stash);
660 }
661 if !keep_buf.is_empty() {
662 keep(pager.page(&mut keep_buf));
663 } else {
664 recycle_capped(keep_buf, stash);
665 }
666 if !ship_buf.is_empty() {
667 ship(pager.page(&mut ship_buf));
668 } else {
669 recycle_capped(ship_buf, stash);
670 }
671}
672
#[cfg(test)]
// The tests call `.clone()` directly on an `Arc` handle (`policy.clone()`).
#[allow(clippy::clone_on_ref_ptr)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use columnar::Index;
    use timely::container::PushInto as _;

    use super::*;
    use crate::column_pager::{PageDecision, PageEvent, PageHint, PagingPolicy};

    /// Update shape used throughout the tests: `((key, value), time, diff)`.
    type KvUpdate = ((u64, u64), u64, i64);

    /// Builds a column containing `rows` in the given order.
    fn col(rows: &[KvUpdate]) -> Column<KvUpdate> {
        let mut c: Column<KvUpdate> = Default::default();
        for &t in rows {
            c.push_into(t);
        }
        c
    }

    /// Flattens resident chunks into owned updates, in order.
    ///
    /// Panics on paged or compressed entries: reading those would consume
    /// them, and this helper only borrows its input.
    fn collect_pc(chunks: &[PagedColumn<KvUpdate>]) -> Vec<KvUpdate> {
        chunks
            .iter()
            .flat_map(|p| match p {
                PagedColumn::Resident(c, _) => collect_column(c),
                PagedColumn::Paged { .. } | PagedColumn::Compressed { .. } => {
                    panic!("collect_pc only supports Resident entries")
                }
            })
            .collect()
    }

    /// Copies the updates of a column into owned tuples, in order.
    fn collect_column(c: &Column<KvUpdate>) -> Vec<KvUpdate> {
        c.borrow()
            .into_index_iter()
            .map(|((k, v), t, r)| {
                (
                    (u64::into_owned(k), u64::into_owned(v)),
                    u64::into_owned(t),
                    i64::into_owned(r),
                )
            })
            .collect()
    }

    /// A policy that always asks for paging to the swap backend without a
    /// codec, and counts the page-out / page-in events it is told about.
    struct ForcePagePolicy {
        out: AtomicUsize,
        r#in: AtomicUsize,
    }

    impl ForcePagePolicy {
        fn new() -> Arc<Self> {
            Arc::new(Self {
                out: AtomicUsize::new(0),
                r#in: AtomicUsize::new(0),
            })
        }
    }

    impl PagingPolicy for ForcePagePolicy {
        fn decide(&self, _hint: PageHint) -> PageDecision {
            PageDecision::Page {
                backend: mz_ore::pager::Backend::Swap,
                codec: None,
            }
        }

        fn record(&self, event: PageEvent) {
            match event {
                PageEvent::PagedOut { .. } => {
                    self.out.fetch_add(1, Ordering::Relaxed);
                }
                PageEvent::PagedIn { .. } => {
                    self.r#in.fetch_add(1, Ordering::Relaxed);
                }
                _ => {}
            }
        }
    }

    /// Pages every column through `pager` and collects the entries into a chain.
    fn to_chain(
        cols: Vec<Column<KvUpdate>>,
        pager: &ColumnPager,
    ) -> VecDeque<PagedColumn<KvUpdate>> {
        cols.into_iter().map(|mut c| pager.page(&mut c)).collect()
    }

    /// Merges two chains with a disabled pager and returns the flattened output.
    fn drive_merge(chain1: Vec<Column<KvUpdate>>, chain2: Vec<Column<KvUpdate>>) -> Vec<KvUpdate> {
        let pager = ColumnPager::disabled();
        let q1 = to_chain(chain1, &pager);
        let q2 = to_chain(chain2, &pager);
        let mut output: Vec<PagedColumn<KvUpdate>> = Vec::new();
        let mut stash: Vec<Column<KvUpdate>> = Vec::new();
        merge_chains(
            FetchIter::new(q1, &pager),
            FetchIter::new(q2, &pager),
            |paged| output.push(paged),
            &mut stash,
        );
        collect_pc(&output)
    }

    /// Chains with disjoint key ranges come out concatenated in order.
    #[mz_ore::test]
    fn merge_chains_disjoint_ranges() {
        let out = drive_merge(
            vec![
                col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
                col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
            ],
            vec![
                col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
                col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
            ],
        );
        let expected: Vec<_> = (0..4u64)
            .map(|d| ((d, 0u64), 0u64, 1i64))
            .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64)))
            .collect();
        assert_eq!(out, expected);
    }

    /// Chains whose keys alternate are interleaved into sorted order.
    #[mz_ore::test]
    fn merge_chains_interleaved() {
        let out = drive_merge(
            vec![
                col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]),
                col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]),
            ],
            vec![
                col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]),
                col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]),
            ],
        );
        let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
        assert_eq!(out, expected);
    }

    /// An update present on both sides at a chunk boundary has its diffs added.
    #[mz_ore::test]
    fn merge_chains_equal_boundary() {
        let out = drive_merge(
            vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])],
            vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])],
        );
        assert_eq!(out, vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]);
    }

    /// Inputs and outputs are all paged under a forcing policy, and the merged
    /// data survives the round trip.
    #[mz_ore::test]
    fn merge_chains_force_paged_round_trip() {
        let policy = ForcePagePolicy::new();
        let pager = ColumnPager::new(policy.clone());
        let q1 = to_chain(vec![col(&[((0, 0), 0, 1), ((2, 0), 0, 1)])], &pager);
        let q2 = to_chain(vec![col(&[((1, 0), 0, 1), ((3, 0), 0, 1)])], &pager);

        assert!(matches!(q1.front().unwrap(), PagedColumn::Paged { .. }));
        assert!(matches!(q2.front().unwrap(), PagedColumn::Paged { .. }));

        let mut output: Vec<PagedColumn<KvUpdate>> = Vec::new();
        let mut stash: Vec<Column<KvUpdate>> = Vec::new();
        merge_chains(
            FetchIter::new(q1, &pager),
            FetchIter::new(q2, &pager),
            |paged| output.push(paged),
            &mut stash,
        );

        for p in &output {
            assert!(matches!(p, PagedColumn::Paged { .. }));
        }

        let mut collected = Vec::new();
        for p in output {
            let c = pager.take(p);
            collected.extend(collect_column(&c));
        }
        let expected: Vec<_> = (0..4u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
        assert_eq!(collected, expected);
    }

    /// Shipped updates have times below the upper bound, kept updates do not,
    /// and no update is lost.
    #[mz_ore::test]
    fn extract_chain_partitions_by_frontier() {
        let pager = ColumnPager::disabled();
        let data = vec![
            ((0, 0), 0u64, 1i64),
            ((1, 0), 1, 1),
            ((2, 0), 2, 1),
            ((3, 0), 3, 1),
        ];
        let chain = to_chain(vec![col(&data)], &pager);
        let upper = Antichain::from_elem(2u64);
        let mut frontier: Antichain<u64> = Antichain::new();
        let mut ship: Vec<PagedColumn<KvUpdate>> = Vec::new();
        let mut keep: Vec<PagedColumn<KvUpdate>> = Vec::new();
        let mut stash: Vec<Column<KvUpdate>> = Vec::new();

        extract_chain(
            FetchIter::new(chain, &pager),
            upper.borrow(),
            &mut frontier,
            |p| ship.push(p),
            |p| keep.push(p),
            &mut stash,
        );

        let shipped = collect_pc(&ship);
        let kept = collect_pc(&keep);
        for (_, t, _) in &shipped {
            assert!(*t < 2, "shipped time {t} should be < upper");
        }
        for (_, t, _) in &kept {
            assert!(*t >= 2, "kept time {t} should be >= upper");
        }
        assert_eq!(shipped.len() + kept.len(), data.len());
    }

    /// Pushing two chunks and sealing past every time yields the consolidated
    /// union of the inputs.
    #[mz_ore::test]
    fn batcher_seal_round_trip() {
        let mut b: ColumnMergeBatcher<(u64, u64), u64, i64> =
            differential_dataflow::trace::Batcher::new(None, 0);
        let input1 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1), ((3, 0), 0, 1)]);
        let input2 = col(&[((2, 0), 0, 2), ((4, 0), 0, 1)]);
        b.push_into(input1);
        b.push_into(input2);

        let upper = Antichain::from_elem(u64::MAX);
        let (chain, _description) = differential_dataflow::trace::Batcher::seal(&mut b, upper);
        let mut out: Vec<KvUpdate> = chain.iter().flat_map(collect_column).collect();

        let mut expected = vec![
            ((1u64, 1u64), 0u64, 1i64),
            ((2, 0), 0, 3),
            ((3, 0), 0, 1),
            ((4, 0), 0, 1),
        ];
        // Compare as sets of updates; chunk order is not what is under test.
        expected.sort();
        out.sort();
        assert_eq!(out, expected);
    }

    /// Resident entries are accounted in full; paged entries count as nothing.
    #[mz_ore::test]
    fn account_chunk_resident_vs_paged() {
        let policy = ForcePagePolicy::new();
        let pager_paged = ColumnPager::new(policy.clone());
        let pager_res = ColumnPager::disabled();

        let mut c1 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1), ((3, 0), 0, 1)]);
        let resident = pager_res.page(&mut c1);
        let (records, size, capacity, allocations) = account_chunk(&resident);
        assert_eq!(records, 3);
        assert!(size > 0);
        assert_eq!(size, capacity);
        assert_eq!(allocations, 1);

        let mut c2 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1)]);
        let paged = pager_paged.page(&mut c2);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        assert_eq!(account_chunk(&paged), (0, 0, 0, 0));
    }

    /// After a seal that leaves data behind, every retained chain entry is
    /// non-resident under a forcing policy.
    #[mz_ore::test]
    fn batcher_seal_keeps_kept_chain_paged() {
        let policy = ForcePagePolicy::new();
        let pager = ColumnPager::new(policy.clone());

        let mut b: ColumnMergeBatcher<(u64, u64), u64, i64> =
            differential_dataflow::trace::Batcher::new(None, 0);
        b.set_pager(pager);

        let n: u64 = 200;
        for i in 0..n {
            let input = col(&[((i, 0), i % 10, 1)]);
            b.push_into(input);
        }
        let upper = Antichain::from_elem(5u64);
        let _ = differential_dataflow::trace::Batcher::seal(&mut b, upper);

        let mut kept_entries = 0usize;
        for entry in b.chains.iter().flatten() {
            match entry {
                PagedColumn::Paged { .. } | PagedColumn::Compressed { .. } => kept_entries += 1,
                PagedColumn::Resident(_, _) => {
                    panic!("kept chain entry was Resident under ForcePagePolicy");
                }
            }
        }
        assert!(kept_entries > 0, "expected at least one kept paged entry");
        assert!(policy.out.load(Ordering::Relaxed) > 0);
    }
}