// mz_timely_util/columnar/batcher.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Types for consolidating, merging, and extracting columnar update collections.

18use std::collections::VecDeque;
19use std::marker::PhantomData;
20
21use crate::columnation::ColumnationStack;
22use columnar::Container as _;
23use columnar::Push as _;
24use columnar::{Clear, Columnar, Index, Len};
25use columnation::Columnation;
26use differential_dataflow::difference::Semigroup;
27use differential_dataflow::trace::implementations::merge_batcher::Merger;
28use timely::Accountable;
29use timely::Container;
30use timely::PartialOrder;
31use timely::container::{ContainerBuilder, PushInto, SizableContainer};
32use timely::progress::frontier::{Antichain, AntichainRef};
33
34use crate::columnar::Column;
35
/// A chunker to transform input data into sorted columns.
///
/// Consolidated chunks accumulate in `ready` and are handed out one at a
/// time via the `ContainerBuilder` implementation.
#[derive(Default)]
pub struct Chunker<C> {
    /// Buffer into which we'll consolidate.
    ///
    /// Also the buffer where we'll stage responses to `extract` and `finish`.
    /// When these calls return, the buffer is available for reuse.
    target: C,
    /// Consolidated buffers ready to go.
    ready: VecDeque<C>,
}
47
48impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
49    type Container = C;
50
51    fn extract(&mut self) -> Option<&mut Self::Container> {
52        if let Some(ready) = self.ready.pop_front() {
53            self.target = ready;
54            Some(&mut self.target)
55        } else {
56            None
57        }
58    }
59
60    fn finish(&mut self) -> Option<&mut Self::Container> {
61        self.extract()
62    }
63}
64
impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for Chunker<ColumnationStack<(D, T, R)>>
where
    D: Columnar + Columnation,
    for<'b> columnar::Ref<'b, D>: Ord + Copy,
    T: Columnar + Columnation,
    for<'b> columnar::Ref<'b, T>: Ord + Copy,
    R: Columnar + Columnation + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
    for<'b> columnar::Ref<'b, R>: Ord,
{
    /// Consolidate `container` into a sorted `ColumnationStack` chunk:
    /// sort borrowed refs, sum diffs over runs of equal `(data, time)`
    /// pairs, drop zero diffs, and queue the result for `extract`/`finish`.
    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
        // Sort input data
        // TODO: consider `Vec<usize>` that we retain, containing indexes.
        let borrowed = container.borrow();
        let mut permutation = Vec::with_capacity(borrowed.len());
        Extend::extend(&mut permutation, borrowed.into_index_iter());
        permutation.sort();

        self.target.clear();
        // Iterate over the data, accumulating diffs for like keys.
        let mut iter = permutation.drain(..);
        if let Some((data, time, diff)) = iter.next() {
            // Owned scratch values, reused across pushes so data/time are
            // materialized once per chunk rather than once per record.
            let mut owned_data = D::into_owned(data);
            let mut owned_time = T::into_owned(time);

            let mut prev_data = data;
            let mut prev_time = time;
            let mut prev_diff = <R as Columnar>::into_owned(diff);

            for (data, time, diff) in iter {
                if (&prev_data, &prev_time) == (&data, &time) {
                    // Same (data, time) key: fold this diff into the run.
                    prev_diff.plus_equals(&diff);
                } else {
                    if !prev_diff.is_zero() {
                        D::copy_from(&mut owned_data, prev_data);
                        T::copy_from(&mut owned_time, prev_time);
                        // `push_into` takes the tuple by reference; moving the
                        // owned values into a named tuple and destructuring
                        // them back out lets us keep reusing the scratch
                        // buffers without cloning.
                        let tuple = (owned_data, owned_time, prev_diff);
                        self.target.push_into(&tuple);
                        (owned_data, owned_time, prev_diff) = tuple;
                    }
                    prev_data = data;
                    prev_time = time;
                    R::copy_from(&mut prev_diff, diff);
                }
            }

            // Flush the final run, unless its diffs cancelled to zero.
            if !prev_diff.is_zero() {
                D::copy_from(&mut owned_data, prev_data);
                T::copy_from(&mut owned_time, prev_time);
                let tuple = (owned_data, owned_time, prev_diff);
                self.target.push_into(&tuple);
            }
        }

        // Queue non-empty results; `extract` hands them out in FIFO order.
        if !self.target.is_empty() {
            self.ready.push_back(std::mem::take(&mut self.target));
        }
    }
}
123
/// A chunker that consolidates `Column<(D, T, R)>` updates into sorted `Column`
/// chunks, without round-tripping through columnation.
///
/// Drop-in counterpart to [`Chunker`] for the merge-batcher path: same control
/// flow (sort borrowed refs, fold equal `(data, time)` runs, drop zero diffs),
/// but the consolidated output stays in [`Column`].
pub struct ColumnChunker<U: Columnar> {
    /// Container we consolidate into and present to extract/finish callers.
    /// Always `Column::Typed` between calls so we can push into it.
    target: Column<U>,
    /// Sorted, consolidated chunks pending extraction.
    ready: VecDeque<Column<U>>,
}
137
138// Manual impl rather than `#[derive(Default)]`: the derive would synthesize
139// `impl<U: Columnar + Default>`, but `Column<U>: Default` only requires
140// `U: Columnar`, and adding a spurious `U: Default` bound would propagate
141// through every `ContainerBuilder for ColumnChunker<U>` impl.
142impl<U: Columnar> Default for ColumnChunker<U> {
143    fn default() -> Self {
144        Self {
145            target: Column::default(),
146            ready: VecDeque::new(),
147        }
148    }
149}
150
151impl<U: Columnar> ContainerBuilder for ColumnChunker<U>
152where
153    U::Container: Clone + 'static,
154{
155    type Container = Column<U>;
156
157    fn extract(&mut self) -> Option<&mut Self::Container> {
158        if let Some(ready) = self.ready.pop_front() {
159            self.target = ready;
160            Some(&mut self.target)
161        } else {
162            None
163        }
164    }
165
166    fn finish(&mut self) -> Option<&mut Self::Container> {
167        self.extract()
168    }
169}
170
impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for ColumnChunker<(D, T, R)>
where
    D: Columnar,
    for<'b> columnar::Ref<'b, D>: Copy + Ord,
    T: Columnar,
    for<'b> columnar::Ref<'b, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
    for<'b> columnar::Ref<'b, R>: Ord,
    for<'b> <D as Columnar>::Container: columnar::Push<columnar::Ref<'b, D>>,
    for<'b> <T as Columnar>::Container: columnar::Push<columnar::Ref<'b, T>>,
    for<'b> <R as Columnar>::Container: columnar::Push<&'b R>,
{
    /// Consolidate `container` into a sorted `Column` chunk: sort borrowed
    /// refs, sum diffs over runs of equal `(data, time)` pairs, drop zero
    /// diffs, and queue the result for `extract`/`finish`.
    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
        // Reset target to an empty owned container. If it's already `Typed`
        // (steady state, possibly recycling a chunk just handed back via
        // `extract`), clear in place to reuse buffer allocations. Otherwise
        // start fresh — the bytes/align variants don't support push.
        match &mut self.target {
            Column::Typed(c) => c.clear(),
            Column::Bytes(_) | Column::Align(_) => {
                self.target = Column::Typed(Default::default());
            }
        }

        // Sort input by columnar ref order.
        let borrowed = container.borrow();
        let mut permutation = Vec::with_capacity(borrowed.len());
        Extend::extend(&mut permutation, borrowed.into_index_iter());
        permutation.sort();

        // Sweep sorted refs, accumulating diffs over equal `(data, time)`
        // pairs and pushing non-zero results to the target's leaves. Refs
        // from the input borrow are valid through the sweep, so D and T
        // are pushed directly via each leaf's `Push<Ref<_>>` impl. Only R
        // needs an owned scratch since it carries the consolidated sum.
        {
            let Column::Typed(target_c) = &mut self.target else {
                unreachable!("target reset to Typed above");
            };
            // Per-leaf destructure of the (D, T, R) tuple container.
            let (target_d, target_t, target_r) = target_c;

            let mut iter = permutation.drain(..);
            if let Some((data, time, diff)) = iter.next() {
                let mut prev_data = data;
                let mut prev_time = time;
                let mut prev_diff = <R as Columnar>::into_owned(diff);

                for (data, time, diff) in iter {
                    if (&prev_data, &prev_time) == (&data, &time) {
                        // Same (data, time) key: fold this diff into the run.
                        prev_diff.plus_equals(&diff);
                    } else {
                        if !prev_diff.is_zero() {
                            target_d.push(prev_data);
                            target_t.push(prev_time);
                            target_r.push(&prev_diff);
                        }
                        prev_data = data;
                        prev_time = time;
                        R::copy_from(&mut prev_diff, diff);
                    }
                }

                // Flush the final run, unless its diffs cancelled to zero.
                if !prev_diff.is_zero() {
                    target_d.push(prev_data);
                    target_t.push(prev_time);
                    target_r.push(&prev_diff);
                }
            }
        }

        // Queue non-empty results and reset `target` to a fresh Typed
        // container so the next call can push into it.
        if self.target.borrow().len() > 0 {
            let chunk = std::mem::replace(&mut self.target, Column::Typed(Default::default()));
            self.ready.push_back(chunk);
        }
    }
}
247
/// Advance `*lower` past every position in `[*lower, upper)` where `cmp`
/// returns true.
///
/// On return, `*lower` is the first index `>= initial *lower` where `cmp`
/// returns false, or `upper` if `cmp` holds through the end.
///
/// Takes the predicate as `FnMut(usize) -> bool` rather than a value-bearing
/// closure so callers can index whichever subset of the input columns they
/// actually need to compare — for the merger's `(d, t)`-keyed sort, this lets
/// each probe touch only the D and T leaf views, skipping the diff column.
///
/// Compared to a linear scan, this is `O(log K)` for a run of length `K`
/// satisfying `cmp` — useful when one side of a sorted merge has long runs
/// dominated by the other side.
fn gallop(upper: usize, lower: &mut usize, mut cmp: impl FnMut(usize) -> bool) {
    // Guard: if the range is empty or `cmp` already fails at the starting
    // position, the run has length zero and `*lower` stays put.
    if *lower >= upper || !cmp(*lower) {
        return;
    }

    // Overshoot phase: double the stride while the probe stays in bounds and
    // still satisfies `cmp`. On exit, `*lower` is a confirmed-true index and
    // the true/false boundary lies somewhere in `(*lower, *lower + stride]`.
    let mut stride = 1;
    while *lower + stride < upper && cmp(*lower + stride) {
        *lower += stride;
        stride <<= 1;
    }

    // Descent phase: halve the stride each round, accepting an advance only
    // when the probed position still satisfies `cmp`. This converges on the
    // largest index where `cmp` holds.
    stride >>= 1;
    while stride > 0 {
        if *lower + stride < upper && cmp(*lower + stride) {
            *lower += stride;
        }
        stride >>= 1;
    }

    // `*lower` is the last true index; callers want the first false one.
    *lower += 1;
}
293
/// Counterpart to `ColInternalMerger` (which merges `ColumnationStack` chunks).
/// Drives the merge batcher with [`Column`]-shaped chunks, no columnation
/// detour, by way of the inherent `merge_from` / `extract` methods on
/// `Column<(D, T, R)>` below.
pub struct ColumnMerger<D, T, R> {
    // Zero-sized marker tying the merger to its data/time/diff types; the
    // merger itself holds no state.
    _marker: PhantomData<(D, T, R)>,
}
301
302impl<D, T, R> Default for ColumnMerger<D, T, R> {
303    fn default() -> Self {
304        Self {
305            _marker: PhantomData,
306        }
307    }
308}
309
/// Per-chunk merge and extract for [`Column`]-shaped sorted chunks.
///
/// These are the building blocks that [`Merger for ColumnMerger`] orchestrates
/// over chains of chunks. They're inherent methods rather than a trait impl
/// so the merger can call them without going through any wrapper indirection.
impl<D, T, R> Column<(D, T, R)>
where
    D: Columnar + Default,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + PartialOrder,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
    for<'a> <D as Columnar>::Container: columnar::Push<&'a D>,
    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
    for<'a> <T as Columnar>::Container: columnar::Push<&'a T>,
    for<'a> <R as Columnar>::Container: columnar::Push<columnar::Ref<'a, R>>,
    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
{
    /// Merge items from sorted inputs into `self`, advancing positions.
    ///
    /// Mirrors the dispatch shape used by the merge-batcher framework:
    /// - **0**: no-op
    /// - **1**: bulk copy (or swap, if `self` is empty and `*pos == 0`)
    /// - **2**: merge two sorted streams, with diff consolidation on equal
    ///   `(data, time)` keys and gallop bulk-copy of long single-side runs.
    ///
    /// Returns `true` if the merge stopped because the amortized ship-threshold
    /// check inside the inner loop fired (the caller should ship `self` before
    /// the next call). Returns `false` if the merge stopped because at least
    /// one input was exhausted at its position (the caller should refill that
    /// side; `self` may still be at capacity from accumulation across short
    /// calls and the caller should also check `at_capacity` in that case).
    ///
    /// The 0- and 1-input dispatches always return `false`: 0 does no work,
    /// 1 is a bulk copy or swap that runs to completion.
    #[must_use]
    pub fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) -> bool {
        match others.len() {
            0 => false,
            1 => {
                let other = &mut others[0];
                let pos = &mut positions[0];
                // If `self` is empty and `*pos == 0`, we can bulk swap in the other chunk.
                // After the swap, `other` holds the (empty) old self, and
                // `*pos == 0 == len` marks it exhausted for the caller.
                if self.is_empty() && *pos == 0 {
                    std::mem::swap(self, other);
                    return false;
                }
                // Otherwise, bulk copy the remaining data from `other[*pos..]` into `self`.
                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };
                let src_c = other.borrow();
                self_c.extend_from_self(src_c, *pos..other.borrow().len());
                *pos = other.borrow().len();
                false
            }
            2 => {
                let (left, right) = others.split_at(1);
                let (left_pos, right_pos) = positions.split_at_mut(1);
                let left_borrow = left[0].borrow();
                let right_borrow = right[0].borrow();

                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };

                // Split the input borrows into per-leaf views.
                //
                // A columnar tuple `Borrow::Ref` is recursive: indexing the
                // tuple borrow walks every leaf (and reconstructs the nested
                // ref tuple) regardless of which leaves the caller actually
                // reads. Indexing each leaf view directly cuts probe-path
                // work to the columns we consult — for the merge step
                // that's the `(D, T)` key, with the diff column read only
                // when we push.
                let l_d = left_borrow.0;
                let l_t = left_borrow.1;
                let l_r = left_borrow.2;
                let r_d = right_borrow.0;
                let r_t = right_borrow.1;
                let r_r = right_borrow.2;
                let upper_l = l_d.len();
                let upper_r = r_d.len();

                // Mirror the split on the output container. Tuple
                // containers split into per-leaf containers, which lets us
                // address each leaf independently — both gallop bulk-copies
                // and single-record pushes resolve to a primitive operation
                // per leaf. The leaves stay length-synchronized as long as
                // every record path pushes exactly one element to each.
                let (sd, st, sr) = self_c;

                // Pre-size each output leaf for the worst-case merge
                // (no consolidation): `len(left) + len(right)` records.
                // `reserve_for` walks each input's `as_bytes`, which is
                // accurate for variable-length leaves (where reserving a
                // record count wouldn't size the byte buffer correctly).
                //
                // Gated by record count: above the million-record mark
                // the input bound over-reserves any time consolidation is
                // heavy, and the framework's outer ship-threshold check
                // yields us before we'd use the headroom. For inputs past
                // that point, geometric grow is bounded by 2× the actual
                // output and avoids committing pages we'd never touch.
                const RESERVE_RECORD_THRESHOLD: usize = 1_000_000;
                if upper_l + upper_r <= RESERVE_RECORD_THRESHOLD {
                    use columnar::Container as _;
                    let inputs = [left_borrow, right_borrow];
                    sd.reserve_for(inputs.iter().map(|b| b.0));
                    st.reserve_for(inputs.iter().map(|b| b.1));
                    sr.reserve_for(inputs.iter().map(|b| b.2));
                }

                // Owned scratch diff for the consolidated sum on equal keys.
                let mut stash = R::default();

                // Mid-merge ship-threshold check, matching the heuristic
                // used by `Column::at_capacity` and `ColumnBuilder`. The
                // tuple `(sd.borrow(), st.borrow(), sr.borrow())` chains
                // its leaves' `as_bytes` iterators, so passing it to
                // `at_serialized_capacity` reuses the canonical
                // `indexed::length_in_words` formula without needing the
                // parent borrow we destructured.
                //
                // The check walks every leaf slice once per call, which
                // is non-trivial on variable-length leaves; the caller
                // runs it every `THRESHOLD_PERIOD_MASK + 1` iterations
                // rather than per-iter. The ship threshold is ~65 K
                // records, so overshooting by ~1 K records before the
                // check fires has no practical impact — the framework's
                // outer `at_capacity` check sees the oversize chunk and
                // ships it regardless.
                let at_ship_threshold =
                    |sd: &D::Container, st: &T::Container, sr: &R::Container| {
                        use columnar::Borrow as _;
                        crate::columnar::at_serialized_capacity(&(
                            sd.borrow(),
                            st.borrow(),
                            sr.borrow(),
                        ))
                    };
                const THRESHOLD_PERIOD_MASK: u32 = 1023;
                let mut iter: u32 = 0;
                let mut yielded = false;

                while left_pos[0] < upper_l && right_pos[0] < upper_r {
                    let d1 = l_d.get(left_pos[0]);
                    let t1 = l_t.get(left_pos[0]);
                    let d2 = r_d.get(right_pos[0]);
                    let t2 = r_t.get(right_pos[0]);
                    match (d1, t1).cmp(&(d2, t2)) {
                        std::cmp::Ordering::Less => {
                            // Common case (interleaved data): single-record
                            // advance. Skip the gallop call entirely — its
                            // setup plus the first cmp probe is more
                            // expensive than just pushing this record and
                            // re-entering the outer loop. Galloping is only
                            // worthwhile when there's an actual run, which
                            // we detect with the peek check below.
                            sd.push(d1);
                            st.push(t1);
                            sr.push(l_r.get(left_pos[0]));
                            left_pos[0] += 1;
                            // Long-run case: peek at the next record; if
                            // it's still strictly less than `(d2, t2)`,
                            // we have a run worth galloping (and bulk-
                            // copying).
                            if left_pos[0] < upper_l
                                && (l_d.get(left_pos[0]), l_t.get(left_pos[0])) < (d2, t2)
                            {
                                let start = left_pos[0];
                                gallop(upper_l, &mut left_pos[0], |i| {
                                    (l_d.get(i), l_t.get(i)) < (d2, t2)
                                });
                                // Per-leaf bulk copy of the run. Each
                                // call resolves to an `extend_from_slice`
                                // on its leaf (recursively for nested
                                // leaves), which the compiler can
                                // autovectorize.
                                sd.extend_from_self(l_d, start..left_pos[0]);
                                st.extend_from_self(l_t, start..left_pos[0]);
                                sr.extend_from_self(l_r, start..left_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Greater => {
                            // Symmetric on the right side.
                            sd.push(d2);
                            st.push(t2);
                            sr.push(r_r.get(right_pos[0]));
                            right_pos[0] += 1;
                            if right_pos[0] < upper_r
                                && (r_d.get(right_pos[0]), r_t.get(right_pos[0])) < (d1, t1)
                            {
                                let start = right_pos[0];
                                gallop(upper_r, &mut right_pos[0], |i| {
                                    (r_d.get(i), r_t.get(i)) < (d1, t1)
                                });
                                sd.extend_from_self(r_d, start..right_pos[0]);
                                st.extend_from_self(r_t, start..right_pos[0]);
                                sr.extend_from_self(r_r, start..right_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Equal => {
                            // Equal `(data, time)` keys: consolidate the two
                            // diffs, emitting the record only if the sum is
                            // non-zero, and advance both inputs.
                            let r1 = l_r.get(left_pos[0]);
                            let r2 = r_r.get(right_pos[0]);
                            R::copy_from(&mut stash, r1);
                            stash.plus_equals(&r2);
                            if !stash.is_zero() {
                                sd.push(d1);
                                st.push(t1);
                                sr.push(&stash);
                            }
                            left_pos[0] += 1;
                            right_pos[0] += 1;
                        }
                    }

                    // Amortized ship-threshold check; see comment above
                    // `at_ship_threshold` for rationale.
                    iter = iter.wrapping_add(1);
                    if iter & THRESHOLD_PERIOD_MASK == 0 && at_ship_threshold(sd, st, sr) {
                        yielded = true;
                        break;
                    }
                }
                yielded
            }
            // `Merger::merge` only ever calls `merge_from` with 0/1/2-input
            // slices (k-way merge isn't part of the merge-batcher contract).
            // Defensive guard: if someone bumps that, this will panic
            // immediately rather than silently produce wrong output.
            n => unreachable!("merge_from called with {n} inputs; expected 0, 1, or 2"),
        }
    }

    /// Partition records starting at `*position` into `keep` (times beyond
    /// `upper`, retained for the next round) and `ship` (times not beyond
    /// `upper`, sealed into the output batch). Updates `frontier` with the
    /// times of kept records.
    ///
    /// The caller invokes `extract` repeatedly until `*position >= self.len()`,
    /// swapping out a full output buffer between calls. This shape exists
    /// because the framework only checks `at_capacity()` between calls, so
    /// without an inner-loop yield a single call could quietly produce
    /// oversized output chunks.
    pub fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        keep: &mut Self,
        ship: &mut Self,
    ) {
        let Column::Typed(keep_c) = keep else {
            unreachable!("merger chunks are always Column::Typed");
        };
        let Column::Typed(ship_c) = ship else {
            unreachable!("merger chunks are always Column::Typed");
        };

        let self_view = self.borrow();
        let len = self_view.len();

        // Owned scratch time, reused for every record we inspect.
        let mut owned_t = T::default();
        // Yield to the framework when either output buffer reaches the
        // ship threshold, so it can ship a full chunk and hand back a
        // fresh one. Required by the merger's extract contract: the
        // framework only checks `at_capacity` between calls, so without
        // an inner-loop yield a single call can fill an output well past
        // threshold.
        use columnar::Borrow as _;
        while *position < len
            && !crate::columnar::at_serialized_capacity(&keep_c.borrow())
            && !crate::columnar::at_serialized_capacity(&ship_c.borrow())
        {
            let (_, time, _) = self_view.get(*position);
            T::copy_from(&mut owned_t, time);
            if upper.less_equal(&owned_t) {
                // `insert_with` only clones when the time isn't already
                // present in the antichain.
                frontier.insert_with(&owned_t, |t| t.clone());
                keep_c.extend_from_self(self_view, *position..*position + 1);
            } else {
                ship_c.extend_from_self(self_view, *position..*position + 1);
            }
            *position += 1;
        }
    }
}
601
602/// `Merger` impl driving [`MergeBatcher`] over [`Column`]-shaped chunks.
603///
604/// `merge` walks two sorted chains of chunks in lockstep, calling
605/// `Column::merge_from` to consume up to one ship-threshold's worth of input
606/// per pass and shipping `result` to `output` whenever it crosses
607/// `at_capacity`. Exhausted input chunks are reset and pushed to `stash` for
608/// reuse. The drain phase appends remaining full chunks to `output`
609/// directly, with no per-element copy.
610///
611/// `extract` walks each chunk via `Column::extract`, partitioning records
612/// into `kept` (times beyond `upper`) and `ship` (sealed into the output
613/// batch); both grow chunk-by-chunk under the same `at_capacity` ship
614/// signal.
615///
616/// [`MergeBatcher`]: differential_dataflow::trace::implementations::merge_batcher::MergeBatcher
617impl<D, T, R> Merger for ColumnMerger<D, T, R>
618where
619    D: Columnar + Default + 'static,
620    for<'a> columnar::Ref<'a, D>: Copy + Ord,
621    T: Columnar + Default + Clone + Ord + PartialOrder + 'static,
622    for<'a> columnar::Ref<'a, T>: Copy + Ord,
623    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>> + 'static,
624    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
625    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
626    for<'a> <D as Columnar>::Container: columnar::Push<&'a D>,
627    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
628    for<'a> <T as Columnar>::Container: columnar::Push<&'a T>,
629    for<'a> <R as Columnar>::Container: columnar::Push<columnar::Ref<'a, R>>,
630    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
631{
    type Time = T;
    type Chunk = Column<(D, T, R)>;

    /// Merge two sorted chains of chunks into `output`, consolidating records
    /// with equal `(data, time)` via `merge_from`. Fully-consumed input chunks
    /// are recycled through `stash` so later allocations can reuse them.
    fn merge(
        &mut self,
        list1: Vec<Self::Chunk>,
        list2: Vec<Self::Chunk>,
        output: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    ) {
        let mut list1 = list1.into_iter();
        let mut list2 = list2.into_iter();

        // Current head chunk of each chain, plus the next unread record
        // offset within that head.
        let mut heads = [
            list1.next().unwrap_or_default(),
            list2.next().unwrap_or_default(),
        ];
        let mut positions = [0usize, 0usize];

        let mut result = empty_chunk(stash);

        // Main merge loop: both sides have data.
        loop {
            let upper_l = heads[0].borrow().len();
            let upper_r = heads[1].borrow().len();
            if positions[0] >= upper_l || positions[1] >= upper_r {
                break;
            }

            // Whole-chunk passthrough fast path. When one head's tail (from
            // its current position) is sortable-before the other head's
            // current record, the entire tail can be appended to `output`
            // without per-record compares or per-leaf byte copies.
            //
            // Two probes (one record from each side) settle this — when it
            // fires, it skips an entire `merge_from` invocation, including
            // its gallop bulk-copies, and replaces the byte-level extend
            // with a `mem::replace` of the head into `output`.
            //
            // Restricted to `positions[i] == 0` so we can hand the head off
            // wholesale; partial-tail passthrough would require a 1-input
            // `merge_from` to materialize the tail into a new chunk, which
            // is what gallop already handles inside the merge loop.
            //
            // Note the strict `<`: an equal `(data, time)` at the boundary
            // must fall through to the per-record path so its diffs
            // consolidate.
            let lhs_passthrough = positions[0] == 0 && upper_l > 0 && {
                let lhs = heads[0].borrow();
                let rhs = heads[1].borrow();
                let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1));
                let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1]));
                last_l < cur_r
            };
            if lhs_passthrough {
                // Flush pending consolidated records first so `output` stays
                // globally ordered.
                if !result.is_empty() {
                    output.push(std::mem::take(&mut result));
                    result = empty_chunk(stash);
                }
                let head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                output.push(head);
                positions[0] = 0;
                continue;
            }

            // Mirror image of the left-hand passthrough above.
            let rhs_passthrough = positions[1] == 0 && upper_r > 0 && {
                let lhs = heads[0].borrow();
                let rhs = heads[1].borrow();
                let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1));
                let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0]));
                last_r < cur_l
            };
            if rhs_passthrough {
                if !result.is_empty() {
                    output.push(std::mem::take(&mut result));
                    result = empty_chunk(stash);
                }
                let head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                output.push(head);
                positions[1] = 0;
                continue;
            }

            // Per-record merge. `merge_from` returns `true` when its inner
            // amortized ship-threshold check fires — short-circuit the
            // outer `at_capacity` walk in that case.
            let yielded = result.merge_from(&mut heads, &mut positions);

            // Refill any exhausted head from its chain, recycling the spent
            // chunk's allocation through `stash`.
            if positions[0] >= heads[0].borrow().len() {
                let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                recycle_chunk(old, stash);
                positions[0] = 0;
            }
            if positions[1] >= heads[1].borrow().len() {
                let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                recycle_chunk(old, stash);
                positions[1] = 0;
            }
            if yielded || result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = empty_chunk(stash);
            }
        }

        // Drain remaining from each side: copy partial head, then append
        // full chunks directly to output (no per-element copy).
        drain_side(
            &mut heads[0],
            &mut positions[0],
            &mut list1,
            &mut result,
            output,
            stash,
        );
        drain_side(
            &mut heads[1],
            &mut positions[1],
            &mut list2,
            &mut result,
            output,
            stash,
        );
        if !result.is_empty() {
            output.push(result);
        }
    }
