Skip to main content

mz_timely_util/columnar/
batcher.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types for consolidating, merging, and extracting columnar update collections.
17
18use std::collections::VecDeque;
19use std::marker::PhantomData;
20
21use crate::columnation::ColumnationStack;
22use columnar::Container as _;
23use columnar::Push as _;
24use columnar::{Clear, Columnar, Index, Len};
25use columnation::Columnation;
26use differential_dataflow::difference::Semigroup;
27use differential_dataflow::trace::implementations::merge_batcher::Merger;
28use timely::Accountable;
29use timely::Container;
30use timely::PartialOrder;
31use timely::container::{ContainerBuilder, PushInto, SizableContainer};
32use timely::progress::frontier::{Antichain, AntichainRef};
33
34use crate::columnar::Column;
35
/// Chunker that sorts and consolidates incoming updates into chunks of type
/// `C`.
///
/// Each `push_into` produces at most one consolidated chunk, which waits in a
/// FIFO queue until it is handed out by `extract` or `finish`.
#[derive(Default)]
pub struct Chunker<C> {
    /// Staging slot for output. `push_into` consolidates into it, and
    /// `extract`/`finish` park the chunk they lend out here; the contents may
    /// be reused once those calls return.
    target: C,
    /// Consolidated chunks, oldest first, waiting to be extracted.
    ready: VecDeque<C>,
}
47
48impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
49    type Container = C;
50
51    fn extract(&mut self) -> Option<&mut Self::Container> {
52        if let Some(ready) = self.ready.pop_front() {
53            self.target = ready;
54            Some(&mut self.target)
55        } else {
56            None
57        }
58    }
59
60    fn finish(&mut self) -> Option<&mut Self::Container> {
61        self.extract()
62    }
63}
64
impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for Chunker<ColumnationStack<(D, T, R)>>
where
    D: Columnar + Columnation,
    for<'b> columnar::Ref<'b, D>: Ord + Copy,
    T: Columnar + Columnation,
    for<'b> columnar::Ref<'b, T>: Ord + Copy,
    R: Columnar + Columnation + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
    for<'b> columnar::Ref<'b, R>: Ord,
{
    /// Consolidates the updates in `container` into one `ColumnationStack`
    /// chunk and queues it on `ready`.
    ///
    /// Updates are sorted by their `(data, time, diff)` refs. Updates sharing
    /// a `(data, time)` key have their diffs summed, and keys whose sum is zero
    /// are dropped. No chunk is queued when every update cancels. `container`
    /// is only read (via `borrow`), never modified.
    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
        // Sort input data
        // TODO: consider `Vec<usize>` that we retain, containing indexes.
        // `permutation` holds refs borrowed from `container`; sorting the
        // `(data, time, diff)` ref tuples makes equal `(data, time)` keys
        // adjacent.
        let borrowed = container.borrow();
        let mut permutation = Vec::with_capacity(borrowed.len());
        // NOTE(review): the fully-qualified `Extend::extend` call presumably
        // avoids ambiguity with an `extend` method from an imported columnar
        // trait (e.g. `columnar::Push`) — confirm before simplifying.
        Extend::extend(&mut permutation, borrowed.into_index_iter());
        permutation.sort();

        // `target` may hold a chunk previously lent out by `extract`; clear it
        // before consolidating into it.
        self.target.clear();
        // Iterate over the data, accumulating diffs for like keys.
        let mut iter = permutation.drain(..);
        if let Some((data, time, diff)) = iter.next() {
            // Owned scratch values. The stack is pushed an owned `&(D, T, R)`,
            // so each emitted key is copied into these first.
            let mut owned_data = D::into_owned(data);
            let mut owned_time = T::into_owned(time);

            // The run currently being accumulated: its key as borrowed refs and
            // its running diff sum as an owned `R`.
            let mut prev_data = data;
            let mut prev_time = time;
            let mut prev_diff = <R as Columnar>::into_owned(diff);

            for (data, time, diff) in iter {
                if (&prev_data, &prev_time) == (&data, &time) {
                    // Same key as the current run: fold the diff in.
                    prev_diff.plus_equals(&diff);
                } else {
                    // Key changed: emit the finished run unless it cancelled.
                    if !prev_diff.is_zero() {
                        D::copy_from(&mut owned_data, prev_data);
                        T::copy_from(&mut owned_time, prev_time);
                        // Move the scratch values into a tuple so it can be
                        // pushed by reference, then move them back out so they
                        // remain available as scratch for the next run.
                        let tuple = (owned_data, owned_time, prev_diff);
                        self.target.push_into(&tuple);
                        (owned_data, owned_time, prev_diff) = tuple;
                    }
                    // Start a new run at this update; `copy_from` overwrites
                    // the previous sum in place.
                    prev_data = data;
                    prev_time = time;
                    R::copy_from(&mut prev_diff, diff);
                }
            }

            // Emit the final run, unless it cancelled.
            if !prev_diff.is_zero() {
                D::copy_from(&mut owned_data, prev_data);
                T::copy_from(&mut owned_time, prev_time);
                let tuple = (owned_data, owned_time, prev_diff);
                self.target.push_into(&tuple);
            }
        }

        // Queue any output, leaving a default (empty) container in `target`.
        if !self.target.is_empty() {
            self.ready.push_back(std::mem::take(&mut self.target));
        }
    }
}
123
124/// A chunker that consolidates `Column<(D, T, R)>` updates into sorted `Column`
125/// chunks, without round-tripping through columnation.
126///
127/// Drop-in counterpart to [`Chunker`] for the merge-batcher path: same control
128/// flow (sort borrowed refs, fold equal `(data, time)` runs, drop zero diffs),
129/// but the consolidated output stays in [`Column`].
130pub struct ColumnChunker<U: Columnar> {
131    /// Container we consolidate into and present to extract/finish callers.
132    /// Always `Column::Typed` between calls so we can push into it.
133    target: Column<U>,
134    /// Sorted, consolidated chunks pending extraction.
135    ready: VecDeque<Column<U>>,
136}
137
138// Manual impl rather than `#[derive(Default)]`: the derive would synthesize
139// `impl<U: Columnar + Default>`, but `Column<U>: Default` only requires
140// `U: Columnar`, and adding a spurious `U: Default` bound would propagate
141// through every `ContainerBuilder for ColumnChunker<U>` impl.
142impl<U: Columnar> Default for ColumnChunker<U> {
143    fn default() -> Self {
144        Self {
145            target: Column::default(),
146            ready: VecDeque::new(),
147        }
148    }
149}
150
151impl<U: Columnar> ContainerBuilder for ColumnChunker<U>
152where
153    U::Container: Clone + 'static,
154{
155    type Container = Column<U>;
156
157    fn extract(&mut self) -> Option<&mut Self::Container> {
158        if let Some(ready) = self.ready.pop_front() {
159            self.target = ready;
160            Some(&mut self.target)
161        } else {
162            None
163        }
164    }
165
166    fn finish(&mut self) -> Option<&mut Self::Container> {
167        self.extract()
168    }
169}
170
171impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for ColumnChunker<(D, T, R)>
172where
173    D: Columnar,
174    for<'b> columnar::Ref<'b, D>: Copy + Ord,
175    T: Columnar,
176    for<'b> columnar::Ref<'b, T>: Copy + Ord,
177    R: Columnar + Default + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
178    for<'b> columnar::Ref<'b, R>: Ord,
179    for<'b> <D as Columnar>::Container: columnar::Push<columnar::Ref<'b, D>>,
180    for<'b> <T as Columnar>::Container: columnar::Push<columnar::Ref<'b, T>>,
181    for<'b> <R as Columnar>::Container: columnar::Push<&'b R>,
182{
183    fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
184        // Reset target to an empty owned container. If it's already `Typed`
185        // (steady state, possibly recycling a chunk just handed back via
186        // `extract`), clear in place to reuse buffer allocations. Otherwise
187        // start fresh — the bytes/align variants don't support push.
188        match &mut self.target {
189            Column::Typed(c) => c.clear(),
190            Column::Bytes(_) | Column::Align(_) => {
191                self.target = Column::Typed(Default::default());
192            }
193        }
194
195        // Sort input by columnar ref order.
196        let borrowed = container.borrow();
197        let mut permutation = Vec::with_capacity(borrowed.len());
198        Extend::extend(&mut permutation, borrowed.into_index_iter());
199        permutation.sort();
200
201        // Sweep sorted refs, accumulating diffs over equal `(data, time)`
202        // pairs and pushing non-zero results to the target's leaves. Refs
203        // from the input borrow are valid through the sweep, so D and T
204        // are pushed directly via each leaf's `Push<Ref<_>>` impl. Only R
205        // needs an owned scratch since it carries the consolidated sum.
206        {
207            let Column::Typed(target_c) = &mut self.target else {
208                unreachable!("target reset to Typed above");
209            };
210            let (target_d, target_t, target_r) = target_c;
211
212            let mut iter = permutation.drain(..);
213            if let Some((data, time, diff)) = iter.next() {
214                let mut prev_data = data;
215                let mut prev_time = time;
216                let mut prev_diff = <R as Columnar>::into_owned(diff);
217
218                for (data, time, diff) in iter {
219                    if (&prev_data, &prev_time) == (&data, &time) {
220                        prev_diff.plus_equals(&diff);
221                    } else {
222                        if !prev_diff.is_zero() {
223                            target_d.push(prev_data);
224                            target_t.push(prev_time);
225                            target_r.push(&prev_diff);
226                        }
227                        prev_data = data;
228                        prev_time = time;
229                        R::copy_from(&mut prev_diff, diff);
230                    }
231                }
232
233                if !prev_diff.is_zero() {
234                    target_d.push(prev_data);
235                    target_t.push(prev_time);
236                    target_r.push(&prev_diff);
237                }
238            }
239        }
240
241        if self.target.borrow().len() > 0 {
242            let chunk = std::mem::replace(&mut self.target, Column::Typed(Default::default()));
243            self.ready.push_back(chunk);
244        }
245    }
246}
247
/// Gallops `*lower` forward to the end of a run of indices satisfying `cmp`.
///
/// `cmp` must be monotone over `[*lower, upper)`: once it returns `false` for
/// some index, it must return `false` for every later index. This holds, for
/// example, when probing a sorted sequence with "is less than some key".
///
/// Under that precondition, on return `*lower` is the smallest index `i` with
/// `initial *lower <= i < upper` and `!cmp(i)`, or `upper` if there is none.
/// `*lower` is left unchanged if it is already `>= upper`, or if `cmp` is
/// already false at `*lower`. If `cmp` is not monotone the result is not
/// meaningful: indices where `cmp` is false may be skipped.
///
/// The predicate takes an index rather than a value so that callers can read
/// only the input columns they need to compare. The merger, for instance,
/// consults only the data and time columns and never the diffs.
///
/// A run of length `K` costs `O(log K)` probes: exponentially growing strides
/// overshoot the run, then a binary descent pins down its last index. This is
/// cheaper than a linear scan when one side of a sorted merge has long runs.
fn gallop(upper: usize, lower: &mut usize, mut cmp: impl FnMut(usize) -> bool) {
    let mut last_hit = *lower;
    // Empty run: nothing to skip.
    if last_hit >= upper || !cmp(last_hit) {
        return;
    }

    // Overshoot phase. `last_hit` always satisfies `cmp`. Each successful
    // probe moves it forward and doubles the stride, so on exit the run's last
    // index lies in `[last_hit, last_hit + stride)`.
    let mut stride: usize = 1;
    loop {
        let probe = last_hit + stride;
        if probe >= upper || !cmp(probe) {
            break;
        }
        last_hit = probe;
        stride <<= 1;
    }

    // Descent phase. The strides `stride/2, stride/4, ..., 1` sum to
    // `stride - 1`, so accepting every probe that still satisfies `cmp` lands
    // exactly on the run's last index.
    stride >>= 1;
    while stride != 0 {
        let probe = last_hit + stride;
        if probe < upper && cmp(probe) {
            last_hit = probe;
        }
        stride >>= 1;
    }

    // Report the first index *after* the run.
    *lower = last_hit + 1;
}
293
/// [`Merger`] over [`Column`]-shaped chunks; the counterpart of
/// `ColInternalMerger`, which merges `ColumnationStack` chunks instead.
///
/// Chunks stay in [`Column`] form throughout, with no columnation detour. The
/// per-chunk work is done by the inherent `merge_from` / `extract` methods on
/// `Column<(D, T, R)>`. The merger itself carries no state: `D`, `T`, and `R`
/// only select the update type.
pub struct ColumnMerger<D, T, R> {
    _marker: PhantomData<(D, T, R)>,
}

