mz_timely_util/columnar/batcher.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types for consolidating, merging, and extracting columnar update collections.
17
18use std::collections::VecDeque;
19use std::marker::PhantomData;
20
21use crate::columnation::ColumnationStack;
22use columnar::Container as _;
23use columnar::Push as _;
24use columnar::{Clear, Columnar, Index, Len};
25use columnation::Columnation;
26use differential_dataflow::difference::Semigroup;
27use differential_dataflow::trace::implementations::merge_batcher::Merger;
28use timely::Accountable;
29use timely::Container;
30use timely::PartialOrder;
31use timely::container::{ContainerBuilder, PushInto, SizableContainer};
32use timely::progress::frontier::{Antichain, AntichainRef};
33
34use crate::columnar::Column;
35
/// A chunker to transform input data into sorted columns.
///
/// Holds one staging buffer (`target`) and a FIFO queue of finished buffers
/// (`ready`), both of the container type `C`.
#[derive(Default)]
pub struct Chunker<C> {
    /// Buffer into which we'll consolidate.
    ///
    /// Also the buffer where we'll stage responses to `extract` and `finish`.
    /// When these calls return, the buffer is available for reuse.
    target: C,
    /// Consolidated buffers ready to go.
    ///
    /// `push_into` appends at the back; `extract` pops from the front.
    ready: VecDeque<C>,
}
47
48impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
49 type Container = C;
50
51 fn extract(&mut self) -> Option<&mut Self::Container> {
52 if let Some(ready) = self.ready.pop_front() {
53 self.target = ready;
54 Some(&mut self.target)
55 } else {
56 None
57 }
58 }
59
60 fn finish(&mut self) -> Option<&mut Self::Container> {
61 self.extract()
62 }
63}
64
/// Consolidates one input [`Column`] of `(data, time, diff)` updates into a
/// sorted [`ColumnationStack`] chunk and queues it on `ready`.
///
/// The input is only read through its borrowed view. Updates with equal
/// `(data, time)` have their diffs summed, and groups whose sum is zero are
/// dropped.
impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for Chunker<ColumnationStack<(D, T, R)>>
where
    D: Columnar + Columnation,
    for<'b> columnar::Ref<'b, D>: Ord + Copy,
    T: Columnar + Columnation,
    for<'b> columnar::Ref<'b, T>: Ord + Copy,
    R: Columnar + Columnation + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
    for<'b> columnar::Ref<'b, R>: Ord,
{
    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
        // Sort input data
        // TODO: consider `Vec<usize>` that we retain, containing indexes.
        let borrowed = container.borrow();
        let mut permutation = Vec::with_capacity(borrowed.len());
        // Called by path rather than as a method; presumably `columnar::Push`
        // (also in scope) offers an `extend` that would make the method call
        // ambiguous — confirm before simplifying.
        Extend::extend(&mut permutation, borrowed.into_index_iter());
        permutation.sort();

        self.target.clear();
        // Iterate over the data, accumulating diffs for like keys.
        let mut iter = permutation.drain(..);
        if let Some((data, time, diff)) = iter.next() {
            // Owned scratch values, reused for every record we emit.
            let mut owned_data = D::into_owned(data);
            let mut owned_time = T::into_owned(time);

            // The `(data, time)` group currently being accumulated, and its
            // running diff.
            let mut prev_data = data;
            let mut prev_time = time;
            let mut prev_diff = <R as Columnar>::into_owned(diff);

            for (data, time, diff) in iter {
                if (&prev_data, &prev_time) == (&data, &time) {
                    // Same key: fold the diff into the running total.
                    prev_diff.plus_equals(&diff);
                } else {
                    // Key changed: emit the finished group unless it cancelled out.
                    if !prev_diff.is_zero() {
                        D::copy_from(&mut owned_data, prev_data);
                        T::copy_from(&mut owned_time, prev_time);
                        // `push_into` is handed a reference to an owned tuple,
                        // so move the scratch values into one and then move
                        // them back out for reuse.
                        let tuple = (owned_data, owned_time, prev_diff);
                        self.target.push_into(&tuple);
                        (owned_data, owned_time, prev_diff) = tuple;
                    }
                    // Start the next group.
                    prev_data = data;
                    prev_time = time;
                    R::copy_from(&mut prev_diff, diff);
                }
            }

            // Emit the final group, which the loop above never flushes.
            if !prev_diff.is_zero() {
                D::copy_from(&mut owned_data, prev_data);
                T::copy_from(&mut owned_time, prev_time);
                let tuple = (owned_data, owned_time, prev_diff);
                self.target.push_into(&tuple);
            }
        }

        // Only queue non-empty results; `mem::take` leaves a default `target`
        // behind.
        if !self.target.is_empty() {
            self.ready.push_back(std::mem::take(&mut self.target));
        }
    }
}
123
/// A chunker that consolidates `Column<(D, T, R)>` updates into sorted `Column`
/// chunks, without round-tripping through columnation.
///
/// Drop-in counterpart to [`Chunker`] for the merge-batcher path: same control
/// flow (sort borrowed refs, fold equal `(data, time)` runs, drop zero diffs),
/// but the consolidated output stays in [`Column`].
pub struct ColumnChunker<U: Columnar> {
    /// Container we consolidate into and present to extract/finish callers.
    /// Always `Column::Typed` between calls so we can push into it.
    ///
    /// `push_into` still checks the variant and swaps in a fresh `Typed`
    /// container if it finds `Bytes` or `Align` here.
    target: Column<U>,
    /// Sorted, consolidated chunks pending extraction.
    ///
    /// `push_into` appends at the back; `extract` pops from the front.
    ready: VecDeque<Column<U>>,
}
137
138// Manual impl rather than `#[derive(Default)]`: the derive would synthesize
139// `impl<U: Columnar + Default>`, but `Column<U>: Default` only requires
140// `U: Columnar`, and adding a spurious `U: Default` bound would propagate
141// through every `ContainerBuilder for ColumnChunker<U>` impl.
142impl<U: Columnar> Default for ColumnChunker<U> {
143 fn default() -> Self {
144 Self {
145 target: Column::default(),
146 ready: VecDeque::new(),
147 }
148 }
149}
150
151impl<U: Columnar> ContainerBuilder for ColumnChunker<U>
152where
153 U::Container: Clone + 'static,
154{
155 type Container = Column<U>;
156
157 fn extract(&mut self) -> Option<&mut Self::Container> {
158 if let Some(ready) = self.ready.pop_front() {
159 self.target = ready;
160 Some(&mut self.target)
161 } else {
162 None
163 }
164 }
165
166 fn finish(&mut self) -> Option<&mut Self::Container> {
167 self.extract()
168 }
169}
170
/// Consolidates one input [`Column`] of `(data, time, diff)` updates into a
/// sorted `Column` chunk and queues it on `ready`.
///
/// The input is only read through its borrowed view. Updates with equal
/// `(data, time)` have their diffs summed, and groups whose sum is zero are
/// dropped. An input that consolidates to nothing queues no chunk.
impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for ColumnChunker<(D, T, R)>
where
    D: Columnar,
    for<'b> columnar::Ref<'b, D>: Copy + Ord,
    T: Columnar,
    for<'b> columnar::Ref<'b, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
    for<'b> columnar::Ref<'b, R>: Ord,
    for<'b> <D as Columnar>::Container: columnar::Push<columnar::Ref<'b, D>>,
    for<'b> <T as Columnar>::Container: columnar::Push<columnar::Ref<'b, T>>,
    for<'b> <R as Columnar>::Container: columnar::Push<&'b R>,
{
    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
        // Reset target to an empty owned container. If it's already `Typed`
        // (steady state, possibly recycling a chunk just handed back via
        // `extract`), clear in place to reuse buffer allocations. Otherwise
        // start fresh — the bytes/align variants don't support push.
        match &mut self.target {
            Column::Typed(c) => c.clear(),
            Column::Bytes(_) | Column::Align(_) => {
                self.target = Column::Typed(Default::default());
            }
        }

        // Sort input by columnar ref order.
        let borrowed = container.borrow();
        let mut permutation = Vec::with_capacity(borrowed.len());
        // Called by path rather than as a method; presumably `columnar::Push`
        // (also in scope) offers an `extend` that would make the method call
        // ambiguous — confirm before simplifying.
        Extend::extend(&mut permutation, borrowed.into_index_iter());
        permutation.sort();

        // Sweep sorted refs, accumulating diffs over equal `(data, time)`
        // pairs and pushing non-zero results to the target's leaves. Refs
        // from the input borrow are valid through the sweep, so D and T
        // are pushed directly via each leaf's `Push<Ref<_>>` impl. Only R
        // needs an owned scratch since it carries the consolidated sum.
        {
            let Column::Typed(target_c) = &mut self.target else {
                unreachable!("target reset to Typed above");
            };
            // Split the tuple container into its three per-column leaves.
            let (target_d, target_t, target_r) = target_c;

            let mut iter = permutation.drain(..);
            if let Some((data, time, diff)) = iter.next() {
                // The `(data, time)` group currently being accumulated, and
                // its running diff.
                let mut prev_data = data;
                let mut prev_time = time;
                let mut prev_diff = <R as Columnar>::into_owned(diff);

                for (data, time, diff) in iter {
                    if (&prev_data, &prev_time) == (&data, &time) {
                        // Same key: fold the diff into the running total.
                        prev_diff.plus_equals(&diff);
                    } else {
                        // Key changed: emit the finished group unless it
                        // cancelled out. One push per leaf keeps the three
                        // leaves the same length.
                        if !prev_diff.is_zero() {
                            target_d.push(prev_data);
                            target_t.push(prev_time);
                            target_r.push(&prev_diff);
                        }
                        // Start the next group.
                        prev_data = data;
                        prev_time = time;
                        R::copy_from(&mut prev_diff, diff);
                    }
                }

                // Emit the final group, which the loop above never flushes.
                if !prev_diff.is_zero() {
                    target_d.push(prev_data);
                    target_t.push(prev_time);
                    target_r.push(&prev_diff);
                }
            }
        }

        // Only queue non-empty results, leaving a fresh `Typed` target behind.
        if self.target.borrow().len() > 0 {
            let chunk = std::mem::replace(&mut self.target, Column::Typed(Default::default()));
            self.ready.push_back(chunk);
        }
    }
}
247
/// Move `*lower` forward over the leading run of positions in
/// `[*lower, upper)` for which `cmp` returns true.
///
/// When the call returns, `*lower` is the first index at or after its
/// starting value where `cmp` is false, or `upper` when `cmp` holds all the
/// way to the end. If the range is empty or `cmp` is false at the starting
/// position, `*lower` is left unchanged.
///
/// The predicate receives an index (`FnMut(usize) -> bool`) instead of a
/// value, so callers decide which input columns to look at — for the merger's
/// `(d, t)`-keyed sort, each probe only has to touch the D and T leaf views
/// and can skip the diff column.
///
/// A run of length `K` costs `O(log K)` probes instead of the `K` a linear
/// scan would need, which pays off when one side of a sorted merge has long
/// runs that sort before the other side.
fn gallop(upper: usize, lower: &mut usize, mut cmp: impl FnMut(usize) -> bool) {
    // Nothing to skip: empty range, or the predicate already fails here.
    if *lower >= upper || !cmp(*lower) {
        return;
    }

    // Invariant: `cursor` is always an index known to satisfy `cmp`.
    let mut cursor = *lower;
    let mut stride: usize = 1;

    // Overshoot phase: jump ahead by doubling strides while the predicate
    // keeps holding. On exit the boundary lies in `(cursor, cursor + stride]`.
    loop {
        let probe = cursor + stride;
        if probe >= upper || !cmp(probe) {
            break;
        }
        cursor = probe;
        stride <<= 1;
    }

    // Descent phase: shrink the stride by half each round, accepting a jump
    // only if the predicate still holds at the landing index. This converges
    // on the largest index that satisfies `cmp`.
    stride >>= 1;
    while stride > 0 {
        let probe = cursor + stride;
        if probe < upper && cmp(probe) {
            cursor = probe;
        }
        stride >>= 1;
    }

    // `cursor` is the last satisfying index; callers want one past it.
    *lower = cursor + 1;
}
293
/// Merger that drives the merge batcher with [`Column`]-shaped chunks.
///
/// This is the counterpart to `ColInternalMerger` (which merges
/// `ColumnationStack` chunks). It avoids the columnation detour by relying on
/// the inherent `merge_from` / `extract` methods defined on
/// `Column<(D, T, R)>`.
///
/// The type carries no data; `D`, `T` and `R` only select which chunk type
/// the merger operates on.
pub struct ColumnMerger<D, T, R> {
    _marker: PhantomData<(D, T, R)>,
}