754
755    fn extract(
756        &mut self,
757        merged: Vec<Self::Chunk>,
758        upper: AntichainRef<Self::Time>,
759        frontier: &mut Antichain<Self::Time>,
760        ship: &mut Vec<Self::Chunk>,
761        kept: &mut Vec<Self::Chunk>,
762        stash: &mut Vec<Self::Chunk>,
763    ) {
764        let mut keep = empty_chunk(stash);
765        let mut ready = empty_chunk(stash);
766
767        for mut buffer in merged {
768            let mut position = 0;
769            let len = buffer.borrow().len();
770            while position < len {
771                buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
772                if keep.at_capacity() {
773                    kept.push(std::mem::take(&mut keep));
774                    keep = empty_chunk(stash);
775                }
776                if ready.at_capacity() {
777                    ship.push(std::mem::take(&mut ready));
778                    ready = empty_chunk(stash);
779                }
780            }
781            recycle_chunk(buffer, stash);
782        }
783        if !keep.is_empty() {
784            kept.push(keep);
785        }
786        if !ready.is_empty() {
787            ship.push(ready);
788        }
789    }
790
791    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
792        use timely::dataflow::channels::ContainerBytes;
793        let records = usize::try_from(chunk.record_count()).expect("record_count is non-negative");
794        // Serialized footprint stands in for both `size` and `capacity`: the
795        // chunk owns one logical allocation worth of leaf storage, and we
796        // ship/recycle the whole thing rather than tracking per-leaf
797        // capacities. Treating `size == capacity` matches how the framework
798        // accounts already-shipped chunks (no slack to absorb).
799        let bytes = chunk.length_in_bytes();
800        (records, bytes, bytes, 1)
801    }
802}
803
804/// Pop a chunk from `stash` or allocate a fresh one. Stashed chunks are
805/// already cleared via `recycle_chunk`, so they're ready for push.
806#[inline]
807fn empty_chunk<C: Columnar>(stash: &mut Vec<Column<C>>) -> Column<C> {
808    stash.pop().unwrap_or_default()
809}
810
811/// Reset `chunk` to an empty `Typed` and push it to `stash` for reuse.
812///
813/// Chunks recycled here come from the merger and chunker, both of which
814/// produce `Typed`; only the typed allocations are worth caching for reuse.
815/// `Bytes` / `Align` chunks have no typed-side allocation to preserve, so we
816/// simply drop them — `empty_chunk` will produce a fresh default just as
817/// cheaply, and pushing them onto `stash` would only displace useful
818/// recycled allocations.
819#[inline]
820fn recycle_chunk<C: Columnar>(mut chunk: Column<C>, stash: &mut Vec<Column<C>>) {
821    if let Column::Typed(c) = &mut chunk {
822        c.clear();
823        stash.push(chunk);
824    }
825}
826
827/// Drain remaining items from one side into `result` / `output`.
828///
829/// Copies the partially-consumed head into `result` via `merge_from`'s 1-input
830/// path, then appends remaining full chunks directly to `output` without
831/// per-element copy.
832fn drain_side<D, T, R>(
833    head: &mut Column<(D, T, R)>,
834    pos: &mut usize,
835    list: &mut std::vec::IntoIter<Column<(D, T, R)>>,
836    result: &mut Column<(D, T, R)>,
837    output: &mut Vec<Column<(D, T, R)>>,
838    stash: &mut Vec<Column<(D, T, R)>>,
839) where
840    D: Columnar + Default,
841    for<'a> columnar::Ref<'a, D>: Copy + Ord,
842    T: Columnar + Default + Clone + PartialOrder,
843    for<'a> columnar::Ref<'a, T>: Copy + Ord,
844    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
845    for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
846    for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
847    for<'a> <D as Columnar>::Container: columnar::Push<&'a D>,
848    for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
849    for<'a> <T as Columnar>::Container: columnar::Push<&'a T>,
850    for<'a> <R as Columnar>::Container: columnar::Push<columnar::Ref<'a, R>>,
851    for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
852{
853    if *pos < head.borrow().len() {
854        // 1-input dispatch — bulk copy that runs to completion; the yield
855        // signal is unused.
856        let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
857    }
858    if !result.is_empty() {
859        output.push(std::mem::take(result));
860        *result = empty_chunk(stash);
861    }
862    Extend::extend(output, list);
863}
864
#[cfg(test)]
mod tests {
    //! Unit tests for the chunker (sort + consolidate on `push_into`) and the
    //! merger (chain merge with whole-chunk passthrough).
    use super::*;