// Implemented by hand because `#[derive(Default)]` would add
// `D: Default, T: Default, R: Default` bounds that the marker does not need.
impl<D, T, R> Default for ColumnMerger<D, T, R> {
    fn default() -> Self {
        ColumnMerger {
            _marker: Default::default(),
        }
    }
}
309
/// Per-chunk merge and extract for [`Column`]-shaped sorted chunks.
///
/// These are the building blocks that the [`Merger`] impl for
/// [`ColumnMerger`] orchestrates over chains of chunks. They're inherent
/// methods rather than a trait impl so the merger can call them without going
/// through any wrapper indirection.
///
/// Chunks these methods write into must be `Column::Typed`; on their write
/// paths they panic (via `unreachable!`) otherwise.
impl<D, T, R> Column<(D, T, R)>
where
    D: Columnar,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + PartialOrder,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
{
    /// Merge items from sorted inputs into `self`, advancing positions.
    ///
    /// `others` and `positions` are parallel: `positions[i]` is the index of
    /// the next unread record in `others[i]`. Each input is expected to be
    /// sorted by `(data, time)`. NOTE(review): inputs are presumably also
    /// consolidated (distinct keys); the `Equal` arm pairs one record from
    /// each side per step — confirm with the chunker/merger callers.
    ///
    /// Mirrors the dispatch shape used by the merge-batcher framework:
    /// - **0**: no-op
    /// - **1**: bulk copy (or swap, if `self` is empty and `*pos == 0`)
    /// - **2**: merge two sorted streams, with diff consolidation on equal
    ///   `(data, time)` keys and gallop bulk-copy of long single-side runs.
    ///
    /// Returns `true` if the merge stopped because the amortized ship-threshold
    /// check inside the inner loop fired (the caller should ship `self` before
    /// the next call). Returns `false` if the merge stopped because at least
    /// one input was exhausted at its position (the caller should refill that
    /// side; `self` may still be at capacity from accumulation across short
    /// calls and the caller should also check `at_capacity` in that case).
    ///
    /// The 0- and 1-input dispatches always return `false`: 0 does no work,
    /// 1 is a bulk copy or swap that runs to completion.
    ///
    /// # Panics
    ///
    /// - If `others.len() > 2`.
    /// - If `positions` has fewer entries than `others`.
    /// - If `self` is not `Column::Typed` when records must be copied or
    ///   merged into it (the 1-input copy path and the 2-input path).
    #[must_use]
    pub fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) -> bool {
        match others.len() {
            0 => false,
            1 => {
                let other = &mut others[0];
                let pos = &mut positions[0];
                // If `self` is empty and `*pos == 0`, we can bulk swap in the other chunk.
                // Afterwards `other` holds the former, empty `self`, so the
                // unchanged `*pos == 0` already marks it as fully consumed.
                if self.is_empty() && *pos == 0 {
                    std::mem::swap(self, other);
                    return false;
                }
                // Otherwise, bulk copy the remaining data from `other[*pos..]` into `self`.
                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };
                let src_c = other.borrow();
                self_c.extend_from_self(src_c, *pos..other.borrow().len());
                *pos = other.borrow().len();
                false
            }
            2 => {
                // Shared views of both inputs; only `self` is written.
                let (left, right) = others.split_at(1);
                let (left_pos, right_pos) = positions.split_at_mut(1);
                let left_borrow = left[0].borrow();
                let right_borrow = right[0].borrow();

                let Column::Typed(self_c) = self else {
                    unreachable!("merger chunks are always Column::Typed");
                };

                // Split the input borrows into per-leaf views.
                //
                // A columnar tuple `Borrow::Ref` is recursive: indexing the
                // tuple borrow walks every leaf (and reconstructs the nested
                // ref tuple) regardless of which leaves the caller actually
                // reads. Indexing each leaf view directly cuts probe-path
                // work to the columns we consult — for the merge step
                // that's the `(D, T)` key, with the diff column read only
                // when we push.
                let l_d = left_borrow.0;
                let l_t = left_borrow.1;
                let l_r = left_borrow.2;
                let r_d = right_borrow.0;
                let r_t = right_borrow.1;
                let r_r = right_borrow.2;
                let upper_l = l_d.len();
                let upper_r = r_d.len();

                // Mirror the split on the output container. Tuple
                // containers split into per-leaf containers, which lets us
                // address each leaf independently — both gallop bulk-copies
                // and single-record pushes resolve to a primitive operation
                // per leaf. The leaves stay length-synchronized as long as
                // every record path pushes exactly one element to each.
                let (sd, st, sr) = self_c;

                // Pre-size each output leaf for the worst-case merge
                // (no consolidation): `len(left) + len(right)` records.
                // `reserve_for` walks each input's `as_bytes`, which is
                // accurate for variable-length leaves (where reserving a
                // record count wouldn't size the byte buffer correctly).
                //
                // Gated by record count (`RESERVE_RECORD_THRESHOLD`, one
                // million input records): for inputs that large the input
                // bound over-reserves any time consolidation is heavy, and
                // the ship-threshold yield below (or the framework's outer
                // check) stops us before we'd use the headroom. Past the
                // threshold, geometric growth is bounded by 2× the actual
                // output and avoids committing pages we'd never touch.
                //
                // NOTE(review): the reservation covers the *whole* of both
                // inputs, not just the unconsumed tails from `left_pos` /
                // `right_pos`, and it is repeated on every call — including
                // calls that resume after a ship-threshold yield.
                const RESERVE_RECORD_THRESHOLD: usize = 1_000_000;
                if upper_l + upper_r <= RESERVE_RECORD_THRESHOLD {
                    use columnar::Container as _;
                    let inputs = [left_borrow, right_borrow];
                    sd.reserve_for(inputs.iter().map(|b| b.0));
                    st.reserve_for(inputs.iter().map(|b| b.1));
                    sr.reserve_for(inputs.iter().map(|b| b.2));
                }

                // Scratch diff for summing a key present in both inputs;
                // reused by every `Equal` step.
                let mut stash = R::default();

                // Mid-merge ship-threshold check, matching the heuristic
                // used by `Column::at_capacity` and `ColumnBuilder`. The
                // tuple `(sd.borrow(), st.borrow(), sr.borrow())` chains
                // its leaves' `as_bytes` iterators, so passing it to
                // `at_serialized_capacity` reuses the canonical
                // `indexed::length_in_words` formula without needing the
                // parent borrow we destructured.
                //
                // The check walks every leaf slice once per call, which
                // is non-trivial on variable-length leaves; the caller
                // runs it every `THRESHOLD_PERIOD_MASK + 1` iterations
                // rather than per-iter. The ship threshold is ~65 K
                // records, so overshooting by ~1 K records before the
                // check fires has no practical impact — the framework's
                // outer `at_capacity` check sees the oversize chunk and
                // ships it regardless.
                let at_ship_threshold =
                    |sd: &D::Container, st: &T::Container, sr: &R::Container| {
                        use columnar::Borrow as _;
                        crate::columnar::at_serialized_capacity(&(
                            sd.borrow(),
                            st.borrow(),
                            sr.borrow(),
                        ))
                    };
                const THRESHOLD_PERIOD_MASK: u32 = 1023;
                let mut iter: u32 = 0;
                let mut yielded = false;

                // Merge until either input is exhausted, or until the
                // amortized ship-threshold check at the bottom fires.
                while left_pos[0] < upper_l && right_pos[0] < upper_r {
                    let d1 = l_d.get(left_pos[0]);
                    let t1 = l_t.get(left_pos[0]);
                    let d2 = r_d.get(right_pos[0]);
                    let t2 = r_t.get(right_pos[0]);
                    match (d1, t1).cmp(&(d2, t2)) {
                        std::cmp::Ordering::Less => {
                            // Common case (interleaved data): single-record
                            // advance. Skip the gallop call entirely — its
                            // setup plus the first cmp probe is more
                            // expensive than just pushing this record and
                            // re-entering the outer loop. Galloping is only
                            // worthwhile when there's an actual run, which
                            // we detect with the peek check below.
                            sd.push(d1);
                            st.push(t1);
                            sr.push(l_r.get(left_pos[0]));
                            left_pos[0] += 1;
                            // Long-run case: peek at the next record; if
                            // it's still strictly less than `(d2, t2)`,
                            // we have a run worth galloping (and bulk-
                            // copying). The predicate is strict so that a
                            // record equal to the right head is left for the
                            // `Equal` arm and its diffs still get summed.
                            if left_pos[0] < upper_l
                                && (l_d.get(left_pos[0]), l_t.get(left_pos[0])) < (d2, t2)
                            {
                                let start = left_pos[0];
                                gallop(upper_l, &mut left_pos[0], |i| {
                                    (l_d.get(i), l_t.get(i)) < (d2, t2)
                                });
                                // Per-leaf bulk copy of the run. Each
                                // call resolves to an `extend_from_slice`
                                // on its leaf (recursively for nested
                                // leaves), which the compiler can
                                // autovectorize.
                                sd.extend_from_self(l_d, start..left_pos[0]);
                                st.extend_from_self(l_t, start..left_pos[0]);
                                sr.extend_from_self(l_r, start..left_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Greater => {
                            // Symmetric on the right side.
                            sd.push(d2);
                            st.push(t2);
                            sr.push(r_r.get(right_pos[0]));
                            right_pos[0] += 1;
                            if right_pos[0] < upper_r
                                && (r_d.get(right_pos[0]), r_t.get(right_pos[0])) < (d1, t1)
                            {
                                let start = right_pos[0];
                                gallop(upper_r, &mut right_pos[0], |i| {
                                    (r_d.get(i), r_t.get(i)) < (d1, t1)
                                });
                                sd.extend_from_self(r_d, start..right_pos[0]);
                                st.extend_from_self(r_t, start..right_pos[0]);
                                sr.extend_from_self(r_r, start..right_pos[0]);
                            }
                        }
                        std::cmp::Ordering::Equal => {
                            // Same key at both heads: emit one record with
                            // the summed diff (nothing if it cancels), and
                            // advance both inputs.
                            let r1 = l_r.get(left_pos[0]);
                            let r2 = r_r.get(right_pos[0]);
                            R::copy_from(&mut stash, r1);
                            stash.plus_equals(&r2);
                            if !stash.is_zero() {
                                sd.push(d1);
                                st.push(t1);
                                sr.push(&stash);
                            }
                            left_pos[0] += 1;
                            right_pos[0] += 1;
                        }
                    }

                    // Amortized ship-threshold check; see comment above
                    // `at_ship_threshold` for rationale.
                    iter = iter.wrapping_add(1);
                    if iter & THRESHOLD_PERIOD_MASK == 0 && at_ship_threshold(sd, st, sr) {
                        yielded = true;
                        break;
                    }
                }
                // `true` only when the threshold check broke out of the loop;
                // otherwise at least one input is exhausted.
                yielded
            }
            // `Merger::merge` only ever calls `merge_from` with 0/1/2-input
            // slices (k-way merge isn't part of the merge-batcher contract).
            // Defensive guard: if someone bumps that, this will panic
            // immediately rather than silently produce wrong output.
            n => unreachable!("merge_from called with {n} inputs; expected 0, 1, or 2"),
        }
    }

    /// Partition records starting at `*position` into `keep` (times beyond
    /// `upper`, retained for the next round) and `ship` (times not beyond
    /// `upper`, sealed into the output batch). Updates `frontier` with the
    /// times of kept records.
    ///
    /// "Beyond `upper`" means `upper.less_equal(time)`. Records are appended
    /// in their existing order, and `*position` is advanced past each record
    /// that is placed. The call returns early, with `*position < self.len()`,
    /// as soon as either `keep` or `ship` reaches the serialized ship
    /// threshold.
    ///
    /// The caller invokes `extract` repeatedly until `*position >= self.len()`,
    /// swapping out a full output buffer between calls. This shape exists
    /// because the framework only checks `at_capacity()` between calls, so
    /// without an inner-loop yield a single call could quietly produce
    /// oversized output chunks.
    ///
    /// # Panics
    ///
    /// If `keep` or `ship` is not `Column::Typed`.
    pub fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        keep: &mut Self,
        ship: &mut Self,
    ) {
        let Column::Typed(keep_c) = keep else {
            unreachable!("merger chunks are always Column::Typed");
        };
        let Column::Typed(ship_c) = ship else {
            unreachable!("merger chunks are always Column::Typed");
        };

        let self_view = self.borrow();
        let len = self_view.len();

        // Yield to the framework when either output buffer reaches the
        // ship threshold, so it can ship a full chunk and hand back a
        // fresh one. Required by the merger's extract contract: the
        // framework only checks `at_capacity` between calls, so without
        // an inner-loop yield a single call can fill an output well past
        // threshold. Unlike `merge_from`, this check runs before every
        // record rather than being amortized.
        use columnar::Borrow as _;
        // Reused owned scratch time: `less_equal` and `insert_with` take a
        // `&T`, so each record's time ref is copied into it first.
        let mut owned_t = T::default();
        while *position < len
            && !crate::columnar::at_serialized_capacity(&keep_c.borrow())
            && !crate::columnar::at_serialized_capacity(&ship_c.borrow())
        {
            let (_, time, _) = self_view.get(*position);
            T::copy_from(&mut owned_t, time);
            if upper.less_equal(&owned_t) {
                // `insert_with` only clones when the time is actually
                // inserted, i.e. when no element already in `frontier` is
                // less than or equal to it.
                frontier.insert_with(&owned_t, |t| t.clone());
                keep_c.extend_from_self(self_view, *position..*position + 1);
            } else {
                ship_c.extend_from_self(self_view, *position..*position + 1);
            }
            *position += 1;
        }
    }
}
594
595/// `Merger` impl driving [`MergeBatcher`] over [`Column`]-shaped chunks.
596///
597/// `merge` walks two sorted chains of chunks in lockstep, calling
598/// `Column::merge_from` to consume up to one ship-threshold's worth of input
599/// per pass and shipping `result` to `output` whenever it crosses
600/// `at_capacity`. Exhausted input chunks are reset and pushed to `stash` for
601/// reuse. The drain phase appends remaining full chunks to `output`
602/// directly, with no per-element copy.
603///
604/// `extract` walks each chunk via `Column::extract`, partitioning records
605/// into `kept` (times beyond `upper`) and `ship` (sealed into the output
606/// batch); both grow chunk-by-chunk under the same `at_capacity` ship
607/// signal.
608///
609/// [`MergeBatcher`]: differential_dataflow::trace::implementations::merge_batcher::MergeBatcher
610impl<D, T, R> Merger for ColumnMerger<D, T, R>
611where
612    D: Columnar,
613    for<'a> columnar::Ref<'a, D>: Copy + Ord,
614    T: Columnar + Default + Clone + Ord + PartialOrder,
615    for<'a> columnar::Ref<'a, T>: Copy + Ord,
616    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
617{
618    type Time = T;
619    type Chunk = Column<(D, T, R)>;
620
621    fn merge(
622        &mut self,
623        list1: Vec<Self::Chunk>,
624        list2: Vec<Self::Chunk>,
625        output: &mut Vec<Self::Chunk>,
626        stash: &mut Vec<Self::Chunk>,
627    ) {
628        let mut list1 = list1.into_iter();
629        let mut list2 = list2.into_iter();
630
631        let mut heads = [
632            list1.next().unwrap_or_default(),
633            list2.next().unwrap_or_default(),
634        ];
635        let mut positions = [0usize, 0usize];
636
637        let mut result = empty_chunk(stash);
638
639        // Main merge loop: both sides have data.
640        loop {
641            let upper_l = heads[0].borrow().len();
642            let upper_r = heads[1].borrow().len();
643            if positions[0] >= upper_l || positions[1] >= upper_r {
644                break;
645            }
646
647            // Whole-chunk passthrough fast path. When one head's tail (from
648            // its current position) is sortable-before the other head's
649            // current record, the entire tail can be appended to `output`
650            // without per-record compares or per-leaf byte copies.
651            //
652            // Two probes (one record from each side) settle this — when it
653            // fires, it skips an entire `merge_from` invocation, including
654            // its gallop bulk-copies, and replaces the byte-level extend
655            // with a `mem::replace` of the head into `output`.
656            //
657            // Restricted to `positions[i] == 0` so we can hand the head off
658            // wholesale; partial-tail passthrough would require a 1-input
659            // `merge_from` to materialize the tail into a new chunk, which
660            // is what gallop already handles inside the merge loop.
661            let lhs_passthrough = positions[0] == 0 && upper_l > 0 && {
662                let lhs = heads[0].borrow();
663                let rhs = heads[1].borrow();
664                let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1));
665                let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1]));
666                last_l < cur_r
667            };
668            if lhs_passthrough {
669                if !result.is_empty() {
670                    output.push(std::mem::take(&mut result));
671                    result = empty_chunk(stash);
672                }
673                let head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
674                output.push(head);
675                positions[0] = 0;
676                continue;
677            }
678
679            let rhs_passthrough = positions[1] == 0 && upper_r > 0 && {
680                let lhs = heads[0].borrow();
681                let rhs = heads[1].borrow();
682                let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1));
683                let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0]));
684                last_r < cur_l
685            };
686            if rhs_passthrough {
687                if !result.is_empty() {
688                    output.push(std::mem::take(&mut result));
689                    result = empty_chunk(stash);
690                }
691                let head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
692                output.push(head);
693                positions[1] = 0;
694                continue;
695            }
696
697            // Per-record merge. `merge_from` returns `true` when its inner
698            // amortized ship-threshold check fires — short-circuit the
699            // outer `at_capacity` walk in that case.
700            let yielded = result.merge_from(&mut heads, &mut positions);
701
702            if positions[0] >= heads[0].borrow().len() {
703                let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
704                recycle_chunk(old, stash);
705                positions[0] = 0;
706            }
707            if positions[1] >= heads[1].borrow().len() {
708                let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
709                recycle_chunk(old, stash);
710                positions[1] = 0;
711            }
712            if yielded || result.at_capacity() {
713                output.push(std::mem::take(&mut result));
714                result = empty_chunk(stash);
715            }
716        }
717
718        // Drain remaining from each side: copy partial head, then append
719        // full chunks directly to output (no per-element copy).
720        drain_side(
721            &mut heads[0],
722            &mut positions[0],
723            &mut list1,
724            &mut result,
725            output,
726            stash,
727        );
728        drain_side(
729            &mut heads[1],
730            &mut positions[1],
731            &mut list2,
732            &mut result,
733            output,
734            stash,
735        );
736        if !result.is_empty() {
737            output.push(result);
738        }
739    }
740
741    fn extract(
742        &mut self,
743        merged: Vec<Self::Chunk>,
744        upper: AntichainRef<Self::Time>,
745        frontier: &mut Antichain<Self::Time>,
746        ship: &mut Vec<Self::Chunk>,
747        kept: &mut Vec<Self::Chunk>,
748        stash: &mut Vec<Self::Chunk>,
749    ) {
750        let mut keep = empty_chunk(stash);
751        let mut ready = empty_chunk(stash);
752
753        for mut buffer in merged {
754            let mut position = 0;
755            let len = buffer.borrow().len();
756            while position < len {
757                buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
758                if keep.at_capacity() {
759                    kept.push(std::mem::take(&mut keep));
760                    keep = empty_chunk(stash);
761                }
762                if ready.at_capacity() {
763                    ship.push(std::mem::take(&mut ready));
764                    ready = empty_chunk(stash);
765                }
766            }
767            recycle_chunk(buffer, stash);
768        }
769        if !keep.is_empty() {
770            kept.push(keep);
771        }
772        if !ready.is_empty() {
773            ship.push(ready);
774        }
775    }
776
777    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
778        use timely::dataflow::channels::ContainerBytes;
779        let records = usize::try_from(chunk.record_count()).expect("record_count is non-negative");
780        // Serialized footprint stands in for both `size` and `capacity`: the
781        // chunk owns one logical allocation worth of leaf storage, and we
782        // ship/recycle the whole thing rather than tracking per-leaf
783        // capacities. Treating `size == capacity` matches how the framework
784        // accounts already-shipped chunks (no slack to absorb).
785        let bytes = chunk.length_in_bytes();
786        (records, bytes, bytes, 1)
787    }
788}
789
790/// Pop a chunk from `stash` or allocate a fresh one. Stashed chunks are
791/// already cleared via `recycle_chunk`, so they're ready for push.
792#[inline]
793pub(crate) fn empty_chunk<C: Columnar>(stash: &mut Vec<Column<C>>) -> Column<C> {
794    stash.pop().unwrap_or_default()
795}
796
797/// Reset `chunk` to an empty `Typed` and push it to `stash` for reuse.
798///
799/// Chunks recycled here come from the merger and chunker, both of which
800/// produce `Typed`; only the typed allocations are worth caching for reuse.
801/// `Bytes` / `Align` chunks have no typed-side allocation to preserve, so we
802/// simply drop them — `empty_chunk` will produce a fresh default just as
803/// cheaply, and pushing them onto `stash` would only displace useful
804/// recycled allocations.
805#[inline]
806pub(crate) fn recycle_chunk<C: Columnar>(mut chunk: Column<C>, stash: &mut Vec<Column<C>>) {
807    if let Column::Typed(c) = &mut chunk {
808        c.clear();
809        stash.push(chunk);
810    }
811}
812
813/// Drain remaining items from one side into `result` / `output`.
814///
815/// Copies the partially-consumed head into `result` via `merge_from`'s 1-input
816/// path, then appends remaining full chunks directly to `output` without
817/// per-element copy.
818fn drain_side<D, T, R>(
819    head: &mut Column<(D, T, R)>,
820    pos: &mut usize,
821    list: &mut std::vec::IntoIter<Column<(D, T, R)>>,
822    result: &mut Column<(D, T, R)>,
823    output: &mut Vec<Column<(D, T, R)>>,
824    stash: &mut Vec<Column<(D, T, R)>>,
825) where
826    D: Columnar,
827    for<'a> columnar::Ref<'a, D>: Copy + Ord,
828    T: Columnar + Default + Clone + PartialOrder,
829    for<'a> columnar::Ref<'a, T>: Copy + Ord,
830    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
831{
832    if *pos < head.borrow().len() {
833        // 1-input dispatch — bulk copy that runs to completion; the yield
834        // signal is unused.
835        let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
836    }
837    if !result.is_empty() {
838        output.push(std::mem::take(result));
839        *result = empty_chunk(stash);
840    }
841    Extend::extend(output, list);
842}
843
#[cfg(test)]
mod tests {
    use super::*;