// Written by hand so that `D`, `T` and `R` are not required to be `Default`.
impl<D, T, R> Default for ColumnMerger<D, T, R> {
    fn default() -> Self {
        ColumnMerger {
            _marker: PhantomData,
        }
    }
}
309
/// Per-chunk merge and extract for [`Column`]-shaped sorted chunks.
///
/// These are the building blocks that the `Merger` impl for [`ColumnMerger`]
/// orchestrates over chains of chunks. They're inherent methods rather than a
/// trait impl so the merger can call them without going through any wrapper
/// indirection.
impl<D, T, R> Column<(D, T, R)>
where
    D: Columnar,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + PartialOrder,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
{
    /// Merge items from sorted inputs into `self`, advancing positions.
    ///
    /// `positions[i]` is the index of the first not-yet-consumed record of
    /// `others[i]`; it is advanced past every record this call consumes.
    ///
    /// Mirrors the dispatch shape used by the merge-batcher framework:
    /// - **0**: no-op
    /// - **1**: bulk copy (or swap, if `self` is empty and `*pos == 0`)
    /// - **2**: merge two sorted streams, with diff consolidation on equal
    ///   `(data, time)` keys and gallop bulk-copy of long single-side runs.
    ///
    /// Returns `true` if the merge stopped because the amortized ship-threshold
    /// check inside the inner loop fired (the caller should ship `self` before
    /// the next call). Returns `false` if the merge stopped because at least
    /// one input was exhausted at its position (the caller should refill that
    /// side; `self` may still be at capacity from accumulation across short
    /// calls and the caller should also check `at_capacity` in that case).
    ///
    /// The 0- and 1-input dispatches always return `false`: 0 does no work,
    /// 1 is a bulk copy or swap that runs to completion.
    ///
    /// # Panics
    ///
    /// Panics if `others` has more than two elements, if `positions` is
    /// shorter than `others`, or if `self` is not `Column::Typed` on a path
    /// that needs to push into it (the 1-input copy and the 2-input merge).
    #[must_use]
    pub fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) -> bool {
        match others.len() {
            0 => false,
            1 => {
                let other = &mut others[0];
                let pos = &mut positions[0];
                // If `self` is empty and `*pos == 0`, we can bulk swap in the other chunk.
                // `*pos` is left at 0; `other` now holds the previously empty `self`.
                if self.is_empty() && *pos == 0 {
                    std::mem::swap(self, other);
                    return false;
                }
                // Otherwise, bulk copy the remaining data from `other[*pos..]` into `self`.
                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };
                let src_c = other.borrow();
                self_c.extend_from_self(src_c, *pos..other.borrow().len());
                // Mark `other` as fully consumed.
                *pos = other.borrow().len();
                false
            }
            2 => {
                let (left, right) = others.split_at(1);
                let (left_pos, right_pos) = positions.split_at_mut(1);
                let left_borrow = left[0].borrow();
                let right_borrow = right[0].borrow();

                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };

                // Split the input borrows into per-leaf views.
                //
                // A columnar tuple `Borrow::Ref` is recursive: indexing the
                // tuple borrow walks every leaf (and reconstructs the nested
                // ref tuple) regardless of which leaves the caller actually
                // reads. Indexing each leaf view directly cuts probe-path
                // work to the columns we consult — for the merge step
                // that's the `(D, T)` key, with the diff column read only
                // when we push.
                let l_d = left_borrow.0;
                let l_t = left_borrow.1;
                let l_r = left_borrow.2;
                let r_d = right_borrow.0;
                let r_t = right_borrow.1;
                let r_r = right_borrow.2;
                // Record counts of the two inputs, read off the data leaves.
                let upper_l = l_d.len();
                let upper_r = r_d.len();

                // Mirror the split on the output container. Tuple
                // containers split into per-leaf containers, which lets us
                // address each leaf independently — both gallop bulk-copies
                // and single-record pushes resolve to a primitive operation
                // per leaf. The leaves stay length-synchronized as long as
                // every record path pushes exactly one element to each.
                let (sd, st, sr) = self_c;

                // Pre-size each output leaf for the worst-case merge
                // (no consolidation): `len(left) + len(right)` records.
                // `reserve_for` walks each input's `as_bytes`, which is
                // accurate for variable-length leaves (where reserving a
                // record count wouldn't size the byte buffer correctly).
                //
                // Gated by record count: above `RESERVE_RECORD_THRESHOLD`
                // combined input records the input bound over-reserves any
                // time consolidation is heavy, and the framework's outer
                // ship-threshold check yields us before we'd use the
                // headroom. For inputs past that point, geometric grow
                // is bounded by 2× the actual output and avoids
                // committing pages we'd never touch.
                const RESERVE_RECORD_THRESHOLD: usize = 1_000_000;
                if upper_l + upper_r <= RESERVE_RECORD_THRESHOLD {
                    use columnar::Container as _;
                    let inputs = [left_borrow, right_borrow];
                    sd.reserve_for(inputs.iter().map(|b| b.0));
                    st.reserve_for(inputs.iter().map(|b| b.1));
                    sr.reserve_for(inputs.iter().map(|b| b.2));
                }

                // Owned scratch for the summed diff in the equal-key branch.
                let mut stash = R::default();

                // Mid-merge ship-threshold check, matching the heuristic
                // used by `Column::at_capacity` and `ColumnBuilder`. The
                // tuple `(sd.borrow(), st.borrow(), sr.borrow())` chains
                // its leaves' `as_bytes` iterators, so passing it to
                // `at_serialized_capacity` reuses the canonical
                // `indexed::length_in_words` formula without needing the
                // parent borrow we destructured.
                //
                // The check walks every leaf slice once per call, which
                // is non-trivial on variable-length leaves; the caller
                // runs it every `THRESHOLD_PERIOD_MASK + 1` iterations
                // rather than per-iter. The ship threshold is ~65 K
                // records, so overshooting by ~1 K records before the
                // check fires has no practical impact — the framework's
                // outer `at_capacity` check sees the oversize chunk and
                // ships it regardless.
                let at_ship_threshold =
                    |sd: &D::Container, st: &T::Container, sr: &R::Container| {
                        use columnar::Borrow as _;
                        crate::columnar::at_serialized_capacity(&(
                            sd.borrow(),
                            st.borrow(),
                            sr.borrow(),
                        ))
                    };
                const THRESHOLD_PERIOD_MASK: u32 = 1023;
                // Counts outer-loop iterations (not records): a gallop run
                // counts as a single iteration.
                let mut iter: u32 = 0;
                let mut yielded = false;

                while left_pos[0] < upper_l && right_pos[0] < upper_r {
                    let d1 = l_d.get(left_pos[0]);
                    let t1 = l_t.get(left_pos[0]);
                    let d2 = r_d.get(right_pos[0]);
                    let t2 = r_t.get(right_pos[0]);
                    match (d1, t1).cmp(&(d2, t2)) {
                        std::cmp::Ordering::Less => {
                            // Common case (interleaved data): single-record
                            // advance. Skip the gallop call entirely — its
                            // setup plus the first cmp probe is more
                            // expensive than just pushing this record and
                            // re-entering the outer loop. Galloping is only
                            // worthwhile when there's an actual run, which
                            // we detect with the peek check below.
                            sd.push(d1);
                            st.push(t1);
                            sr.push(l_r.get(left_pos[0]));
                            left_pos[0] += 1;
                            // Long-run case: peek at the next record; if
                            // it's still strictly less than `(d2, t2)`,
                            // we have a run worth galloping (and bulk-
                            // copying).
                            if left_pos[0] < upper_l
                                && (l_d.get(left_pos[0]), l_t.get(left_pos[0])) < (d2, t2)
                            {
                                let start = left_pos[0];
                                gallop(upper_l, &mut left_pos[0], |i| {
                                    (l_d.get(i), l_t.get(i)) < (d2, t2)
                                });
                                // Per-leaf bulk copy of the run. Each
                                // call resolves to an `extend_from_slice`
                                // on its leaf (recursively for nested
                                // leaves), which the compiler can
                                // autovectorize.
                                sd.extend_from_self(l_d, start..left_pos[0]);
                                st.extend_from_self(l_t, start..left_pos[0]);
                                sr.extend_from_self(l_r, start..left_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Greater => {
                            // Symmetric on the right side.
                            sd.push(d2);
                            st.push(t2);
                            sr.push(r_r.get(right_pos[0]));
                            right_pos[0] += 1;
                            if right_pos[0] < upper_r
                                && (r_d.get(right_pos[0]), r_t.get(right_pos[0])) < (d1, t1)
                            {
                                let start = right_pos[0];
                                gallop(upper_r, &mut right_pos[0], |i| {
                                    (r_d.get(i), r_t.get(i)) < (d1, t1)
                                });
                                sd.extend_from_self(r_d, start..right_pos[0]);
                                st.extend_from_self(r_t, start..right_pos[0]);
                                sr.extend_from_self(r_r, start..right_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Equal => {
                            // Same `(data, time)` on both sides: sum the two
                            // diffs and emit one record, or nothing if they
                            // cancel. Both inputs advance either way.
                            let r1 = l_r.get(left_pos[0]);
                            let r2 = r_r.get(right_pos[0]);
                            R::copy_from(&mut stash, r1);
                            stash.plus_equals(&r2);
                            if !stash.is_zero() {
                                sd.push(d1);
                                st.push(t1);
                                sr.push(&stash);
                            }
                            left_pos[0] += 1;
                            right_pos[0] += 1;
                        }
                    }

                    // Amortized ship-threshold check; see comment above
                    // `at_ship_threshold` for rationale.
                    iter = iter.wrapping_add(1);
                    if iter & THRESHOLD_PERIOD_MASK == 0 && at_ship_threshold(sd, st, sr) {
                        yielded = true;
                        break;
                    }
                }
                yielded
            }
            // `Merger::merge` only ever calls `merge_from` with 0/1/2-input
            // slices (k-way merge isn't part of the merge-batcher contract).
            // Defensive guard: if someone bumps that, this will panic
            // immediately rather than silently produce wrong output.
            n => unreachable!("merge_from called with {n} inputs; expected 0, 1, or 2"),
        }
    }

    /// Partition records starting at `*position` into `keep` (times beyond
    /// `upper`, retained for the next round) and `ship` (times not beyond
    /// `upper`, sealed into the output batch). Updates `frontier` with the
    /// times of kept records.
    ///
    /// The caller invokes `extract` repeatedly until `*position >= self.len()`,
    /// swapping out a full output buffer between calls. This shape exists
    /// because the framework only checks `at_capacity()` between calls, so
    /// without an inner-loop yield a single call could quietly produce
    /// oversized output chunks.
    ///
    /// `self` is only read; `*position` is advanced by one for every record
    /// routed to `keep` or `ship`.
    ///
    /// # Panics
    ///
    /// Panics if `keep` or `ship` is not `Column::Typed`.
    pub fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        keep: &mut Self,
        ship: &mut Self,
    ) {
        let Column::Typed(keep_c) = keep else {
            unreachable!("merger chunks are always Column::Typed");
        };
        let Column::Typed(ship_c) = ship else {
            unreachable!("merger chunks are always Column::Typed");
        };

        let self_view = self.borrow();
        let len = self_view.len();

        // Yield to the framework when either output buffer reaches the
        // ship threshold, so it can ship a full chunk and hand back a
        // fresh one. Required by the merger's extract contract: the
        // framework only checks `at_capacity` between calls, so without
        // an inner-loop yield a single call can fill an output well past
        // threshold.
        use columnar::Borrow as _;
        // Owned scratch time, reused for every record so the antichain
        // comparison can take a `&T`.
        let mut owned_t = T::default();
        while *position < len
            && !crate::columnar::at_serialized_capacity(&keep_c.borrow())
            && !crate::columnar::at_serialized_capacity(&ship_c.borrow())
        {
            let (_, time, _) = self_view.get(*position);
            T::copy_from(&mut owned_t, time);
            if upper.less_equal(&owned_t) {
                // `insert_with` only clones when the time isn't already
                // present in the antichain.
                frontier.insert_with(&owned_t, |t| t.clone());
                keep_c.extend_from_self(self_view, *position..*position + 1);
            } else {
                ship_c.extend_from_self(self_view, *position..*position + 1);
            }
            *position += 1;
        }
    }
}
594
/// `Merger` impl driving [`MergeBatcher`] over [`Column`]-shaped chunks.
///
/// `merge` walks two sorted chains of chunks in lockstep, calling
/// `Column::merge_from` to consume up to one ship-threshold's worth of input
/// per pass and shipping `result` to `output` whenever it crosses
/// `at_capacity`. Exhausted input chunks are reset and pushed to `stash` for
/// reuse. The drain phase appends remaining full chunks to `output`
/// directly, with no per-element copy.
///
/// `extract` walks each chunk via `Column::extract`, partitioning records
/// into `kept` (times beyond `upper`) and `ship` (sealed into the output
/// batch); both grow chunk-by-chunk under the same `at_capacity` ship
/// signal.
///
/// [`MergeBatcher`]: differential_dataflow::trace::implementations::merge_batcher::MergeBatcher
impl<D, T, R> Merger for ColumnMerger<D, T, R>
where
    D: Columnar,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + Ord + PartialOrder,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
{
    type Time = T;
    type Chunk = Column<(D, T, R)>;

    /// Merges the sorted chunk chains `list1` and `list2` into `output`,
    /// consolidating records with equal `(data, time)`. Fully consumed input
    /// chunks are recycled into `stash`, which also supplies empty chunks for
    /// the merged result.
    fn merge(
        &mut self,
        list1: Vec<Self::Chunk>,
        list2: Vec<Self::Chunk>,
        output: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    ) {
        let mut list1 = list1.into_iter();
        let mut list2 = list2.into_iter();

        // Current chunk of each chain; a default (empty) chunk stands in for
        // an exhausted chain. `positions[i]` is the next unread record of
        // `heads[i]`.
        let mut heads = [
            list1.next().unwrap_or_default(),
            list2.next().unwrap_or_default(),
        ];
        let mut positions = [0usize, 0usize];

        let mut result = empty_chunk(stash);

        // Main merge loop: both sides have data.
        loop {
            let upper_l = heads[0].borrow().len();
            let upper_r = heads[1].borrow().len();
            if positions[0] >= upper_l || positions[1] >= upper_r {
                break;
            }

            // Whole-chunk passthrough fast path. When one head's tail (from
            // its current position) is sortable-before the other head's
            // current record, the entire tail can be appended to `output`
            // without per-record compares or per-leaf byte copies.
            //
            // Two probes (one record from each side) settle this — when it
            // fires, it skips an entire `merge_from` invocation, including
            // its gallop bulk-copies, and replaces the byte-level extend
            // with a `mem::replace` of the head into `output`.
            //
            // Restricted to `positions[i] == 0` so we can hand the head off
            // wholesale; partial-tail passthrough would require a 1-input
            // `merge_from` to materialize the tail into a new chunk, which
            // is what gallop already handles inside the merge loop.
            let lhs_passthrough = positions[0] == 0 && upper_l > 0 && {
                let lhs = heads[0].borrow();
                let rhs = heads[1].borrow();
                let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1));
                let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1]));
                // Strictly less: an equal key would need consolidation.
                last_l < cur_r
            };
            if lhs_passthrough {
                // Ship whatever has been merged so far first, so `output`
                // stays in sorted order.
                if !result.is_empty() {
                    output.push(std::mem::take(&mut result));
                    result = empty_chunk(stash);
                }
                let head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                output.push(head);
                positions[0] = 0;
                continue;
            }

            let rhs_passthrough = positions[1] == 0 && upper_r > 0 && {
                let lhs = heads[0].borrow();
                let rhs = heads[1].borrow();
                let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1));
                let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0]));
                // Strictly less: an equal key would need consolidation.
                last_r < cur_l
            };
            if rhs_passthrough {
                // Same flush-then-hand-off sequence as the left side.
                if !result.is_empty() {
                    output.push(std::mem::take(&mut result));
                    result = empty_chunk(stash);
                }
                let head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                output.push(head);
                positions[1] = 0;
                continue;
            }

            // Per-record merge. `merge_from` returns `true` when its inner
            // amortized ship-threshold check fires — short-circuit the
            // outer `at_capacity` walk in that case.
            let yielded = result.merge_from(&mut heads, &mut positions);

            // Refill whichever side was fully consumed and recycle its chunk.
            if positions[0] >= heads[0].borrow().len() {
                let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                recycle_chunk(old, stash);
                positions[0] = 0;
            }
            if positions[1] >= heads[1].borrow().len() {
                let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                recycle_chunk(old, stash);
                positions[1] = 0;
            }
            if yielded || result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = empty_chunk(stash);
            }
        }

        // Drain remaining from each side: copy partial head, then append
        // full chunks directly to output (no per-element copy).
        drain_side(
            &mut heads[0],
            &mut positions[0],
            &mut list1,
            &mut result,
            output,
            stash,
        );
        drain_side(
            &mut heads[1],
            &mut positions[1],
            &mut list2,
            &mut result,
            output,
            stash,
        );
        if !result.is_empty() {
            output.push(result);
        }
    }

    /// Splits the chunks in `merged` by time: records whose time is beyond
    /// `upper` go to `kept` (and their times are added to `frontier`), all
    /// others go to `ship`. Consumed input chunks are recycled into `stash`.
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        ship: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    ) {
        // Output chunks currently being filled; `ready` collects the records
        // destined for `ship`.
        let mut keep = empty_chunk(stash);
        let mut ready = empty_chunk(stash);

        for mut buffer in merged {
            let mut position = 0;
            let len = buffer.borrow().len();
            // `Column::extract` returns early once either output reaches the
            // ship threshold, so keep calling it until the buffer is consumed,
            // swapping out full outputs between calls.
            while position < len {
                buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
                if keep.at_capacity() {
                    kept.push(std::mem::take(&mut keep));
                    keep = empty_chunk(stash);
                }
                if ready.at_capacity() {
                    ship.push(std::mem::take(&mut ready));
                    ready = empty_chunk(stash);
                }
            }
            recycle_chunk(buffer, stash);
        }
        // Flush the partially filled tails.
        if !keep.is_empty() {
            kept.push(keep);
        }
        if !ready.is_empty() {
            ship.push(ready);
        }
    }

    /// Reports `(records, size, capacity, allocations)` for `chunk`, using the
    /// serialized byte length for both size and capacity and counting one
    /// allocation per chunk.
    ///
    /// # Panics
    ///
    /// Panics if `record_count()` does not fit in a `usize`.
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
        use timely::dataflow::channels::ContainerBytes;
        let records = usize::try_from(chunk.record_count()).expect("record_count is non-negative");
        // Serialized footprint stands in for both `size` and `capacity`: the
        // chunk owns one logical allocation worth of leaf storage, and we
        // ship/recycle the whole thing rather than tracking per-leaf
        // capacities. Treating `size == capacity` matches how the framework
        // accounts already-shipped chunks (no slack to absorb).
        let bytes = chunk.length_in_bytes();
        (records, bytes, bytes, 1)
    }
}
789
790/// Pop a chunk from `stash` or allocate a fresh one. Stashed chunks are
791/// already cleared via `recycle_chunk`, so they're ready for push.
792#[inline]
793pub(crate) fn empty_chunk<C: Columnar>(stash: &mut Vec<Column<C>>) -> Column<C> {
794 stash.pop().unwrap_or_default()
795}
796
797/// Reset `chunk` to an empty `Typed` and push it to `stash` for reuse.
798///
799/// Chunks recycled here come from the merger and chunker, both of which
800/// produce `Typed`; only the typed allocations are worth caching for reuse.
801/// `Bytes` / `Align` chunks have no typed-side allocation to preserve, so we
802/// simply drop them — `empty_chunk` will produce a fresh default just as
803/// cheaply, and pushing them onto `stash` would only displace useful
804/// recycled allocations.
805#[inline]
806pub(crate) fn recycle_chunk<C: Columnar>(mut chunk: Column<C>, stash: &mut Vec<Column<C>>) {
807 if let Column::Typed(c) = &mut chunk {
808 c.clear();
809 stash.push(chunk);
810 }
811}
812
813/// Drain remaining items from one side into `result` / `output`.
814///
815/// Copies the partially-consumed head into `result` via `merge_from`'s 1-input
816/// path, then appends remaining full chunks directly to `output` without
817/// per-element copy.
818fn drain_side<D, T, R>(
819 head: &mut Column<(D, T, R)>,
820 pos: &mut usize,
821 list: &mut std::vec::IntoIter<Column<(D, T, R)>>,
822 result: &mut Column<(D, T, R)>,
823 output: &mut Vec<Column<(D, T, R)>>,
824 stash: &mut Vec<Column<(D, T, R)>>,
825) where
826 D: Columnar,
827 for<'a> columnar::Ref<'a, D>: Copy + Ord,
828 T: Columnar + Default + Clone + PartialOrder,
829 for<'a> columnar::Ref<'a, T>: Copy + Ord,
830 R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
831{
832 if *pos < head.borrow().len() {
833 // 1-input dispatch — bulk copy that runs to completion; the yield
834 // signal is unused.
835 let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
836 }
837 if !result.is_empty() {
838 output.push(std::mem::take(result));
839 *result = empty_chunk(stash);
840 }
841 Extend::extend(output, list);
842}
843
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `inputs` to a fresh `ColumnChunker` through one `push_into`
    /// call, then returns every record of every extracted chunk as owned
    /// tuples (empty when nothing is extracted).
    fn run_chunker<D, T, R>(inputs: &[(D, T, R)]) -> Vec<(D, T, R)>
    where
        D: Columnar + Clone,
        for<'a> columnar::Ref<'a, D>: Copy + Ord,
        T: Columnar + Clone,
        for<'a> columnar::Ref<'a, T>: Copy + Ord,
        R: Columnar + Clone + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
        for<'a> columnar::Ref<'a, R>: Ord,
        <(D, T, R) as Columnar>::Container: Clone,
        for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
        <(D, T, R) as Columnar>::Container: columnar::Push<(D, T, R)>,
        for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
        for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
        for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
    {
        // Stage the raw records in a column, which is what the chunker consumes.
        let mut staged: Column<(D, T, R)> = Default::default();
        for record in inputs {
            staged.push_into(record.clone());
        }

        let mut chunker: ColumnChunker<(D, T, R)> = Default::default();
        chunker.push_into(&mut staged);

        // `Extend::extend` is spelled out because `columnar::Push` is in
        // scope and also offers an `extend` method.
        let mut collected: Vec<(D, T, R)> = Vec::new();
        while let Some(chunk) = chunker.extract() {
            Extend::extend(
                &mut collected,
                chunk
                    .borrow()
                    .into_index_iter()
                    .map(|(d, t, r)| (D::into_owned(d), T::into_owned(t), R::into_owned(r))),
            );
        }
        collected
    }

    /// Pushing an empty column must leave nothing to extract or finish.
    #[mz_ore::test]
    fn empty_input_yields_no_chunk() {
        let mut no_rows: Column<(u64, u64, i64)> = Default::default();
        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut no_rows);
        assert!(chunker.extract().is_none());
        assert!(chunker.finish().is_none());
    }

    /// Records come back ordered by data even when pushed out of order.
    #[mz_ore::test]
    fn unsorted_input_is_sorted() {
        let shuffled = [(3u64, 0u64, 1i64), (1u64, 0u64, 1i64), (2u64, 0u64, 1i64)];
        let sorted: Vec<(u64, u64, i64)> = vec![(1, 0, 1), (2, 0, 1), (3, 0, 1)];
        assert_eq!(run_chunker(&shuffled), sorted);
    }

    /// Equal `(data, time)` pairs collapse into one record with summed diffs.
    #[mz_ore::test]
    fn duplicate_keys_consolidate() {
        let repeated = [(1u64, 0u64, 1i64), (1u64, 0u64, 2i64), (1u64, 0u64, -1i64)];
        let consolidated = run_chunker(&repeated);
        assert_eq!(consolidated, vec![(1, 0, 2)]);
    }

    /// A group whose diffs cancel out produces no output record at all.
    #[mz_ore::test]
    fn diffs_summing_to_zero_are_dropped() {
        let cancelling = [(1u64, 0u64, 1i64), (1u64, 0u64, -1i64)];
        assert!(run_chunker(&cancelling).is_empty());
    }

    /// Sorting, summing and zero-dropping all in one input.
    #[mz_ore::test]
    fn mixed_consolidation() {
        // Group (1, 0): 1 + 2 - 3 = 0, so it disappears.
        // Group (2, 0): a single diff of 1, kept as is.
        // Group (1, 1): a single diff of 5, kept; its time differs from (1, 0).
        let interleaved = [
            (1u64, 0u64, 1i64),
            (2u64, 0u64, 1i64),
            (1u64, 0u64, 2i64),
            (1u64, 1u64, 5i64),
            (1u64, 0u64, -3i64),
        ];
        let expected: Vec<(u64, u64, i64)> = vec![(1, 1, 5), (2, 0, 1)];
        assert_eq!(run_chunker(&interleaved), expected);
    }

    /// Uses the val-batcher record shape, where the data is a `(K, V)` pair.
    #[mz_ore::test]
    fn key_val_tuple_data() {
        let records = [
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 11u64), 0u64, 1i64),
            ((2u64, 10u64), 0u64, 1i64),
        ];
        let expected: Vec<((u64, u64), u64, i64)> =
            vec![((1, 10), 0, 2), ((1, 11), 0, 1), ((2, 10), 0, 1)];
        assert_eq!(run_chunker(&records), expected);
    }

    /// Two `push_into` calls in sequence, with an `extract` in between, so the
    /// second push runs against a buffer that was already handed out once.
    #[mz_ore::test]
    fn buffer_reuse_across_calls() {
        let mut first: Column<(u64, u64, i64)> = Default::default();
        for row in [(1u64, 0u64, 1i64), (2u64, 0u64, 1i64)] {
            first.push_into(row);
        }

        let mut second: Column<(u64, u64, i64)> = Default::default();
        for row in [(3u64, 0u64, 1i64), (1u64, 0u64, 1i64)] {
            second.push_into(row);
        }

        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut first);

        // Borrow the first chunk and let the borrow end right away, the way
        // the merge batcher would take the `&mut` and then return.
        {
            let _returned = chunker.extract().expect("first chunk");
        }

        chunker.push_into(&mut second);

        let chunk = chunker.extract().expect("second chunk");
        let mut rows = Vec::new();
        for (d, t, r) in chunk.borrow().into_index_iter() {
            rows.push((u64::into_owned(d), u64::into_owned(t), i64::into_owned(r)));
        }
        assert_eq!(rows, vec![(1, 0, 1), (3, 0, 1)]);
    }

    /// Copies `rows` into a fresh `Column<((u64, u64), u64, i64)>`.
    fn col(rows: &[((u64, u64), u64, i64)]) -> Column<((u64, u64), u64, i64)> {
        let mut column: Column<((u64, u64), u64, i64)> = Default::default();
        for row in rows.iter().copied() {
            column.push_into(row);
        }
        column
    }

    /// Flattens `chunks`, in order, into one vector of owned tuples.
    fn collect_chunks(chunks: &[Column<((u64, u64), u64, i64)>]) -> Vec<((u64, u64), u64, i64)> {
        let mut rows = Vec::new();
        for chunk in chunks {
            for ((k, v), t, r) in chunk.borrow().into_index_iter() {
                rows.push((
                    (u64::into_owned(k), u64::into_owned(v)),
                    u64::into_owned(t),
                    i64::into_owned(r),
                ));
            }
        }
        rows
    }

    /// Merges the two chains with a default `ColumnMerger`, starting from an
    /// empty output and an empty stash, and returns the flattened output.
    fn merge_chains(
        chain1: Vec<Column<((u64, u64), u64, i64)>>,
        chain2: Vec<Column<((u64, u64), u64, i64)>>,
    ) -> Vec<((u64, u64), u64, i64)> {
        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);
        collect_chunks(&output)
    }

    /// Chains with disjoint key ranges: every chunk of `chain1` sorts before
    /// every chunk of `chain2`. This is the input shape meant to hit the
    /// whole-chunk passthrough fast path, where chunks go straight to the
    /// output instead of through the per-record merge.
    #[mz_ore::test]
    fn merger_disjoint_chains_passthrough() {
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
            col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
            col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
        ];

        let collected = merge_chains(chain1, chain2);

        let expected: Vec<_> = (0..4u64)
            .chain(10..14u64)
            .map(|d| ((d, 0u64), 0u64, 1i64))
            .collect();
        assert_eq!(collected, expected);
    }

    /// Chains whose keys interleave, so the passthrough condition never
    /// holds and the merge has to go through `merge_from`. The result must
    /// still be the fully sorted union.
    #[mz_ore::test]
    fn merger_interleaved_chains() {
        // One chain carries the even keys and the other the odd keys, so
        // chunks from the two chains always overlap in key range.
        let evens = vec![
            col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]),
            col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]),
        ];
        let odds = vec![
            col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]),
            col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]),
        ];

        let collected = merge_chains(evens, odds);

        let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
        assert_eq!(collected, expected);
    }

    /// When the last `(d, t)` of `chain1` equals the first `(d, t)` of
    /// `chain2`, passthrough must not fire: the two records have to be
    /// consolidated into one.
    #[mz_ore::test]
    fn merger_passthrough_respects_equal_boundary() {
        // Key (5, 0) at time 0 ends chain1 and starts chain2. A correct merge
        // sums the two diffs to 2. A wrongly taken passthrough would leave
        // both records unconsolidated in separate output chunks.
        let chain1 = vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])];
        let chain2 = vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])];

        let collected = merge_chains(chain1, chain2);

        assert_eq!(
            collected,
            vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]
        );
    }
}
1074
#[cfg(test)]
mod proptests {
    //! Property tests covering `Column::merge_from` and `Column::extract`.
    //!
    //! Inputs are generated already sorted and consolidated, which is the
    //! input contract of the merger. The tests then call `merge_from` /
    //! `extract` the way the framework would and check the result against a
    //! brute-force reference implementation.
    //!
    //! Records use `D = (u64, u64)`, `T = u64`, `R = i64`, all drawn from
    //! small ranges so that equal keys collide often and the consolidation
    //! path really runs.
    use super::*;
    use mz_ore::cast::CastFrom;
    use proptest::prelude::*;
    use timely::progress::frontier::Antichain;