    /// Drive a single `push_into` call with `inputs` and collect the
    /// consolidated output (if any) as owned tuples.
    fn run_chunker<D, T, R>(inputs: &[(D, T, R)]) -> Vec<(D, T, R)>
    where
        D: Columnar + Clone,
        for<'a> columnar::Ref<'a, D>: Copy + Ord,
        T: Columnar + Clone,
        for<'a> columnar::Ref<'a, T>: Copy + Ord,
        R: Columnar + Clone + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
        for<'a> columnar::Ref<'a, R>: Ord,
        <(D, T, R) as Columnar>::Container: Clone,
        for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
        <(D, T, R) as Columnar>::Container: columnar::Push<(D, T, R)>,
        for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
        for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
        for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
    {
        // Stage the raw tuples into a column, as an upstream operator would.
        let mut input: Column<(D, T, R)> = Default::default();
        for tuple in inputs.iter().cloned() {
            input.push_into(tuple);
        }

        let mut chunker: ColumnChunker<(D, T, R)> = Default::default();
        chunker.push_into(&mut input);

        // Drain every ready chunk and convert the columnar refs to owned data.
        let mut out = Vec::new();
        while let Some(chunk) = chunker.extract() {
            for (d, t, r) in chunk.borrow().into_index_iter() {
                out.push((D::into_owned(d), T::into_owned(t), R::into_owned(r)));
            }
        }
        out
    }