    /// Drive a single `push_into` call with `inputs` and collect the
    /// consolidated output (if any) as owned tuples.
    ///
    /// Drains `extract` and then `finish` until each returns `None`, as the
    /// `ContainerBuilder` contract asks of callers, so output the chunker only
    /// releases from `finish` is not silently dropped.
    fn run_chunker<D, T, R>(inputs: &[(D, T, R)]) -> Vec<(D, T, R)>
    where
        D: Columnar + Clone,
        for<'a> columnar::Ref<'a, D>: Copy + Ord,
        T: Columnar + Clone,
        for<'a> columnar::Ref<'a, T>: Copy + Ord,
        R: Columnar + Clone + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
        for<'a> columnar::Ref<'a, R>: Ord,
        <(D, T, R) as Columnar>::Container: Clone,
        for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
        <(D, T, R) as Columnar>::Container: columnar::Push<(D, T, R)>,
        for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
        for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
        for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
    {
        let mut input: Column<(D, T, R)> = Default::default();
        for tuple in inputs.iter().cloned() {
            input.push_into(tuple);
        }

        let mut chunker: ColumnChunker<(D, T, R)> = Default::default();
        chunker.push_into(&mut input);

        let mut out = Vec::new();
        while let Some(chunk) = chunker.extract() {
            for (d, t, r) in chunk.borrow().into_index_iter() {
                out.push((D::into_owned(d), T::into_owned(t), R::into_owned(r)));
            }
        }
        // Anything still staged is only released by `finish`.
        while let Some(chunk) = chunker.finish() {
            for (d, t, r) in chunk.borrow().into_index_iter() {
                out.push((D::into_owned(d), T::into_owned(t), R::into_owned(r)));
            }
        }
        out
    }