    type Tuple = ((u64, u64), u64, i64);

    /// Reference consolidation: sorts the records, adds up the diffs of
    /// records sharing the same `(data, time)`, and removes zero diffs.
    fn consolidate(mut v: Vec<Tuple>) -> Vec<Tuple> {
        v.sort();
        let mut merged: Vec<Tuple> = Vec::new();
        for (data, time, diff) in v {
            match merged.last_mut() {
                // Same `(data, time)` as the previous record: fold the diff in.
                Some((d, t, acc)) if *d == data && *t == time => *acc += diff,
                _ => merged.push((data, time, diff)),
            }
        }
        merged.retain(|&(_, _, diff)| diff != 0);
        merged
    }

    /// Strategy producing sorted, consolidated record lists. The value
    /// ranges are deliberately small so that equal keys are likely.
    fn arb_consolidated() -> impl Strategy<Value = Vec<Tuple>> {
        let data = (0u64..5, 0u64..5);
        let time = 0u64..3;
        let diff = -3i64..=3i64;
        prop::collection::vec((data, time, diff), 0..30).prop_map(consolidate)
    }

    /// Copies `v` into a fresh `Column<Tuple>`.
    fn build_column(v: &[Tuple]) -> Column<Tuple> {
        let mut column: Column<Tuple> = Default::default();
        for &row in v {
            column.push_into(row);
        }
        column
    }