    #[mz_ore::test]
    fn empty_input_yields_no_chunk() {
        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        let mut input: Column<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut input);
        assert!(chunker.extract().is_none());
        assert!(chunker.finish().is_none());
    }

    #[mz_ore::test]
    fn unsorted_input_is_sorted() {
        let out = run_chunker(&[(3u64, 0u64, 1i64), (1u64, 0u64, 1i64), (2u64, 0u64, 1i64)]);
        assert_eq!(out, vec![(1, 0, 1), (2, 0, 1), (3, 0, 1)]);
    }

    #[mz_ore::test]
    fn duplicate_keys_consolidate() {
        // Equal (data, time) records should sum diffs: 1 + 2 + (-1) = 2.
        let out = run_chunker(&[(1u64, 0u64, 1i64), (1u64, 0u64, 2i64), (1u64, 0u64, -1i64)]);
        assert_eq!(out, vec![(1, 0, 2)]);
    }

    #[mz_ore::test]
    fn diffs_summing_to_zero_are_dropped() {
        let out = run_chunker(&[(1u64, 0u64, 1i64), (1u64, 0u64, -1i64)]);
        assert!(out.is_empty());
    }

    #[mz_ore::test]
    fn mixed_consolidation() {
        // (1, 0): 1 + 2 + (-3) = 0  -> dropped
        // (2, 0): 1            = 1  -> kept
        // (1, 1): 5            = 5  -> kept (different time from the (1, 0) group)
        let out = run_chunker(&[
            (1u64, 0u64, 1i64),
            (2u64, 0u64, 1i64),
            (1u64, 0u64, 2i64),
            (1u64, 1u64, 5i64),
            (1u64, 0u64, -3i64),
        ]);
        assert_eq!(out, vec![(1, 1, 5), (2, 0, 1)]);
    }

    #[mz_ore::test]
    fn key_val_tuple_data() {
        // Exercise the actual val-batcher shape: `D = (K, V)`.
        let out = run_chunker(&[
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 11u64), 0u64, 1i64),
            ((2u64, 10u64), 0u64, 1i64),
        ]);
        assert_eq!(
            out,
            vec![((1, 10), 0, 2), ((1, 11), 0, 1), ((2, 10), 0, 1),]
        );
    }

    #[mz_ore::test]
    fn buffer_reuse_across_calls() {
        // Two sequential push_into calls; second runs after extract returned
        // the first chunk, exercising the in-place clear path.
        let mut input1: Column<(u64, u64, i64)> = Default::default();
        input1.push_into((1u64, 0u64, 1i64));
        input1.push_into((2u64, 0u64, 1i64));

        let mut input2: Column<(u64, u64, i64)> = Default::default();
        input2.push_into((3u64, 0u64, 1i64));
        input2.push_into((1u64, 0u64, 1i64));

        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut input1);

        // Hand back the first chunk via extract, simulating the merge batcher
        // taking ownership of the &mut and then returning.
        {
            let _ = chunker.extract().expect("first chunk");
        }

        chunker.push_into(&mut input2);

        let chunk = chunker.extract().expect("second chunk");
        let collected: Vec<_> = chunk
            .borrow()
            .into_index_iter()
            .map(|(d, t, r)| (u64::into_owned(d), u64::into_owned(t), i64::into_owned(r)))
            .collect();
        assert_eq!(collected, vec![(1, 0, 1), (3, 0, 1)]);
    }

    /// Build a `Column<((u64, u64), u64, i64)>` from a slice of tuples.
    fn col(rows: &[((u64, u64), u64, i64)]) -> Column<((u64, u64), u64, i64)> {
        let mut c: Column<((u64, u64), u64, i64)> = Default::default();
        for &t in rows {
            c.push_into(t);
        }
        c
    }

    /// Flatten `chunks` into owned tuples, preserving chunk and record order.
    fn collect_chunks(chunks: &[Column<((u64, u64), u64, i64)>]) -> Vec<((u64, u64), u64, i64)> {
        chunks
            .iter()
            .flat_map(|c| {
                c.borrow().into_index_iter().map(|((k, v), t, r)| {
                    (
                        (u64::into_owned(k), u64::into_owned(v)),
                        u64::into_owned(t),
                        i64::into_owned(r),
                    )
                })
            })
            .collect()
    }

    /// Disjoint-range chains exercise the whole-chunk passthrough fast path:
    /// every chunk in chain1 is sortable-before every chunk in chain2, so
    /// each outer-loop iteration should hand a chunk straight to `output`
    /// without recursing through the per-record merge.
    #[mz_ore::test]
    fn merger_disjoint_chains_passthrough() {
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
            col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
            col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..4u64)
            .map(|d| ((d, 0u64), 0u64, 1i64))
            .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64)))
            .collect();
        assert_eq!(collected, expected);
    }

    /// Interleaved chains never satisfy the passthrough condition; each
    /// outer iteration falls through to `merge_from`. Same correctness
    /// expectation, exercises the non-passthrough path under
    /// `Merger::merge`.
    #[mz_ore::test]
    fn merger_interleaved_chains() {
        // Even keys on one chain, odd on the other; chunks alternate so the
        // per-record path is the only viable route.
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]),
            col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]),
            col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
        assert_eq!(collected, expected);
    }

    /// Passthrough must consolidate adjacent equal keys at chunk
    /// boundaries — i.e., must NOT fire when `chain1`'s last record's
    /// `(d, t)` equals `chain2`'s first.
    #[mz_ore::test]
    fn merger_passthrough_respects_equal_boundary() {
        // chain1's last == chain2's first key: equal-key consolidation
        // must kick in (sum of diffs would be 2). If passthrough fired
        // erroneously, both records would land in different output chunks
        // unconsolidated.
        let chain1 = vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])];
        let chain2 = vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        assert_eq!(
            collected,
            vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]
        );
    }
}
1095
#[cfg(test)]
mod proptests {
    //! Property tests for `Column::merge_from` and `Column::extract`.
    //!
    //! Strategy: generate sorted+consolidated inputs (the merger's input
    //! contract), drive `merge_from` / `extract` the same way the framework
    //! would, and compare against a brute-force reference impl.
    //!
    //! Test types are `D = (u64, u64)`, `T = u64`, `R = i64` drawn from small
    //! ranges so that equal-key collisions are common and the consolidation
    //! path actually runs.
    use super::*;
    use mz_ore::cast::CastFrom;
    use proptest::prelude::*;
    use timely::progress::frontier::Antichain;

