Skip to main content

mz_timely_util/columnar/
merge_batcher.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Merge-batcher for [`Column`] chunks with per-chunk paging.
17//!
18//! Forks the [`differential_dataflow`] merge-batcher framework so chains can
19//! hold [`PagedColumn`] entries — letting the [`ColumnPager`] page chunks
20//! out as they're produced and fetch them back lazily during merge / extract.
21//!
22//! Reuses the resident building blocks from [`super::batcher`]: the inherent
23//! `Column::merge_from` / `Column::extract` methods (per-chunk merge / split).
24//! Input consolidation happens upstream: the chunker
25//! ([`super::batcher::ColumnChunker`]) is supplied to the arrange operator
26//! separately, so this batcher receives already-consolidated [`Column`] chunks
27//! via [`PushInto`].
28//!
29//! [`differential_dataflow`]: differential_dataflow::trace::implementations::merge_batcher
30
31use std::collections::VecDeque;
32
33use columnar::{Columnar, Index, Len};
34use differential_dataflow::difference::Semigroup;
35use differential_dataflow::logging::{BatcherEvent, Logger};
36use differential_dataflow::trace::{Batcher, Description};
37use timely::Accountable;
38use timely::PartialOrder;
39use timely::container::{PushInto, SizableContainer};
40use timely::dataflow::channels::ContainerBytes;
41use timely::progress::Timestamp;
42use timely::progress::frontier::{Antichain, AntichainRef};
43
44use crate::column_pager::{self, ColumnPager, PagedColumn};
45use crate::columnar::Column;
46use crate::columnar::batcher::{empty_chunk, recycle_chunk};
47
/// Upper bound on how many recycled empty chunks a single batcher keeps in
/// its stash.
///
/// Kept small on purpose: the stash exists to absorb the churn of the
/// `result` / `keep` / `ship` buffers, not to hoard memory. Stashed entries
/// are cleared `Column::Typed` allocations that keep their capacity yet are
/// invisible to [`ColumnPager`]'s `ResidentTicket` accounting, so every entry
/// is a chunk's worth of resident bytes outside the pager's budget — and each
/// arrange batcher on each worker carries its own stash.
///
/// Two slots cover steady-state reuse on both hot paths: `merge_chains`
/// ships `result` and immediately refills it, while `extract_chain` ships
/// `keep` / `ship` and refills whichever one filled up. Heads that drain
/// mid-loop come back resident from `FetchIter`, and the whole-chunk
/// passthrough fast path keeps most of them out of the merge inner loop, so
/// only a few ever pass through the stash.
const STASH_CAP: usize = 2;
63
/// Largest chunk (by serialized length) that may be parked in the free-list.
///
/// A merge buffer that has transiently ballooned past the natural ship
/// threshold would, if kept resident, compete with the pager's budget. Such
/// buffers are dropped instead, and a fresh default regrows on demand. The
/// limit is twice the natural ship word count (about 4 MiB serialized):
/// large enough to keep ordinary ship-sized chunks, small enough to reject
/// pathological ones.
const MAX_RECYCLE_BYTES: usize = 4 * 1024 * 1024;
71
72/// Recycle `chunk` only if the stash isn't already at [`STASH_CAP`] and the
73/// chunk isn't oversize per [`MAX_RECYCLE_BYTES`]. `length_in_bytes` is
74/// measured before clear, so it reflects the data the chunk was carrying
75/// (a proxy for the capacity we'd park).
76fn recycle_capped<C: Columnar>(chunk: Column<C>, stash: &mut Vec<Column<C>>) {
77    if stash.len() < STASH_CAP && chunk.length_in_bytes() <= MAX_RECYCLE_BYTES {
78        recycle_chunk(chunk, stash);
79    }
80}
81
82/// Drives the merge-batcher over [`Column`] chunks routed through a
83/// [`ColumnPager`].
84///
85/// Chains hold [`PagedColumn`] entries rather than resident [`Column`]s, so
86/// each insert / merge / extract step can hand its output to the pager and
87/// store whatever the policy returns (resident, paged, or compressed). Reads
88/// during merge materialize lazily via [`FetchIter`].
89///
90/// Resolves its pager lazily per call via [`column_pager::global_pager`], so
91/// late-arriving dyncfg updates (e.g. `enable_column_paged_batcher` flipping
92/// on after the batcher was constructed) take effect without rebuilding the
93/// operator. Tests may override that lookup via [`Self::set_pager`].
94pub struct ColumnMergeBatcher<D, T, R>
95where
96    D: Columnar,
97    T: Columnar,
98    R: Columnar,
99{
100    chains: Vec<VecDeque<PagedColumn<(D, T, R)>>>,
101    lower: Antichain<T>,
102    frontier: Antichain<T>,
103    /// Recycled empty `Column::Typed` chunks. Drained heads and shipped result
104    /// buffers feed in here; subsequent merge / extract calls pop from here
105    /// instead of starting from a zero-capacity `Column::default()`. Mirrors
106    /// the stash carried by the upstream `differential_dataflow` merge-batcher
107    /// framework, which this type forks. Without it, each shipped chunk
108    /// triggers a fresh per-leaf grow cycle and per-merge-round allocation
109    /// dominates the inner loop.
110    stash: Vec<Column<(D, T, R)>>,
111    /// Optional override. `None` means "read [`column_pager::global_pager`]
112    /// fresh on every use" — the production path, so worker_config dyncfg
113    /// changes that re-install the process-global pager take effect on the
114    /// very next chunk this batcher processes.
115    pager_override: Option<ColumnPager>,
116    logger: Option<Logger>,
117    operator_id: usize,
118}
119
120impl<D, T, R> ColumnMergeBatcher<D, T, R>
121where
122    D: Columnar,
123    T: Columnar,
124    R: Columnar,
125{
126    /// Pin the pager this batcher uses, overriding the thread-local lookup.
127    /// Mainly for tests; production should leave the override unset so
128    /// dyncfg-driven re-installs take effect immediately.
129    pub fn set_pager(&mut self, pager: ColumnPager) {
130        self.pager_override = Some(pager);
131    }
132
133    /// Current pager — override if set, else the process-global pager
134    /// installed by `apply_worker_config`. `ColumnPager` is cheaply
135    /// cloneable (Arc inside).
136    fn pager(&self) -> ColumnPager {
137        self.pager_override
138            .clone()
139            .unwrap_or_else(column_pager::global_pager)
140    }
141
142    /// Push a chain into `self.chains`, emitting a positive `BatcherEvent`
143    /// covering its resident entries.
144    fn chain_push(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>) {
145        self.emit_account(&chain, 1);
146        self.chains.push(chain);
147    }
148
149    /// Pop a chain from `self.chains`, emitting a negative `BatcherEvent`
150    /// retracting its resident entries.
151    ///
152    /// Invariant for the retract to reconcile against the matching
153    /// `chain_push`: chain entries are never mutated in place between push
154    /// and pop. The only allowed mutation is a full pop / push pair (see
155    /// `insert_chain` and `merge_by`), so each entry's accounting category
156    /// — `Resident` vs `Paged` vs `Compressed` — is the same at both ends.
157    /// If a future change ever pages an entry out in place after push, this
158    /// path silently double-counts.
159    fn chain_pop(&mut self) -> Option<VecDeque<PagedColumn<(D, T, R)>>> {
160        let chain = self.chains.pop()?;
161        self.emit_account(&chain, -1);
162        Some(chain)
163    }
164
165    /// Emit a single `BatcherEvent` summing resident accounting across
166    /// `chain` with the given sign. No-op when no logger is attached.
167    fn emit_account(&self, chain: &VecDeque<PagedColumn<(D, T, R)>>, diff: isize) {
168        let Some(logger) = &self.logger else {
169            return;
170        };
171        let (mut records, mut size, mut capacity, mut allocations) =
172            (0isize, 0isize, 0isize, 0isize);
173        for entry in chain {
174            let (r, s, c, a) = account_chunk(entry);
175            records = records.saturating_add_unsigned(r);
176            size = size.saturating_add_unsigned(s);
177            capacity = capacity.saturating_add_unsigned(c);
178            allocations = allocations.saturating_add_unsigned(a);
179        }
180        logger.log(BatcherEvent {
181            operator: self.operator_id,
182            records_diff: records.saturating_mul(diff),
183            size_diff: size.saturating_mul(diff),
184            capacity_diff: capacity.saturating_mul(diff),
185            allocations_diff: allocations.saturating_mul(diff),
186        });
187    }
188}
189
190impl<D, T, R> Drop for ColumnMergeBatcher<D, T, R>
191where
192    D: Columnar,
193    T: Columnar,
194    R: Columnar,
195{
196    fn drop(&mut self) {
197        // Retract accounting for any chains still resident at drop time so
198        // the BatcherEvent counters end at zero per-operator.
199        while self.chain_pop().is_some() {}
200    }
201}
202
203/// Resident-only accounting. Returns `(records, size_bytes, capacity_bytes,
204/// allocations)` for a single chain entry; paged-out entries contribute 0
205/// across the board.
206///
207/// `BatcherEvent` feeds the `mz_arrangement_batcher_*_raw` introspection
208/// tables, which downstream surface as memory-resource dashboards. Bytes
209/// living on swap or in a pager file aren't part of RSS and shouldn't be
210/// reported there.
211fn account_chunk<C: Columnar>(entry: &PagedColumn<C>) -> (usize, usize, usize, usize) {
212    match entry {
213        PagedColumn::Resident(col, _) => {
214            let records = usize::try_from(col.record_count()).expect("non-negative");
215            let bytes = col.length_in_bytes();
216            (records, bytes, bytes, 1)
217        }
218        PagedColumn::Paged { .. } | PagedColumn::Compressed { .. } => (0, 0, 0, 0),
219    }
220}
221
222impl<D, T, R> Batcher for ColumnMergeBatcher<D, T, R>
223where
224    D: Columnar,
225    for<'a> columnar::Ref<'a, D>: Copy + Ord,
226    T: Columnar + Default + Timestamp + PartialOrder,
227    for<'a> columnar::Ref<'a, T>: Copy + Ord,
228    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
229    for<'a> columnar::Ref<'a, R>: Ord,
230{
231    type Output = Column<(D, T, R)>;
232    type Time = T;
233
234    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
235        // No pager snapshot taken here — `self.pager()` reads
236        // `column_pager::global_pager` per call, so dyncfg-driven re-installs
237        // take effect on the next chunk.
238        Self {
239            chains: Vec::new(),
240            lower: Antichain::from_elem(T::minimum()),
241            frontier: Antichain::new(),
242            stash: Vec::new(),
243            pager_override: None,
244            logger,
245            operator_id,
246        }
247    }
248
249    fn seal(
250        &mut self,
251        upper: Antichain<Self::Time>,
252    ) -> (Vec<Self::Output>, Description<Self::Time>) {
253        let pager = self.pager();
254        // Merge all remaining chains into one.
255        while self.chains.len() > 1 {
256            let a = self.chain_pop().unwrap();
257            let b = self.chain_pop().unwrap();
258            let merged = self.merge_by(a, b);
259            self.chain_push(merged);
260        }
261        let merged = self.chain_pop().unwrap_or_default();
262
263        // Extract `merged` into `readied` (ship side, materialized for the
264        // builder) and `kept_chain` (keep side, stays paged for the next
265        // round).
266        let mut readied: Vec<Column<(D, T, R)>> = Vec::new();
267        let mut kept_chain: VecDeque<PagedColumn<(D, T, R)>> = VecDeque::new();
268        self.frontier.clear();
269        {
270            let pager = &pager;
271            let frontier = &mut self.frontier;
272            let stash = &mut self.stash;
273            extract_chain(
274                FetchIter::new(merged, pager),
275                upper.borrow(),
276                frontier,
277                |paged| readied.push(pager.take(paged)),
278                |paged| kept_chain.push_back(paged),
279                stash,
280            );
281        }
282
283        if !kept_chain.is_empty() {
284            self.chain_push(kept_chain);
285        }
286
287        let description = Description::new(
288            self.lower.clone(),
289            upper.clone(),
290            Antichain::from_elem(T::minimum()),
291        );
292        self.lower = upper;
293
294        // Drop the recycle stash now that this round's hot work is done.
295        // The next merge after the next `push_into` will re-pay one
296        // chunk's worth of leaf-`Vec` grow tax, but that's a few hundred µs
297        // amortized over a seal cycle, well worth handing the leaf bytes
298        // back to the allocator so they're not held resident across what
299        // may be a quiet stretch.
300        self.stash.clear();
301
302        (readied, description)
303    }
304
305    fn frontier(&mut self) -> AntichainRef<'_, Self::Time> {
306        self.frontier.borrow()
307    }
308}
309
310impl<D, T, R> PushInto<Column<(D, T, R)>> for ColumnMergeBatcher<D, T, R>
311where
312    D: Columnar,
313    for<'a> columnar::Ref<'a, D>: Copy + Ord,
314    T: Columnar + Default + Clone + PartialOrder,
315    for<'a> columnar::Ref<'a, T>: Copy + Ord,
316    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
317{
318    /// Accept an already-consolidated chunk from the upstream chunker, route
319    /// it through the pager, and insert it as a singleton chain.
320    fn push_into(&mut self, mut chunk: Column<(D, T, R)>) {
321        let pager = self.pager();
322        let paged = pager.page(&mut chunk);
323        self.insert_chain(VecDeque::from([paged]));
324    }
325}
326
327impl<D, T, R> ColumnMergeBatcher<D, T, R>
328where
329    D: Columnar,
330    for<'a> columnar::Ref<'a, D>: Copy + Ord,
331    T: Columnar + Default + Clone + PartialOrder,
332    for<'a> columnar::Ref<'a, T>: Copy + Ord,
333    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
334{
335    /// Insert `chain` and rebalance: while the youngest chain is at least
336    /// half the size of its predecessor, merge them.
337    fn insert_chain(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>) {
338        if chain.is_empty() {
339            return;
340        }
341        self.chain_push(chain);
342        while self.chains.len() > 1
343            && self.chains[self.chains.len() - 1].len()
344                >= self.chains[self.chains.len() - 2].len() / 2
345        {
346            let a = self.chain_pop().unwrap();
347            let b = self.chain_pop().unwrap();
348            let merged = self.merge_by(a, b);
349            self.chain_push(merged);
350        }
351    }
352
353    /// Merge two sorted chains. Outputs are routed through `self.pager.page`
354    /// per chunk produced, so the result chain holds `PagedColumn`s and the
355    /// caller never sees a fully materialized merge result.
356    fn merge_by(
357        &mut self,
358        a: VecDeque<PagedColumn<(D, T, R)>>,
359        b: VecDeque<PagedColumn<(D, T, R)>>,
360    ) -> VecDeque<PagedColumn<(D, T, R)>> {
361        let mut output: VecDeque<PagedColumn<(D, T, R)>> = VecDeque::new();
362        let pager = self.pager();
363        let pager = &pager;
364        let stash = &mut self.stash;
365        merge_chains(
366            FetchIter::new(a, pager),
367            FetchIter::new(b, pager),
368            |paged| output.push_back(paged),
369            stash,
370        );
371        output
372    }
373}
374
375/// Streaming materializer over a chain of [`PagedColumn`] entries.
376///
377/// `next` consumes one entry and calls [`ColumnPager::take`] to produce a
378/// resident [`Column`]. Bounds materialized chunks to whatever the consumer
379/// holds (typically one head per chain in [`merge_chains`]).
380pub struct FetchIter<'a, D, T, R>
381where
382    (D, T, R): Columnar,
383{
384    queue: VecDeque<PagedColumn<(D, T, R)>>,
385    pager: &'a ColumnPager,
386}
387
388impl<'a, D, T, R> FetchIter<'a, D, T, R>
389where
390    (D, T, R): Columnar,
391{
392    /// Wraps `queue` for streaming materialization through `pager`.
393    pub fn new(queue: VecDeque<PagedColumn<(D, T, R)>>, pager: &'a ColumnPager) -> Self {
394        Self { queue, pager }
395    }
396
397    /// Borrow the pager backing this iter so drivers can route output chunks
398    /// back through `page()` without threading a separate `&pager`. The
399    /// returned reference is tied to the outer `'a`, not to `&self`, so it
400    /// stays valid across subsequent `next()` calls.
401    pub fn pager(&self) -> &'a ColumnPager {
402        self.pager
403    }
404
405    /// Drain remaining queued entries as `PagedColumn`s without materializing.
406    /// Used by `merge_chains`'s drain-tail phase: once the other side is
407    /// exhausted, the remaining entries on this side can pass straight to the
408    /// output sink.
409    pub fn into_paged(self) -> std::collections::vec_deque::IntoIter<PagedColumn<(D, T, R)>> {
410        self.queue.into_iter()
411    }
412}
413
414impl<D, T, R> Iterator for FetchIter<'_, D, T, R>
415where
416    (D, T, R): Columnar,
417{
418    type Item = Column<(D, T, R)>;
419
420    fn next(&mut self) -> Option<Self::Item> {
421        self.queue.pop_front().map(|p| self.pager.take(p))
422    }
423}
424
/// Two-way merge driver over paged chains.
///
/// Uses the per-chunk gallop / ship-threshold logic of `Column::merge_from`,
/// but pulls input heads from [`FetchIter`] (materializing one chunk per side
/// at a time). Each finished output chunk is routed through the pager exposed
/// by [`FetchIter::pager`] and then emitted through `sink`. The pager is read
/// from `list1` only, so both iterators are expected to share it; every
/// caller in this file passes the same pager to both.
///
/// `stash` is a pool of empty `Column::Typed` chunks. Drained heads and
/// shipped result buffers are recycled into it (subject to
/// [`recycle_capped`]'s limits), and the next result buffer is popped from it
/// instead of starting from a zero-capacity default. This matches the
/// recycling discipline the upstream `differential_dataflow` merge-batcher
/// carries via `Merger::merge`'s `stash` parameter.
///
/// Whole-chunk passthrough: heads arrive materialized from [`FetchIter`], so
/// peeking at their endpoints is cheap. When the current head on one side
/// sorts entirely and strictly before the current record on the other side
/// (comparing `(data, time)`), the head is shipped wholesale and the
/// per-record merge is skipped. This is gated on `positions[i] == 0` so the
/// head is handed off intact. A partial-tail passthrough would need a 1-input
/// `merge_from` to copy the tail, which the inner loop's gallop already
/// covers.
///
/// NOTE(review): the main loop exits as soon as either side's current head
/// has no unread records. Exhausted heads are refilled from their list after
/// each `merge_from`, so an exit normally means that list is done. This
/// assumes chains never contain an empty chunk before their last entry; if
/// one did, the drain phase would forward the remaining tails of both sides
/// without merging them.
pub fn merge_chains<D, T, R, Sink>(
    list1: FetchIter<'_, D, T, R>,
    list2: FetchIter<'_, D, T, R>,
    mut sink: Sink,
    stash: &mut Vec<Column<(D, T, R)>>,
) where
    D: Columnar,
    for<'a> columnar::Ref<'a, D>: Copy + Ord,
    T: Columnar + Default + Clone + PartialOrder,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
    Sink: FnMut(PagedColumn<(D, T, R)>),
{
    // `pager()` returns a reference with the iterator's outer lifetime, so it
    // stays usable after `list1` is moved into `drain_side` below.
    let pager = list1.pager();
    let mut list1 = list1;
    let mut list2 = list2;

    // One materialized head per side (an empty default when that side has no
    // entries), plus the read position inside each head.
    let mut heads = [
        list1.next().unwrap_or_default(),
        list2.next().unwrap_or_default(),
    ];
    let mut positions = [0usize, 0usize];
    let mut result: Column<(D, T, R)> = empty_chunk(stash);

    loop {
        let upper_l = heads[0].borrow().len();
        let upper_r = heads[1].borrow().len();
        // Either side out of unread records: hand off to the drain phase.
        if positions[0] >= upper_l || positions[1] >= upper_r {
            break;
        }

        // Whole-chunk passthrough. Two probes on already-resident heads.
        // Left head qualifies if its last `(data, time)` is strictly less
        // than the right side's current record.
        let lhs_passthrough = positions[0] == 0 && upper_l > 0 && {
            let lhs = heads[0].borrow();
            let rhs = heads[1].borrow();
            let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1));
            let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1]));
            last_l < cur_r
        };
        if lhs_passthrough {
            // Flush any partially built result first so output stays ordered.
            if !result.is_empty() {
                sink(pager.page(&mut result));
                if let Some(reuse) = stash.pop() {
                    result = reuse;
                }
            }
            let mut head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
            sink(pager.page(&mut head));
            positions[0] = 0;
            continue;
        }

        // Mirror image: right head strictly before the left's current record.
        let rhs_passthrough = positions[1] == 0 && upper_r > 0 && {
            let lhs = heads[0].borrow();
            let rhs = heads[1].borrow();
            let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1));
            let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0]));
            last_r < cur_l
        };
        if rhs_passthrough {
            if !result.is_empty() {
                sink(pager.page(&mut result));
                if let Some(reuse) = stash.pop() {
                    result = reuse;
                }
            }
            let mut head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
            sink(pager.page(&mut head));
            positions[1] = 0;
            continue;
        }

        // Per-record merge of the two heads into `result`; advances `positions`.
        let yielded = result.merge_from(&mut heads, &mut positions);

        // Refill any head that was fully consumed, recycling its allocation.
        if positions[0] >= heads[0].borrow().len() {
            let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
            recycle_capped(old, stash);
            positions[0] = 0;
        }
        if positions[1] >= heads[1].borrow().len() {
            let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
            recycle_capped(old, stash);
            positions[1] = 0;
        }
        if yielded || result.at_capacity() {
            sink(pager.page(&mut result));
            // `pager.page` either took `result`'s allocation (Skip path leaves
            // a zero-cap default) or kept the Typed buffer (Paged / Compressed
            // paths clear in place). Pull a fresh chunk from the stash so the
            // next `merge_from` starts with retained capacity; if the stash is
            // empty, fall back to whatever `result` already is.
            if let Some(reuse) = stash.pop() {
                result = reuse;
            }
        }
    }

    // Drain remaining: copy partial head through `merge_from`'s 1-input
    // dispatch, then hand the rest of the chain's `PagedColumn`s straight to
    // the sink without materializing.
    drain_side(
        &mut heads[0],
        &mut positions[0],
        list1,
        &mut result,
        &mut sink,
        pager,
        stash,
    );
    drain_side(
        &mut heads[1],
        &mut positions[1],
        list2,
        &mut result,
        &mut sink,
        pager,
        stash,
    );

    if !result.is_empty() {
        sink(pager.page(&mut result));
    } else {
        // Empty `result` may still carry a useful Typed allocation; recycle
        // so subsequent calls (next `merge_by`, the seal `extract_chain`)
        // can pick it up.
        recycle_capped(result, stash);
    }
    // Recycle the now-exhausted (or default) head slots too — for `Resident`
    // heads that finished naturally, this preserves their Typed allocation
    // for the next call.
    let [h0, h1] = heads;
    recycle_capped(h0, stash);
    recycle_capped(h1, stash);
}
577
578/// Helper for `merge_chains`'s drain phase: copy a partially-consumed head
579/// into `result` (via 1-input `merge_from`), ship `result` if non-empty, then
580/// pass the remaining queued `PagedColumn`s straight through.
581fn drain_side<D, T, R, Sink>(
582    head: &mut Column<(D, T, R)>,
583    pos: &mut usize,
584    rest: FetchIter<'_, D, T, R>,
585    result: &mut Column<(D, T, R)>,
586    sink: &mut Sink,
587    pager: &ColumnPager,
588    stash: &mut Vec<Column<(D, T, R)>>,
589) where
590    D: Columnar,
591    for<'a> columnar::Ref<'a, D>: Copy + Ord,
592    T: Columnar + Default + Clone + PartialOrder,
593    for<'a> columnar::Ref<'a, T>: Copy + Ord,
594    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
595    Sink: FnMut(PagedColumn<(D, T, R)>),
596{
597    if *pos < head.borrow().len() {
598        // 1-input dispatch — bulk copy that runs to completion.
599        let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos));
600    }
601    if !result.is_empty() {
602        sink(pager.page(result));
603        if let Some(reuse) = stash.pop() {
604            *result = reuse;
605        }
606    }
607    for paged in rest.into_paged() {
608        sink(paged);
609    }
610}
611
612/// Streaming extract: walks `merged` chunk-by-chunk via `Column::extract`,
613/// routing each filled keep/ship chunk through its sink after pageing.
614/// Mirrors the per-chunk ship-threshold yield already inside
615/// `Column::extract`.
616///
617/// `stash` carries recycled `Column::Typed` buffers in and out so the
618/// per-chunk extract loop doesn't restart from zero capacity each time
619/// `keep_buf` / `ship_buf` ships and the source `buffer` is dropped.
620pub fn extract_chain<D, T, R, SinkShip, SinkKeep>(
621    merged: FetchIter<'_, D, T, R>,
622    upper: AntichainRef<T>,
623    frontier: &mut Antichain<T>,
624    mut ship: SinkShip,
625    mut keep: SinkKeep,
626    stash: &mut Vec<Column<(D, T, R)>>,
627) where
628    D: Columnar,
629    for<'a> columnar::Ref<'a, D>: Copy + Ord,
630    T: Columnar + Default + Clone + PartialOrder,
631    for<'a> columnar::Ref<'a, T>: Copy + Ord,
632    R: Columnar + Default + Semigroup + for<'a> Semigroup<columnar::Ref<'a, R>>,
633    SinkShip: FnMut(PagedColumn<(D, T, R)>),
634    SinkKeep: FnMut(PagedColumn<(D, T, R)>),
635{
636    let pager = merged.pager();
637    let mut keep_buf: Column<(D, T, R)> = empty_chunk(stash);
638    let mut ship_buf: Column<(D, T, R)> = empty_chunk(stash);
639
640    for mut buffer in merged {
641        let mut position = 0;
642        let len = buffer.borrow().len();
643        while position < len {
644            buffer.extract(&mut position, upper, frontier, &mut keep_buf, &mut ship_buf);
645            if keep_buf.at_capacity() {
646                keep(pager.page(&mut keep_buf));
647                if let Some(reuse) = stash.pop() {
648                    keep_buf = reuse;
649                }
650            }
651            if ship_buf.at_capacity() {
652                ship(pager.page(&mut ship_buf));
653                if let Some(reuse) = stash.pop() {
654                    ship_buf = reuse;
655                }
656            }
657        }
658        // Buffer fully consumed; recycle whatever Typed allocation it had.
659        recycle_capped(buffer, stash);
660    }
661    if !keep_buf.is_empty() {
662        keep(pager.page(&mut keep_buf));
663    } else {
664        recycle_capped(keep_buf, stash);
665    }
666    if !ship_buf.is_empty() {
667        ship(pager.page(&mut ship_buf));
668    } else {
669        recycle_capped(ship_buf, stash);
670    }
671}
672
673#[cfg(test)]
674#[allow(clippy::clone_on_ref_ptr)]
675mod tests {
676    use std::sync::Arc;
677
678    use columnar::Index;
679    use timely::container::PushInto as _;
680
681    use super::*;
682    use crate::column_pager::{PageDecision, PageEvent, PageHint, PagingPolicy};
683
684    // ----- helpers -----------------------------------------------------------
685
686    type KvUpdate = ((u64, u64), u64, i64);
687
688    fn col(rows: &[KvUpdate]) -> Column<KvUpdate> {
689        let mut c: Column<KvUpdate> = Default::default();
690        for &t in rows {
691            c.push_into(t);
692        }
693        c
694    }
695
696    fn collect_pc(chunks: &[PagedColumn<KvUpdate>], pager: &ColumnPager) -> Vec<KvUpdate> {
697        // `collect_pc` peeks via materialization on a side path so the test's
698        // assertions don't consume the chain.
699        chunks
700            .iter()
701            .flat_map(|p| {
702                let view: Column<KvUpdate> = match p {
703                    PagedColumn::Resident(c, _) => clone_column(c),
704                    _ => pager.take(clone_paged(p)),
705                };
706                collect_column(&view).into_iter()
707            })
708            .collect()
709    }
710
711    fn collect_column(c: &Column<KvUpdate>) -> Vec<KvUpdate> {
712        c.borrow()
713            .into_index_iter()
714            .map(|((k, v), t, r)| {
715                (
716                    (u64::into_owned(k), u64::into_owned(v)),
717                    u64::into_owned(t),
718                    i64::into_owned(r),
719                )
720            })
721            .collect()
722    }
723
724    fn clone_column(c: &Column<KvUpdate>) -> Column<KvUpdate> {
725        // `Column` is `Clone` when `C::Container: Clone`, which is true for
726        // tuple-of-primitive containers. Used so test helpers can peek at a
727        // chain without consuming it.
728        c.clone()
729    }
730
731    /// Helper that bypasses `pager.take` for non-`Resident` variants by
732    /// taking and re-pageing. Only used in test inspection paths where the
733    /// extra round-trip is acceptable.
734    fn clone_paged(p: &PagedColumn<KvUpdate>) -> PagedColumn<KvUpdate> {
735        match p {
736            PagedColumn::Resident(c, _) => {
737                // Wrap via a disabled pager so the ticket is fresh.
738                let mut c = c.clone();
739                ColumnPager::disabled().page(&mut c)
740            }
741            // For paged/compressed variants we can't clone without
742            // re-reading; the tests below only inspect Resident chains.
743            _ => panic!("clone_paged only supports Resident"),
744        }
745    }
746
747    /// Always-page policy: bypasses any resident shortcut so we can assert
748    /// the chains remain in `Paged` form regardless of memory pressure.
749    struct ForcePagePolicy {
750        out: std::sync::atomic::AtomicUsize,
751        r#in: std::sync::atomic::AtomicUsize,
752    }
753    impl ForcePagePolicy {
754        fn new() -> Arc<Self> {
755            Arc::new(Self {
756                out: std::sync::atomic::AtomicUsize::new(0),
757                r#in: std::sync::atomic::AtomicUsize::new(0),
758            })
759        }
760    }
761    impl PagingPolicy for ForcePagePolicy {
762        fn decide(&self, _hint: PageHint) -> PageDecision {
763            PageDecision::Page {
764                backend: mz_ore::pager::Backend::Swap,
765                codec: None,
766            }
767        }
768        fn record(&self, event: PageEvent) {
769            use std::sync::atomic::Ordering;
770            match event {
771                PageEvent::PagedOut { .. } => {
772                    self.out.fetch_add(1, Ordering::Relaxed);
773                }
774                PageEvent::PagedIn { .. } => {
775                    self.r#in.fetch_add(1, Ordering::Relaxed);
776                }
777                _ => {}
778            }
779        }
780    }
781
782    /// Wrap a Vec<Column> as a paged chain for `FetchIter`.
783    fn to_chain(
784        cols: Vec<Column<KvUpdate>>,
785        pager: &ColumnPager,
786    ) -> VecDeque<PagedColumn<KvUpdate>> {
787        cols.into_iter().map(|mut c| pager.page(&mut c)).collect()
788    }
789
790    /// Drive `merge_chains` with a disabled pager and return owned tuples.
791    fn drive_merge(chain1: Vec<Column<KvUpdate>>, chain2: Vec<Column<KvUpdate>>) -> Vec<KvUpdate> {
792        let pager = ColumnPager::disabled();
793        let q1 = to_chain(chain1, &pager);
794        let q2 = to_chain(chain2, &pager);
795        let mut output: Vec<PagedColumn<KvUpdate>> = Vec::new();
796        let mut stash: Vec<Column<KvUpdate>> = Vec::new();
797        merge_chains(
798            FetchIter::new(q1, &pager),
799            FetchIter::new(q2, &pager),
800            |paged| output.push(paged),
801            &mut stash,
802        );
803        collect_pc(&output, &pager)
804    }
805
806    // ----- merge_chains correctness -----------------------------------------
807
808    /// Disjoint chains: same data as the legacy passthrough test. Without
809    /// passthrough, the merger runs per-record but should still produce the
810    /// fully ordered output.
811    #[mz_ore::test]
812    fn merge_chains_disjoint_ranges() {
813        let out = drive_merge(
814            vec![
815                col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]),
816                col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]),
817            ],
818            vec![
819                col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]),
820                col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]),
821            ],
822        );
823        let expected: Vec<_> = (0..4u64)
824            .map(|d| ((d, 0u64), 0u64, 1i64))
825            .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64)))
826            .collect();
827        assert_eq!(out, expected);
828    }
829
830    /// Interleaved chains: every record alternates between the two chains.
831    #[mz_ore::test]
832    fn merge_chains_interleaved() {
833        let out = drive_merge(
834            vec![
835                col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]),
836                col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]),
837            ],
838            vec![
839                col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]),
840                col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]),
841            ],
842        );
843        let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
844        assert_eq!(out, expected);
845    }
846
847    /// Equal-key consolidation across chunk boundaries: chain1's last record
848    /// shares `(d, t)` with chain2's first; sum of diffs should land on a
849    /// single output record.
850    #[mz_ore::test]
851    fn merge_chains_equal_boundary() {
852        let out = drive_merge(
853            vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])],
854            vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])],
855        );
856        assert_eq!(out, vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]);
857    }
858
859    /// Same merge, force-paged: chains stay in `Paged` form throughout, and
860    /// the consolidated result still matches.
861    #[mz_ore::test]
862    fn merge_chains_force_paged_round_trip() {
863        let policy = ForcePagePolicy::new();
864        let pager = ColumnPager::new(policy.clone());
865        let q1 = to_chain(vec![col(&[((0, 0), 0, 1), ((2, 0), 0, 1)])], &pager);
866        let q2 = to_chain(vec![col(&[((1, 0), 0, 1), ((3, 0), 0, 1)])], &pager);
867
868        // Confirm the chains started paged-out (not Resident).
869        assert!(matches!(q1.front().unwrap(), PagedColumn::Paged { .. }));
870        assert!(matches!(q2.front().unwrap(), PagedColumn::Paged { .. }));
871
872        let mut output: Vec<PagedColumn<KvUpdate>> = Vec::new();
873        let mut stash: Vec<Column<KvUpdate>> = Vec::new();
874        merge_chains(
875            FetchIter::new(q1, &pager),
876            FetchIter::new(q2, &pager),
877            |paged| output.push(paged),
878            &mut stash,
879        );
880
881        // Output entries should also have been routed through the pager.
882        for p in &output {
883            assert!(matches!(p, PagedColumn::Paged { .. }));
884        }
885
886        // Materialize the output and check correctness.
887        let mut collected = Vec::new();
888        for p in output {
889            let c = pager.take(p);
890            collected.extend(collect_column(&c));
891        }
892        let expected: Vec<_> = (0..4u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect();
893        assert_eq!(collected, expected);
894    }
895
896    // ----- extract_chain correctness ----------------------------------------
897
898    #[mz_ore::test]
899    fn extract_chain_partitions_by_frontier() {
900        let pager = ColumnPager::disabled();
901        let data = vec![
902            ((0, 0), 0u64, 1i64),
903            ((1, 0), 1, 1),
904            ((2, 0), 2, 1),
905            ((3, 0), 3, 1),
906        ];
907        let chain = to_chain(vec![col(&data)], &pager);
908        let upper = Antichain::from_elem(2u64);
909        let mut frontier: Antichain<u64> = Antichain::new();
910        let mut ship: Vec<PagedColumn<KvUpdate>> = Vec::new();
911        let mut keep: Vec<PagedColumn<KvUpdate>> = Vec::new();
912        let mut stash: Vec<Column<KvUpdate>> = Vec::new();
913
914        extract_chain(
915            FetchIter::new(chain, &pager),
916            upper.borrow(),
917            &mut frontier,
918            |p| ship.push(p),
919            |p| keep.push(p),
920            &mut stash,
921        );
922
923        let shipped = collect_pc(&ship, &pager);
924        let kept = collect_pc(&keep, &pager);
925        for (_, t, _) in &shipped {
926            assert!(*t < 2, "shipped time {t} should be < upper");
927        }
928        for (_, t, _) in &kept {
929            assert!(*t >= 2, "kept time {t} should be >= upper");
930        }
931        assert_eq!(shipped.len() + kept.len(), data.len());
932    }
933
934    // ----- ColumnMergeBatcher end-to-end ------------------------------------
935
936    #[mz_ore::test]
937    fn batcher_seal_round_trip() {
938        let mut b: ColumnMergeBatcher<(u64, u64), u64, i64> =
939            differential_dataflow::trace::Batcher::new(None, 0);
940        // Two pushes; second has an equal-key collision with the first.
941        // Inputs arrive pre-consolidated chunk-by-chunk, as from the upstream
942        // chunker.
943        let input1 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1), ((3, 0), 0, 1)]);
944        let input2 = col(&[((2, 0), 0, 2), ((4, 0), 0, 1)]);
945        b.push_into(input1);
946        b.push_into(input2);
947
948        // Seal everything (upper = ∞-ish, here just past any time we used).
949        let upper = Antichain::from_elem(u64::MAX);
950        let (chain, _description) = differential_dataflow::trace::Batcher::seal(&mut b, upper);
951        let out: Vec<KvUpdate> = chain.iter().flat_map(collect_column).collect();
952
953        // (2, 0)@0 was pushed with +1 then +2; sums to +3 after consolidation.
954        let mut expected = vec![
955            ((1u64, 1u64), 0u64, 1i64),
956            ((2, 0), 0, 3),
957            ((3, 0), 0, 1),
958            ((4, 0), 0, 1),
959        ];
960        expected.sort();
961        let mut out_sorted = out.clone();
962        out_sorted.sort();
963        assert_eq!(out_sorted, expected);
964    }
965
966    #[mz_ore::test]
967    fn account_chunk_resident_vs_paged() {
968        let policy = ForcePagePolicy::new();
969        let pager_paged = ColumnPager::new(policy.clone());
970        let pager_res = ColumnPager::disabled();
971
972        let mut c1 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1), ((3, 0), 0, 1)]);
973        let resident = pager_res.page(&mut c1);
974        let (records, size, capacity, allocations) = account_chunk(&resident);
975        assert_eq!(records, 3);
976        assert!(size > 0);
977        assert_eq!(size, capacity);
978        assert_eq!(allocations, 1);
979
980        let mut c2 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1)]);
981        let paged = pager_paged.page(&mut c2);
982        assert!(matches!(paged, PagedColumn::Paged { .. }));
983        // Paged variants contribute zero to memory accounting.
984        assert_eq!(account_chunk(&paged), (0, 0, 0, 0));
985    }
986
987    #[mz_ore::test]
988    fn batcher_seal_keeps_kept_chain_paged() {
989        // Force-page policy; verify that after seal, the kept chain in
990        // self.chains contains only Paged entries (no Resident).
991        let policy = ForcePagePolicy::new();
992        let pager = ColumnPager::new(policy.clone());
993
994        let mut b: ColumnMergeBatcher<(u64, u64), u64, i64> =
995            differential_dataflow::trace::Batcher::new(None, 0);
996        b.set_pager(pager);
997
998        // Push records straddling an upper of 5 — half should be kept, half
999        // shipped. Use enough records to fill at least one chunk.
1000        let n: u64 = 200;
1001        for i in 0..n {
1002            let input = col(&[((i, 0), i % 10, 1)]);
1003            b.push_into(input);
1004        }
1005        let upper = Antichain::from_elem(5u64);
1006        let _ = differential_dataflow::trace::Batcher::seal(&mut b, upper);
1007
1008        // Anything kept (times >= 5) should be sitting in b.chains as paged.
1009        let kept_records: usize = b
1010            .chains
1011            .iter()
1012            .flat_map(|c| c.iter())
1013            .map(|p| match p {
1014                PagedColumn::Paged { meta, .. } => {
1015                    // Records aren't directly available here; sanity-check
1016                    // that no Resident snuck in.
1017                    let _ = meta;
1018                    1
1019                }
1020                PagedColumn::Compressed { meta, .. } => {
1021                    let _ = meta;
1022                    1
1023                }
1024                PagedColumn::Resident(_, _) => {
1025                    panic!("kept chain entry was Resident under ForcePagePolicy");
1026                }
1027            })
1028            .sum();
1029        // We expect *some* kept entries (times in [5..10) loop slot).
1030        assert!(kept_records > 0, "expected at least one kept paged entry");
1031        assert!(policy.out.load(std::sync::atomic::Ordering::Relaxed) > 0);
1032        let _ = n;
1033    }
1034}