    #[mz_ore::test]
    fn empty_input_yields_no_chunk() {
        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        let mut input: Column<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut input);
        assert!(chunker.extract().is_none());
        assert!(chunker.finish().is_none());
    }

    #[mz_ore::test]
    fn unsorted_input_is_sorted() {
        let out = run_chunker(&[(3u64, 0u64, 1i64), (1u64, 0u64, 1i64), (2u64, 0u64, 1i64)]);
        assert_eq!(out, vec![(1, 0, 1), (2, 0, 1), (3, 0, 1)]);
    }

    #[mz_ore::test]
    fn duplicate_keys_consolidate() {
        let out = run_chunker(&[(1u64, 0u64, 1i64), (1u64, 0u64, 2i64), (1u64, 0u64, -1i64)]);
        assert_eq!(out, vec![(1, 0, 2)]);
    }

    #[mz_ore::test]
    fn diffs_summing_to_zero_are_dropped() {
        let out = run_chunker(&[(1u64, 0u64, 1i64), (1u64, 0u64, -1i64)]);
        assert!(out.is_empty());
    }

    #[mz_ore::test]
    fn mixed_consolidation() {
        // (1, 0): 1 + 2 + (-3) = 0  -> dropped
        // (2, 0): 1            = 1  -> kept
        // (1, 1): 5            = 5  -> kept (different time from the (1, 0) group)
        let out = run_chunker(&[
            (1u64, 0u64, 1i64),
            (2u64, 0u64, 1i64),
            (1u64, 0u64, 2i64),
            (1u64, 1u64, 5i64),
            (1u64, 0u64, -3i64),
        ]);
        assert_eq!(out, vec![(1, 1, 5), (2, 0, 1)]);
    }

    #[mz_ore::test]
    fn key_val_tuple_data() {
        // Exercise the actual val-batcher shape: `D = (K, V)`.
        let out = run_chunker(&[
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 10u64), 0u64, 1i64),
            ((1u64, 11u64), 0u64, 1i64),
            ((2u64, 10u64), 0u64, 1i64),
        ]);
        assert_eq!(
            out,
            vec![((1, 10), 0, 2), ((1, 11), 0, 1), ((2, 10), 0, 1),]
        );
    }

    #[mz_ore::test]
    fn buffer_reuse_across_calls() {
        // Two sequential push_into calls; second runs after extract returned
        // the first chunk, exercising the in-place clear path.
        let mut input1: Column<(u64, u64, i64)> = Default::default();
        input1.push_into((1u64, 0u64, 1i64));
        input1.push_into((2u64, 0u64, 1i64));

        let mut input2: Column<(u64, u64, i64)> = Default::default();
        input2.push_into((3u64, 0u64, 1i64));
        input2.push_into((1u64, 0u64, 1i64));

        let mut chunker: ColumnChunker<(u64, u64, i64)> = Default::default();
        chunker.push_into(&mut input1);

        // Hand back the first chunk via extract, simulating the merge batcher
        // taking ownership of the &mut and then returning.
        {
            let _ = chunker.extract().expect("first chunk");
        }

        chunker.push_into(&mut input2);

        let chunk = chunker.extract().expect("second chunk");
        let collected: Vec<_> = chunk
            .borrow()
            .into_index_iter()
            .map(|(d, t, r)| (u64::into_owned(d), u64::into_owned(t), i64::into_owned(r)))
            .collect();
        assert_eq!(collected, vec![(1, 0, 1), (3, 0, 1)]);
    }

    /// Build a `Column<((u64, u64), u64, i64)>` from a slice of tuples.
    fn col(rows: &[((u64, u64), u64, i64)]) -> Column<((u64, u64), u64, i64)> {
        let mut c: Column<((u64, u64), u64, i64)> = Default::default();
        for &t in rows {
            c.push_into(t);
        }
        c
    }

    fn collect_chunks(chunks: &[Column<((u64, u64), u64, i64)>]) -> Vec<((u64, u64), u64, i64)> {
        chunks
            .iter()
            .flat_map(|c| {
                c.borrow().into_index_iter().map(|((k, v), t, r)| {
                    (
                        (u64::into_owned(k), u64::into_owned(v)),
                        u64::into_owned(t),
                        i64::into_owned(r),
                    )
                })
            })
            .collect()
    }

    /// Disjoint-range chains exercise the whole-chunk passthrough fast path:
    /// every chunk in chain1 is sortable-before every chunk in chain2, so
    /// each outer-loop iteration should hand a chunk straight to `output`
    /// without recursing through the per-record merge.
    #[mz_ore::test]
    fn merger_disjoint_chains_passthrough() {
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
            col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
            col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..4u64)
            .map(|d| ((d, 0u64), 0u64, 1i64))
            .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64)))
            .collect();
        assert_eq!(collected, expected);
    }

    /// Mirror of `merger_disjoint_chains_passthrough` with the chains
    /// swapped: every chunk in chain2 is sortable-before every chunk in
    /// chain1, so the right-hand passthrough branch is the one that fires.
    #[mz_ore::test]
    fn merger_disjoint_chains_passthrough_rhs() {
        let chain1 = vec![
            col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
            col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
            col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..4u64)
            .map(|d| ((d, 0u64), 0u64, 1i64))
            .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64)))
            .collect();
        assert_eq!(collected, expected);
    }

    /// Interleaved chains never satisfy the passthrough condition; each
    /// outer iteration falls through to `merge_from`. Same correctness
    /// expectation, exercises the non-passthrough path under
    /// `Merger::merge`.
    #[mz_ore::test]
    fn merger_interleaved_chains() {
        // Even keys on one chain, odd on the other; chunks alternate so the
        // per-record path is the only viable route.
        let chain1 = vec![
            col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]),
            col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]),
        ];
        let chain2 = vec![
            col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]),
            col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]),
        ];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
        assert_eq!(collected, expected);
    }

    /// Passthrough must consolidate adjacent equal keys at chunk
    /// boundaries — i.e., must NOT fire when `chain1`'s last record's
    /// `(d, t)` equals `chain2`'s first.
    #[mz_ore::test]
    fn merger_passthrough_respects_equal_boundary() {
        // chain1's last == chain2's first key: equal-key consolidation
        // must kick in (sum of diffs would be 2). If passthrough fired
        // erroneously, both records would land in different output chunks
        // unconsolidated.
        let chain1 = vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])];
        let chain2 = vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])];

        let mut merger: ColumnMerger<(u64, u64), u64, i64> = Default::default();
        let mut output = Vec::new();
        let mut stash = Vec::new();
        Merger::merge(&mut merger, chain1, chain2, &mut output, &mut stash);

        let collected = collect_chunks(&output);
        assert_eq!(
            collected,
            vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]
        );
    }
}
1074
#[cfg(test)]
mod proptests {
    //! Property tests for `Column::merge_from` and `Column::extract`.
    //!
    //! Strategy: generate sorted+consolidated inputs (the merger's input
    //! contract), drive `merge_from` / `extract` the same way the framework
    //! would, and compare against a brute-force reference impl.
    //!
    //! Test types are `D = (u64, u64)`, `T = u64`, `R = i64` drawn from small
    //! ranges so that equal-key collisions are common and the consolidation
    //! path actually runs.
    use super::*;
    use mz_ore::cast::CastFrom;
    use proptest::prelude::*;
    use timely::progress::frontier::Antichain;