    // Concrete update shape used by every property below.
    type Tuple = ((u64, u64), u64, i64);

    /// Reference consolidation: sort by `(data, time)`, sum diffs over equal
    /// pairs, drop zeros.
    fn consolidate(mut v: Vec<Tuple>) -> Vec<Tuple> {
        v.sort();
        let mut out: Vec<Tuple> = Vec::new();
        for (d, t, r) in v {
            // Fold into the previous entry when (data, time) matches.
            if let Some(last) = out.last_mut() {
                if last.0 == d && last.1 == t {
                    last.2 += r;
                    continue;
                }
            }
            out.push((d, t, r));
        }
        out.retain(|x| x.2 != 0);
        out
    }

    /// Strategy for sorted+consolidated input lists. Ranges are small to
    /// encourage equal-key collisions.
    fn arb_consolidated() -> impl Strategy<Value = Vec<Tuple>> {
        prop::collection::vec(((0u64..5, 0u64..5), 0u64..3, -3i64..=3i64), 0..30)
            .prop_map(consolidate)
    }

    /// Build a `Column<Tuple>` by pushing `v` in order.
    fn build_column(v: &[Tuple]) -> Column<Tuple> {
        let mut col: Column<Tuple> = Default::default();
        for tup in v {
            col.push_into(*tup);
        }
        col
    }