    /// Reads every record of `col` back out as an owned tuple.
    fn collect_column(col: &Column<Tuple>) -> Vec<Tuple> {
        let mut rows = Vec::new();
        for ((k, v), t, r) in col.borrow().into_index_iter() {
            rows.push((
                (u64::into_owned(k), u64::into_owned(v)),
                u64::into_owned(t),
                i64::into_owned(r),
            ));
        }
        rows
    }

    /// Runs a 2-way merge in the manner of `Merger::merge`: one 2-input
    /// `merge_from` call, followed by one 1-input call that drains whichever
    /// side still has records (the left side is checked first).
    fn drive_merge(left: Column<Tuple>, right: Column<Tuple>) -> Column<Tuple> {
        let mut merged: Column<Tuple> = Default::default();
        let mut inputs = [left, right];
        let mut cursors = [0usize, 0];
        let _ = merged.merge_from(&mut inputs, &mut cursors);

        let [left_rest, right_rest] = inputs;
        let [left_at, right_at] = cursors;

        // Choose the side to drain, if any, then drain it in one place.
        let remainder = if left_at < left_rest.borrow().len() {
            Some((left_rest, left_at))
        } else if right_at < right_rest.borrow().len() {
            Some((right_rest, right_at))
        } else {
            None
        };
        if let Some((column, at)) = remainder {
            let mut tail = [column];
            let mut cursor = [at];
            let _ = merged.merge_from(&mut tail, &mut cursor);
        }

        merged
    }