    type Tuple = ((u64, u64), u64, i64);

    /// Reference consolidation: sort by `(data, time)`, sum diffs over equal
    /// pairs, drop zeros.
    fn consolidate(mut v: Vec<Tuple>) -> Vec<Tuple> {
        v.sort();
        let mut out: Vec<Tuple> = Vec::new();
        for (d, t, r) in v {
            if let Some(last) = out.last_mut() {
                if last.0 == d && last.1 == t {
                    last.2 += r;
                    continue;
                }
            }
            out.push((d, t, r));
        }
        out.retain(|x| x.2 != 0);
        out
    }

    /// Strategy for sorted+consolidated input lists. Ranges are small to
    /// encourage equal-key collisions.
    fn arb_consolidated() -> impl Strategy<Value = Vec<Tuple>> {
        prop::collection::vec(((0u64..5, 0u64..5), 0u64..3, -3i64..=3i64), 0..30)
            .prop_map(consolidate)
    }

    fn build_column(v: &[Tuple]) -> Column<Tuple> {
        let mut col: Column<Tuple> = Default::default();
        for tup in v {
            col.push_into(*tup);
        }
        col
    }

    fn collect_column(col: &Column<Tuple>) -> Vec<Tuple> {
        col.borrow()
            .into_index_iter()
            .map(|((k, v), t, r)| {
                (
                    (u64::into_owned(k), u64::into_owned(v)),
                    u64::into_owned(t),
                    i64::into_owned(r),
                )
            })
            .collect()
    }