    /// Read a `Column<Tuple>` back out as owned tuples.
    fn collect_column(col: &Column<Tuple>) -> Vec<Tuple> {
        col.borrow()
            .into_index_iter()
            .map(|((k, v), t, r)| {
                (
                    (u64::into_owned(k), u64::into_owned(v)),
                    u64::into_owned(t),
                    i64::into_owned(r),
                )
            })
            .collect()
    }

    /// Drive a 2-way merge the same way `Merger::merge` would: a 2-input
    /// call until one side exhausts, then a 1-input drain for whichever
    /// side still has data.
    fn drive_merge(left: Column<Tuple>, right: Column<Tuple>) -> Column<Tuple> {
        let mut self_col: Column<Tuple> = Default::default();
        let mut others = [left, right];
        let mut positions = [0usize, 0];
        let _ = self_col.merge_from(&mut others, &mut positions);

        let [left_done, right_done] = others;
        let [left_pos, right_pos] = positions;

        // At most one side can have a remainder after the 2-input call.
        if left_pos < left_done.borrow().len() {
            let mut tail = [left_done];
            let mut p = [left_pos];
            let _ = self_col.merge_from(&mut tail, &mut p);
        } else if right_pos < right_done.borrow().len() {
            let mut tail = [right_done];
            let mut p = [right_pos];
            let _ = self_col.merge_from(&mut tail, &mut p);
        }

        self_col
    }