    proptest! {
        /// Merging two sorted, consolidated inputs with `merge_from` gives
        /// the same records as consolidating their union with the reference.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_equals_consolidated_union(
            a in arb_consolidated(),
            b in arb_consolidated(),
        ) {
            let merged = drive_merge(build_column(&a), build_column(&b));

            let expected = consolidate(a.iter().chain(b.iter()).copied().collect());

            prop_assert_eq!(collect_column(&merged), expected);
        }

        /// A 1-input `merge_from` starting at a non-zero position appends
        /// exactly `other[*pos..]` and advances the position to the end.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_one_input_drains_tail(
            data in arb_consolidated(),
            pos_frac in 0u32..=100,
        ) {
            // Scale `pos_frac` into a start index. Dividing by 101 keeps the
            // start strictly below `len` whenever the data is non-empty.
            let len = data.len();
            let start_pos = match len {
                0 => 0,
                _ => (usize::cast_from(pos_frac) * len) / 101,
            };

            // Pre-load the target with one record so the call goes through
            // the bulk-copy path, not the empty-self swap shortcut.
            let sentinel: Tuple = ((u64::MAX, u64::MAX), 0, 1);
            let mut target: Column<Tuple> = Default::default();
            target.push_into(sentinel);

            let mut sources = [build_column(&data)];
            let mut cursors = [start_pos];
            let _ = target.merge_from(&mut sources, &mut cursors);

            let expected: Vec<Tuple> = std::iter::once(sentinel)
                .chain(data[start_pos..].iter().copied())
                .collect();

            prop_assert_eq!(collect_column(&target), expected);
            prop_assert_eq!(cursors[0], len);
        }

        /// The 1-input swap shortcut (empty target, position 0) must leave
        /// the target equal to the input.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_empty_self_swap(data in arb_consolidated()) {
            let mut target: Column<Tuple> = Default::default();
            let mut sources = [build_column(&data)];
            let mut cursors = [0usize];
            let _ = target.merge_from(&mut sources, &mut cursors);

            prop_assert_eq!(collect_column(&target), data);
        }

        /// `extract` splits the input correctly:
        /// - every kept time satisfies `upper.less_equal(t)`
        /// - no shipped time satisfies `upper.less_equal(t)`
        /// - kept plus shipped equals the input as a multiset
        /// - the reported frontier covers every kept time
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_partitions_by_frontier(
            data in arb_consolidated(),
            upper_time in 0u64..=4,
        ) {
            let mut source = build_column(&data);
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            source.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            // One call is expected to consume the whole input; there is no
            // capacity-based yield in `extract`.
            prop_assert_eq!(position, data.len());

            let kept = collect_column(&keep);
            let shipped = collect_column(&ship);

            // Kept times are at or beyond `upper`; shipped times are not.
            for t in kept.iter().map(|(_, t, _)| t) {
                prop_assert!(
                    upper.borrow().less_equal(t),
                    "kept time {} should satisfy upper.less_equal", t,
                );
            }
            for t in shipped.iter().map(|(_, t, _)| t) {
                prop_assert!(
                    !upper.borrow().less_equal(t),
                    "shipped time {} should NOT satisfy upper.less_equal", t,
                );
            }

            // Nothing lost and nothing invented: compare as sorted multisets.
            let mut union: Vec<Tuple> = kept.iter().chain(shipped.iter()).copied().collect();
            union.sort();
            let mut expected_sorted = data.clone();
            expected_sorted.sort();
            prop_assert_eq!(union, expected_sorted);

            // The frontier must be less than or equal to every kept time.
            for t in kept.iter().map(|(_, t, _)| t) {
                prop_assert!(
                    frontier.less_equal(t),
                    "frontier should dominate kept time {}", t,
                );
            }
        }

        /// With an empty input, `extract` leaves the position at 0, both
        /// outputs empty and the frontier without elements.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_empty_input(upper_time in 0u64..=4) {
            let mut source: Column<Tuple> = Default::default();
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            source.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            prop_assert_eq!(position, 0);
            prop_assert!(collect_column(&keep).is_empty());
            prop_assert!(collect_column(&ship).is_empty());
            prop_assert!(frontier.elements().is_empty());
        }
    }
}