    /// Drive a 2-way merge the same way `Merger::merge` would: 2-input calls
    /// until one side exhausts, then a 1-input drain for whichever side
    /// still has data. Returns the concatenated output as owned tuples.
    ///
    /// A 2-input `merge_from` returns `true` when its ship threshold fires,
    /// possibly before either input is exhausted. Like `Merger::merge`, the
    /// accumulated output is then shipped and merging resumes into an empty
    /// column. A single call would otherwise leave one side's unconsumed
    /// records behind, since only one tail is drained afterwards.
    fn drive_merge(left: Column<Tuple>, right: Column<Tuple>) -> Vec<Tuple> {
        let mut out: Vec<Tuple> = Vec::new();
        let mut self_col: Column<Tuple> = Default::default();
        let mut others = [left, right];
        let mut positions = [0usize, 0];
        loop {
            if !self_col.merge_from(&mut others, &mut positions) {
                // No yield: the 2-input call ran until one side exhausted.
                break;
            }
            // Ship threshold fired: emit what we have and start afresh.
            Extend::extend(&mut out, collect_column(&self_col));
            self_col = Default::default();
            if positions[0] >= others[0].borrow().len()
                || positions[1] >= others[1].borrow().len()
            {
                break;
            }
        }

        let [left_done, right_done] = others;
        let [left_pos, right_pos] = positions;

        if left_pos < left_done.borrow().len() {
            let mut tail = [left_done];
            let mut p = [left_pos];
            let _ = self_col.merge_from(&mut tail, &mut p);
        } else if right_pos < right_done.borrow().len() {
            let mut tail = [right_done];
            let mut p = [right_pos];
            let _ = self_col.merge_from(&mut tail, &mut p);
        }

        Extend::extend(&mut out, collect_column(&self_col));
        out
    }