    proptest! {
        /// `merge_from` with two sorted+consolidated inputs equals the
        /// reference consolidate(union).
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_equals_consolidated_union(
            a in arb_consolidated(),
            b in arb_consolidated(),
        ) {
            let merged = drive_merge(build_column(&a), build_column(&b));

            let mut union = a.clone();
            Extend::extend(&mut union, b.iter().copied());
            let expected = consolidate(union);

            prop_assert_eq!(collect_column(&merged), expected);
        }

        /// `merge_from` 1-input bulk-copy from a non-zero position equals
        /// `other[*pos..]`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_one_input_drains_tail(
            data in arb_consolidated(),
            pos_frac in 0u32..=100,
        ) {
            // Cap at len so we always have a valid position.
            let len = data.len();
            let start_pos = if len == 0 { 0 } else {
                (usize::cast_from(pos_frac) * len) / 101
            };

            // Self starts non-empty so we exercise the bulk-copy path, not the
            // empty-self swap shortcut.
            let mut self_col: Column<Tuple> = Default::default();
            let sentinel: Tuple = ((u64::MAX, u64::MAX), 0, 1);
            self_col.push_into(sentinel);

            let mut others = [build_column(&data)];
            let mut positions = [start_pos];
            let _ = self_col.merge_from(&mut others, &mut positions);

            let mut expected = vec![sentinel];
            Extend::extend(&mut expected, data[start_pos..].iter().copied());

            prop_assert_eq!(collect_column(&self_col), expected);
            prop_assert_eq!(positions[0], len);
        }

        /// `merge_from` 1-input swap shortcut: empty self + pos=0 should
        /// produce a column equal to the input.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_empty_self_swap(data in arb_consolidated()) {
            let mut self_col: Column<Tuple> = Default::default();
            let mut others = [build_column(&data)];
            let mut positions = [0usize];
            let _ = self_col.merge_from(&mut others, &mut positions);

            prop_assert_eq!(collect_column(&self_col), data);
        }

        /// `extract` partitions correctly:
        ///   - keep ∪ ship multiset-equals self
        ///   - upper.less_equal(t) for every kept time
        ///   - !upper.less_equal(t) for every shipped time
        ///   - frontier covers every kept time
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_partitions_by_frontier(
            data in arb_consolidated(),
            upper_time in 0u64..=4,
        ) {
            let mut self_col = build_column(&data);
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            self_col.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            // Single call drains the input (we removed the at_capacity yield).
            prop_assert_eq!(position, data.len());

            let kept = collect_column(&keep);
            let shipped = collect_column(&ship);

            // Partition predicate: kept times >= upper, shipped times < upper.
            for (_, t, _) in &kept {
                prop_assert!(
                    upper.borrow().less_equal(t),
                    "kept time {} should satisfy upper.less_equal", t,
                );
            }
            for (_, t, _) in &shipped {
                prop_assert!(
                    !upper.borrow().less_equal(t),
                    "shipped time {} should NOT satisfy upper.less_equal", t,
                );
            }

            // Union (multiset) equals input.
            let mut union = kept.clone();
            Extend::extend(&mut union, shipped.iter().copied());
            union.sort();
            let mut expected_sorted = data.clone();
            expected_sorted.sort();
            prop_assert_eq!(union, expected_sorted);

            // Frontier dominates every kept time.
            for (_, t, _) in &kept {
                prop_assert!(
                    frontier.less_equal(t),
                    "frontier should dominate kept time {}", t,
                );
            }
        }

        /// Empty input → no work, frontier untouched, position = 0.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_empty_input(upper_time in 0u64..=4) {
            let mut self_col: Column<Tuple> = Default::default();
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            self_col.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            prop_assert_eq!(position, 0);
            prop_assert!(collect_column(&keep).is_empty());
            prop_assert!(collect_column(&ship).is_empty());
            prop_assert!(frontier.elements().is_empty());
        }
    }
}