    proptest! {
        /// `merge_from` with two sorted+consolidated inputs equals the
        /// reference consolidate(union).
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_equals_consolidated_union(
            a in arb_consolidated(),
            b in arb_consolidated(),
        ) {
            let merged = drive_merge(build_column(&a), build_column(&b));

            let mut union = a.clone();
            Extend::extend(&mut union, b.iter().copied());
            let expected = consolidate(union);

            prop_assert_eq!(merged, expected);
        }

        /// `merge_from` 1-input bulk-copy from a non-zero position equals
        /// `other[*pos..]`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_one_input_drains_tail(
            data in arb_consolidated(),
            pos_frac in 0u32..=100,
        ) {
            // Cap at len so we always have a valid position.
            let len = data.len();
            let start_pos = if len == 0 { 0 } else {
                (usize::cast_from(pos_frac) * len) / 101
            };

            // Self starts non-empty so we exercise the bulk-copy path, not the
            // empty-self swap shortcut.
            let mut self_col: Column<Tuple> = Default::default();
            let sentinel: Tuple = ((u64::MAX, u64::MAX), 0, 1);
            self_col.push_into(sentinel);

            let mut others = [build_column(&data)];
            let mut positions = [start_pos];
            let _ = self_col.merge_from(&mut others, &mut positions);

            let mut expected = vec![sentinel];
            Extend::extend(&mut expected, data[start_pos..].iter().copied());

            prop_assert_eq!(collect_column(&self_col), expected);
            prop_assert_eq!(positions[0], len);
        }

        /// `merge_from` 1-input swap shortcut: empty self + pos=0 should
        /// produce a column equal to the input.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn merge_from_empty_self_swap(data in arb_consolidated()) {
            let mut self_col: Column<Tuple> = Default::default();
            let mut others = [build_column(&data)];
            let mut positions = [0usize];
            let _ = self_col.merge_from(&mut others, &mut positions);

            prop_assert_eq!(collect_column(&self_col), data);
        }

        /// `extract` partitions correctly:
        ///   - keep ∪ ship multiset-equals self
        ///   - upper.less_equal(t) for every kept time
        ///   - !upper.less_equal(t) for every shipped time
        ///   - frontier covers every kept time
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_partitions_by_frontier(
            data in arb_consolidated(),
            upper_time in 0u64..=4,
        ) {
            let mut self_col = build_column(&data);
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            self_col.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            // Single call drains the input (we removed the at_capacity yield).
            prop_assert_eq!(position, data.len());

            let kept = collect_column(&keep);
            let shipped = collect_column(&ship);

            // Partition predicate: kept times >= upper, shipped times < upper.
            for (_, t, _) in &kept {
                prop_assert!(
                    upper.borrow().less_equal(t),
                    "kept time {} should satisfy upper.less_equal", t,
                );
            }
            for (_, t, _) in &shipped {
                prop_assert!(
                    !upper.borrow().less_equal(t),
                    "shipped time {} should NOT satisfy upper.less_equal", t,
                );
            }

            // Union (multiset) equals input.
            let mut union = kept.clone();
            Extend::extend(&mut union, shipped.iter().copied());
            union.sort();
            let mut expected_sorted = data.clone();
            expected_sorted.sort();
            prop_assert_eq!(union, expected_sorted);

            // Frontier dominates every kept time.
            for (_, t, _) in &kept {
                prop_assert!(
                    frontier.less_equal(t),
                    "frontier should dominate kept time {}", t,
                );
            }
        }

        /// Empty input → no work, frontier untouched, position = 0.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn extract_empty_input(upper_time in 0u64..=4) {
            let mut self_col: Column<Tuple> = Default::default();
            let upper = Antichain::from_elem(upper_time);
            let mut frontier: Antichain<u64> = Antichain::new();
            let mut keep: Column<Tuple> = Default::default();
            let mut ship: Column<Tuple> = Default::default();
            let mut position = 0;

            self_col.extract(
                &mut position,
                upper.borrow(),
                &mut frontier,
                &mut keep,
                &mut ship,
            );

            prop_assert_eq!(position, 0);
            prop_assert!(collect_column(&keep).is_empty());
            prop_assert!(collect_column(&ship).is_empty());
            prop_assert!(frontier.elements().is_empty());
        }
    }
}