Skip to main content

mz_compute/sink/
correction_v2.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
11//! operator to stash updates before they are written.
12//!
13//! The `Correction` data structure provides methods to:
14//!  * insert new updates
15//!  * advance the compaction frontier (called `since`)
16//!  * obtain an iterator over consolidated updates before some `upper`
17//!  * force consolidation of updates before some `upper`
18//!
19//! The goal is to provide good performance for each of these operations, even in the presence of
20//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
21//! retractions for future times and we want those to be handled efficiently as well.
22//!
23//! Note that `Correction` does not provide a method to directly remove updates. Instead updates
24//! are removed by inserting their retractions so that they consolidate away to nothing.
25//!
26//! ## Storage of Updates
27//!
28//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
29//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
30//!
31//! [`CorrectionV2`] holds onto a list of `Chain`s containing `Chunk`s of stashed updates. Each
32//! `Chunk` is a columnation region containing a fixed maximum number of updates. All updates in
33//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
34//!
35//! Chains live in three places:
36//!
37//!  * A [`BucketChain`] partitions times at or beyond the `boundary` (the largest read `upper`
38//!    seen so far) into buckets of exponentially growing time ranges, each holding a list of
39//!    chains. Reads only touch the buckets below their `upper`, so the bulk of the buffered
40//!    updates — in particular far-future retractions produced by temporal filters — is left
41//!    alone.
42//!  * `pending_low` holds chains at times below the `boundary`, mostly insertions arriving
43//!    through the persist feedback.
44//!  * `emitted` is a single chain holding the updates returned by the last read. Updates must
45//!    stay in the buffer until their feedback retractions arrive, and keeping them separate from
46//!    the bucket chain means reads never have to re-merge future updates.
47//!
48//! ```text
49//!       chain[0]   |   chain[1]   |   chain[2]
50//!                  |              |
51//!     chunk[0]     | chunk[0]     | chunk[0]
52//!       (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
53//!       (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
54//!     chunk[1]     | chunk[1]     |
55//!       (c, 1, +1) |   (c, 2, -2) |
56//!       (a, 2, -1) |   (c, 4, -1) |
57//!     chunk[2]     |              |
58//!       (b, 2, +1) |              |
59//!       (c, 2, +1) |              |
60//!     chunk[3]     |              |
61//!       (b, 3, -1) |              |
62//!       (c, 3, +1) |              |
63//! ```
64//!
65//! The "chain invariant" states that each chain in a bucket has at least `chain_proportionality` times as
66//! many updates as the next one. This means that chain sizes will often be powers of
67//! `chain_proportionality`, but they don't have to be. For example, for a proportionality of 2,
68//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
69//!
70//! Note that the invariant is maintained on update counts, not chunk counts. Chunks are
71//! byte-bounded (see `ChunkBuilder`), so chunk count is not proportional to update count and
72//! would be a poor proxy: any chain below the chunk byte boundary is a single chunk regardless
73//! of how many updates it holds, which would let the geometric invariant collapse and break the
74//! O(log N) amortization of inserts.
75//!
76//! Choosing the `chain_proportionality` value allows tuning the trade-off between memory and CPU
77//! resources required to maintain corrections. A higher proportionality forces more frequent chain
78//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
79//!
80//! ## Inserting Updates
81//!
82//! A batch of updates is routed by time: updates below the `boundary` become a `pending_low`
83//! chain, the rest is appended as new chains to their respective buckets. Appending to a bucket
84//! merges chains until the chain invariant is restored.
85//!
86//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
87//! chunk, copying the update in, and then likely merging with an existing chain to restore the
88//! chain invariant. If updates trickle in in small batches, this can cause a considerable
89//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
90//! chains but instead stored in a `Stage` buffer. Once enough updates have been staged to fill a
91//! `Chunk`, they are sorted and routed.
92//!
93//! The insert operation has an amortized complexity of O(log N), with N being the current number
94//! of updates stored.
95//!
96//! ## Retrieving Consolidated Updates
97//!
98//! Retrieving consolidated updates before a given `upper` works by peeling all buckets below the
99//! `upper` off the bucket chain, splitting their chains, the pending low chains, and the previous
100//! `emitted` chain at the `upper`, merging the parts below the `upper` into the new `emitted`
101//! chain, and returning an iterator over that chain.
102//!
103//! Because each chain contains updates ordered by time first, splitting a chain at the `upper`
104//! reuses whole chunks and copies at most one chunk straddling the split point. Updates at times
105//! at or beyond the `upper` are never touched, no matter how many the buffer holds. The
106//! complexity of a read is O(U log K), with U being the number of updates before `upper` and K
107//! the number of chains containing them.
108//!
109//! ## Merging Chains
110//!
111//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
112//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
113//! complexity of a merge of K chains containing N updates is O(N log K).
114//!
115//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
116//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
117//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
118//!
119//! For example, consider these two chains, assuming `since = [2]`:
120//!   chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
121//!   chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
122//! After time advancement, the chains look like this:
123//!   chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
124//!   chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
125//! Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that's
126//! neither sorted nor consolidated.
127//!
128//! Times below the `since` can only exist in chains read by `consolidate_before`, and only if
129//! the `since` advanced past buffered times since the previous read. For few distinct stale
130//! times — the steady state, where the previously emitted chain was written just before the
131//! since advanced past it — we merge sub-chains, one for each distinct time that's before or at
132//! the `since`. Each of these sub-chains retains the (time, data) ordering after the time
133//! advancement to `since`, so merging those yields the expected result.
134//!
135//! For the above example, the chains we would merge are:
136//!   chain 1.a: [(c, 2, +1)]
137//!   chain 1.b: [(b, 2, -1), (a, 3, -1)]
138//!   chain 2.a: [(b, 2, +1)],
139//!   chain 2.b: [(a, 2, +1), (c, 2, -1)]
140//!
141//! For many distinct stale times — e.g. a since jump across many buffered timestamps when a sink
142//! restarts with an old as-of — the number of sub-chains grows with the number of distinct times,
143//! so we instead materialize the affected updates, advance their times, and sort and consolidate
144//! them in one O(U log U) pass.
145
146use std::cmp::Ordering;
147use std::collections::{BinaryHeap, VecDeque};
148use std::fmt;
149use std::rc::Rc;
150use std::sync::{Mutex, OnceLock};
151
152use columnar::{Columnar, Index, Len, Ref};
153use mz_ore::cast::CastLossy;
154use mz_ore::soft_assert_or_log;
155use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
156use mz_repr::{Diff, Timestamp};
157use mz_timely_util::column_pager::{self, PagedColumn};
158use mz_timely_util::columnar::Column;
159use mz_timely_util::temporal::{Bucket, BucketChain};
160use timely::PartialOrder;
161use timely::dataflow::channels::ContainerBytes;
162use timely::progress::Antichain;
163
164use crate::sink::correction::{ChannelLogging, SizeMetrics};
165
166/// Convenient alias for use in data trait bounds.
167///
168/// `D` is constrained to be `Columnar`, so that updates can be stored in a single columnar
169/// region per chunk, and the variable-length payload (e.g. `Row` bytes) lives in the same
170/// allocation as the rest of the chunk. The `Ref`-level `Eq + Ord` bounds let the merge/heap
171/// code compare updates directly through the columnar borrow, avoiding `into_owned` clones
172/// on the hot path.
173pub trait Data:
174    differential_dataflow::Data
175    + Columnar<Container: Send + Sync + Clone + for<'a> columnar::Borrow<Ref<'a>: Eq + Ord>>
176    + Send
177    + Sync
178{
179}
180impl<D> Data for D where
181    D: differential_dataflow::Data
182        + Columnar<Container: Send + Sync + Clone + for<'a> columnar::Borrow<Ref<'a>: Eq + Ord>>
183        + Send
184        + Sync
185{
186}
187
188/// A data structure used to store corrections in the MV sink implementation.
189///
190/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
191/// allowing their memory to be transparently spilled to disk.
192#[derive(Debug)]
193pub struct CorrectionV2<D: Data> {
194    /// Bucketed storage for updates at times at or beyond `boundary`.
195    ///
196    /// Buckets cover exponentially growing time ranges, so reads only touch the buckets below
197    /// their `upper`, and far-future updates (e.g. retractions produced by temporal filters) are
198    /// rarely touched.
199    chain: BucketChain<ChainBucket<D>>,
200    /// Chains at times below `boundary` that were not yet emitted.
201    ///
202    /// Filled by inserts at times below the boundary (mostly persist feedback) and by the
203    /// remainders of `emitted` when a read uses a smaller `upper` than the previous one. Merged
204    /// into `emitted` by the next read.
205    pending_low: Vec<Chain<D>>,
206    /// Updates that were emitted by `updates_before` but not yet cancelled by persist feedback.
207    ///
208    /// Sorted and consolidated, with all times advanced to the `since`.
209    emitted: Chain<D>,
210    /// A staging area for updates, to speed up small inserts.
211    stage: Stage<D>,
212    /// The lower bound of times stored in `chain`. Only ever advances.
213    ///
214    /// Times below the boundary have been peeled off the bucket chain and can only be stored in
215    /// `pending_low` or `emitted`.
216    boundary: Antichain<Timestamp>,
217    /// The frontier by which all contained times are advanced.
218    since: Antichain<Timestamp>,
219
220    /// Total count of updates in the correction buffer.
221    ///
222    /// Tracked to compute deltas in `update_metrics`.
223    prev_update_count: usize,
224    /// Total heap size used by the correction buffer.
225    ///
226    /// Tracked to compute deltas in `update_metrics`.
227    prev_size: SizeMetrics,
228    /// Global persist sink metrics.
229    metrics: SinkMetrics,
230    /// Per-worker persist sink metrics.
231    worker_metrics: SinkWorkerMetrics,
232    /// Introspection logging.
233    logging: Option<ChannelLogging>,
234}
235
236/// Fuel for restoring the bucket chain invariant after peeling.
237///
238/// Bounds the restoration work per buffer operation. The bucket chain remains functional when
239/// restoration is incomplete -- peeling and finding work on ill-formed chains, at the cost of
240/// more in-line splitting -- so leftover restoration is simply picked up by the next operation.
241///
242/// `restore` spends one unit of fuel per bucket split, and a single `peel` leaves at most
243/// `BucketTimestamp::DOMAIN` (64) buckets to re-split, so this budget completes restoration in one
244/// call for any realistic buffer. It is deliberately generous: the "incomplete restoration is
245/// picked up next op" path is a correctness safety net for pathological bucket counts, not a hot
246/// path we expect to exercise. Lower it if restoration ever needs to interleave with other work.
247const RESTORE_FUEL: i64 = 1_000_000;
248
249impl<D: Data> CorrectionV2<D> {
250    /// Construct a new [`CorrectionV2`] instance.
251    pub fn new(
252        metrics: SinkMetrics,
253        worker_metrics: SinkWorkerMetrics,
254        logging: Option<ChannelLogging>,
255        chain_proportionality: f64,
256        chunk_size: usize,
257    ) -> Self {
258        let update_size = std::mem::size_of::<(D, Timestamp, Diff)>();
259        let chunk_capacity = std::cmp::max(chunk_size / update_size, 1);
260
261        Self {
262            chain: BucketChain::new(ChainBucket::new(chain_proportionality, logging.clone())),
263            pending_low: Vec::new(),
264            emitted: Chain::new(),
265            stage: Stage::new(logging.clone(), chunk_capacity),
266            boundary: Antichain::from_elem(Timestamp::MIN),
267            since: Antichain::from_elem(Timestamp::MIN),
268            prev_update_count: 0,
269            prev_size: Default::default(),
270            metrics,
271            worker_metrics,
272            logging,
273        }
274    }
275
276    /// Insert a batch of updates.
277    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
278        let Some(since_ts) = self.since.as_option() else {
279            // If the since is the empty frontier, discard all updates.
280            updates.clear();
281            return;
282        };
283
284        for (_, time, _) in &mut *updates {
285            *time = std::cmp::max(*time, *since_ts);
286        }
287
288        self.insert_inner(updates);
289    }
290
291    /// Insert a batch of updates, after negating their diffs.
292    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
293        let Some(since_ts) = self.since.as_option() else {
294            // If the since is the empty frontier, discard all updates.
295            updates.clear();
296            return;
297        };
298
299        for (_, time, diff) in &mut *updates {
300            *time = std::cmp::max(*time, *since_ts);
301            *diff = -*diff;
302        }
303
304        self.insert_inner(updates);
305    }
306
307    /// Insert a batch of updates into the stage, flushing it when full.
308    ///
309    /// All times are expected to be >= the `since`.
310    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
311        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));
312
313        if let Some(mut ready) = self.stage.insert(updates) {
314            self.route(&mut ready);
315        }
316
317        self.update_metrics();
318    }
319
320    /// Route a batch of sorted, consolidated updates to `pending_low` or their chain buckets.
321    fn route(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
322        // Updates at times below the boundary become a pending low chain.
323        let idx = updates.partition_point(|(_, t, _)| !self.boundary.less_equal(t));
324        if idx > 0 {
325            let mut builder = ChainBuilder::default();
326            builder.extend(updates.drain(..idx));
327            let chain = builder.finish();
328            if !chain.is_empty() {
329                self.log_chain_created(&chain);
330                self.pending_low.push(chain);
331            }
332        }
333
334        // Updates at times at or beyond the boundary go into their chain buckets. Walk ranges of
335        // times that fall into the same bucket, to push batches of updates at once.
336        let mut drain = updates.drain(..).peekable();
337        while let Some(update) = drain.next() {
338            let time = update.1;
339            let range = self
340                .chain
341                .range_of(&time)
342                .expect("bucket chain covers all times at or beyond the boundary");
343            let mut builder = ChainBuilder::default();
344            builder.extend(std::iter::once(update));
345            while let Some(update) = drain.next_if(|(_, t, _)| range.contains(t)) {
346                builder.extend(std::iter::once(update));
347            }
348            let bucket = self
349                .chain
350                .find_mut(&range.start)
351                .expect("bucket chain covers all times at or beyond the boundary");
352            bucket.push_chain(builder.finish());
353        }
354    }
355
356    /// Return consolidated updates before the given `upper`.
357    pub fn updates_before<'a>(
358        &'a mut self,
359        upper: &Antichain<Timestamp>,
360    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + Send + 'a {
361        // All contained times are advanced to at least the `since`, so a read at an `upper` that
362        // is not beyond the `since` is always empty. Short-circuit to avoid the eager peel, merge,
363        // and `boundary` advancement that `consolidate_before` would otherwise perform. Normal
364        // reads and `consolidate_at_since` always pass an `upper` beyond the `since`.
365        if !PartialOrder::less_than(&self.since, upper) {
366            return None.into_iter().flatten();
367        }
368
369        self.consolidate_before(upper);
370
371        // After `consolidate_before`, `emitted` holds exactly the updates before `upper`: every
372        // path that populates it splits at `upper` (pushing the remainder to `pending_low`), and
373        // the guard above guarantees `upper > since`, so advancing stale times to the `since`
374        // cannot lift them to or beyond `upper`. We can therefore yield all of `emitted`. Guard
375        // the invariant: a violation would write updates beyond the batch upper to persist.
376        soft_assert_or_log!(
377            self.emitted
378                .last()
379                .is_none_or(|(_, t, _)| !upper.less_equal(&t)),
380            "emitted contains times at or beyond the upper",
381        );
382        Some(self.emitted.iter()).into_iter().flatten()
383    }
384
385    /// Consolidate all updates before the given `upper` into the `emitted` chain.
386    ///
387    /// Once this method returns, `emitted` contains all updates at times before `upper`,
388    /// consolidated. It can also contain updates at times at or beyond `upper` if `upper` is not
389    /// beyond the `since`.
390    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
391        if let Some(mut ready) = self.stage.flush() {
392            self.route(&mut ready);
393        }
394
395        let Some(&since_ts) = self.since.as_option() else {
396            // If the since is the empty frontier, discard all updates.
397            let peeled = self.chain.peel(Antichain::new().borrow());
398            for bucket in peeled {
399                for chain in bucket.into_chains() {
400                    self.log_chain_dropped(&chain);
401                }
402            }
403            for chain in std::mem::take(&mut self.pending_low) {
404                self.log_chain_dropped(&chain);
405            }
406            let emitted = std::mem::replace(&mut self.emitted, Chain::new());
407            if !emitted.is_empty() {
408                self.log_chain_dropped(&emitted);
409            }
410            self.update_metrics();
411            return;
412        };
413
414        // Peel the buckets below the upper off the bucket chain. Bucket splits during the peel
415        // only touch chunks around the upper; chunks wholly on either side are reused.
416        let peeled = self.chain.peel(upper.borrow());
417        if PartialOrder::less_than(&self.boundary, upper) {
418            self.boundary = upper.clone();
419        }
420
421        // Collect candidate chains: peeled bucket contents, pending low chains, and the previous
422        // emitted chain. All contain only times below the boundary.
423        let emitted = std::mem::replace(&mut self.emitted, Chain::new());
424        let mut candidates: Vec<Chain<D>> = Vec::new();
425        for bucket in peeled {
426            candidates.extend(bucket.into_chains());
427        }
428        candidates.append(&mut self.pending_low);
429        if !emitted.is_empty() {
430            candidates.push(emitted);
431        }
432
433        if candidates.is_empty() {
434            self.restore_chain();
435            self.update_metrics();
436            return;
437        }
438
439        candidates.iter().for_each(|c| self.log_chain_dropped(c));
440
441        // Split the candidates at the upper. Parts at or beyond the upper (possible when `upper`
442        // regresses below a previous one) stay pending.
443        let mut lowers = Vec::new();
444        for chain in candidates {
445            match upper.as_option() {
446                Some(&upper_ts) => {
447                    let (lower, remainder) = chain.split_at_time(upper_ts);
448                    if !lower.is_empty() {
449                        lowers.push(lower);
450                    }
451                    if !remainder.is_empty() {
452                        self.log_chain_created(&remainder);
453                        self.pending_low.push(remainder);
454                    }
455                }
456                // The empty upper is greater than all times.
457                None => lowers.push(chain),
458            }
459        }
460
461        // Merge the lower parts into the new emitted chain, advancing times below the since.
462        // Advancing times in a (time, data)-sorted chain can break its sort order, so chains
463        // containing stale times cannot be merged as they are. Stale times are expected in steady
464        // state: the previous emitted chain was written before the since advanced past it.
465        //
466        // Count the distinct stale times, up to a small cap. For few distinct stale times -- the
467        // steady state -- split cursors into runs that remain sorted under advancement and merge
468        // those. For many distinct stale times -- e.g. a since jump across many buffered
469        // timestamps when a sink restarts with an old as-of -- the number of runs and the cost of
470        // cloning cursor state per run grow with the number of distinct times, so materialize,
471        // advance, and consolidate in one O(U log U) pass instead.
472        const MAX_STALE_RUNS: usize = 32;
473        let mut stale_times = 0;
474        for chain in &lowers {
475            stale_times += chain.distinct_times_before(since_ts, MAX_STALE_RUNS - stale_times);
476            if stale_times >= MAX_STALE_RUNS {
477                break;
478            }
479        }
480
481        let merged = if stale_times == 0 {
482            let cursors: Vec<_> = lowers.into_iter().filter_map(Chain::into_cursor).collect();
483            merge_cursors(cursors)
484        } else if stale_times < MAX_STALE_RUNS {
485            let mut runs = Vec::new();
486            for chain in lowers {
487                if let Some(cursor) = chain.into_cursor() {
488                    runs.append(&mut cursor.advance_by(since_ts));
489                }
490            }
491            merge_cursors(runs)
492        } else {
493            let mut updates: Vec<_> = lowers.iter().flat_map(|c| c.iter()).collect();
494            for (_, time, _) in &mut updates {
495                *time = std::cmp::max(*time, since_ts);
496            }
497            consolidate(&mut updates);
498            let mut builder = ChainBuilder::default();
499            builder.extend(updates);
500            let chain = builder.finish();
501
502            // Advancement can move updates to or beyond the upper; such updates stay pending.
503            match upper.as_option() {
504                Some(&upper_ts) => {
505                    let (lower, remainder) = chain.split_at_time(upper_ts);
506                    if !remainder.is_empty() {
507                        self.log_chain_created(&remainder);
508                        self.pending_low.push(remainder);
509                    }
510                    lower
511                }
512                None => chain,
513            }
514        };
515
516        if !merged.is_empty() {
517            self.log_chain_created(&merged);
518        }
519        self.emitted = merged;
520
521        self.restore_chain();
522        self.update_metrics();
523    }
524
525    /// Perform a bounded amount of work towards restoring the bucket chain invariant.
526    ///
527    /// Restoration is allowed to remain incomplete: the bucket chain supports peeling and finding
528    /// on ill-formed chains, so any leftover work is picked up by subsequent operations. The fuel
529    /// bound keeps individual buffer operations from stalling the operator that owns the buffer.
530    fn restore_chain(&mut self) {
531        let mut fuel = RESTORE_FUEL;
532        self.chain.restore(&mut fuel);
533    }
534
535    /// Advance the since frontier.
536    ///
537    /// Time advancement of updates in the bucket chain is lazy: it happens when the updates are
538    /// consolidated by a read.
539    ///
540    /// # Panics
541    ///
542    /// Panics if the given `since` is less than the current since frontier.
543    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
544        assert!(PartialOrder::less_equal(&self.since, &since));
545        self.stage.advance_times(&since);
546        self.since = since;
547    }
548
549    /// Consolidate all updates at the current `since`.
550    pub fn consolidate_at_since(&mut self) {
551        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
552        if let Some(upper_ts) = upper_ts {
553            let upper = Antichain::from_elem(upper_ts);
554            self.consolidate_before(&upper);
555        }
556    }
557
558    fn log_chain_created(&self, chain: &Chain<D>) {
559        if let Some(logging) = &self.logging {
560            logging.chain_created(chain.update_count);
561        }
562    }
563
564    fn log_chain_dropped(&self, chain: &Chain<D>) {
565        if let Some(logging) = &self.logging {
566            logging.chain_dropped(chain.update_count);
567        }
568    }
569
570    /// Update persist sink metrics.
571    fn update_metrics(&mut self) {
572        let mut new_size = self.stage.get_size();
573        let mut new_length = self.stage.data.len();
574        for chain in &self.pending_low {
575            new_size += chain.get_size();
576            new_length += chain.update_count;
577        }
578        new_size += self.emitted.get_size();
579        new_length += self.emitted.update_count;
580        for bucket in self.chain.buckets() {
581            for chain in &bucket.chains {
582                new_size += chain.get_size();
583                new_length += chain.update_count;
584            }
585        }
586
587        self.update_metrics_inner(new_size, new_length);
588    }
589
590    /// Update persist sink metrics to the given new size and length.
591    fn update_metrics_inner(&mut self, new_size: SizeMetrics, new_length: usize) {
592        let old_size = self.prev_size;
593        let old_length = self.prev_update_count;
594        let len_delta = UpdateDelta::new(new_length, old_length);
595        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
596        self.metrics
597            .report_correction_update_deltas(len_delta, cap_delta);
598        self.worker_metrics
599            .report_correction_update_totals(new_length, new_size.capacity);
600
601        if let Some(logging) = &self.logging {
602            let i = |x: usize| isize::try_from(x).expect("must fit");
603            logging.report_size_diff(i(new_size.size) - i(old_size.size));
604            logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
605            logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
606        }
607
608        self.prev_size = new_size;
609        self.prev_update_count = new_length;
610    }
611}
612
613/// Merge the given cursors into one chain.
614fn merge_cursors<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
615    match cursors.len() {
616        0 => Chain::new(),
617        1 => {
618            let [cur] = cursors.try_into().unwrap();
619            cur.into_chain()
620        }
621        2 => {
622            let [a, b] = cursors.try_into().unwrap();
623            merge_2(a, b)
624        }
625        _ => merge_many(cursors),
626    }
627}
628
629/// Merge the given two cursors using a 2-way merge.
630///
631/// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
632fn merge_2<D: Data>(cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
633    let mut rest1 = Some(cursor1);
634    let mut rest2 = Some(cursor2);
635    let mut merged = ChainBuilder::default();
636
637    loop {
638        match (rest1, rest2) {
639            (Some(c1), Some(c2)) => {
640                let (d1, t1, r1) = c1.get();
641                let (d2, t2, r2) = c2.get();
642
643                match (t1, d1).cmp(&(t2, d2)) {
644                    Ordering::Less => {
645                        merged.push_ref((d1, t1, r1));
646                        rest1 = c1.step();
647                        rest2 = Some(c2);
648                    }
649                    Ordering::Greater => {
650                        merged.push_ref((d2, t2, r2));
651                        rest1 = Some(c1);
652                        rest2 = c2.step();
653                    }
654                    Ordering::Equal => {
655                        let r = r1 + r2;
656                        if r != Diff::ZERO {
657                            merged.push_ref((d1, t1, r));
658                        }
659                        rest1 = c1.step();
660                        rest2 = c2.step();
661                    }
662                }
663            }
664            (Some(c), None) | (None, Some(c)) => {
665                merged.push_cursor(c);
666                break;
667            }
668            (None, None) => break,
669        }
670    }
671
672    merged.finish()
673}
674
675/// Merge the given cursors using a k-way merge with a binary heap.
676fn merge_many<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
677    let mut heap = MergeHeap::from_iter(cursors);
678    let mut merged = ChainBuilder::default();
679    while let Some(cursor1) = heap.pop() {
680        let (data, time, mut diff) = cursor1.get();
681
682        while let Some((cursor2, r)) = heap.pop_equal(data, time) {
683            diff += r;
684            if let Some(cursor2) = cursor2.step() {
685                heap.push(cursor2);
686            }
687        }
688
689        if diff != Diff::ZERO {
690            merged.push_ref((data, time, diff));
691        }
692        if let Some(cursor1) = cursor1.step() {
693            heap.push(cursor1);
694        }
695    }
696
697    merged.finish()
698}
699
700impl<D: Data> Drop for CorrectionV2<D> {
701    fn drop(&mut self) {
702        for bucket in self.chain.buckets() {
703            bucket.chains.iter().for_each(|c| self.log_chain_dropped(c));
704        }
705        self.pending_low
706            .iter()
707            .for_each(|c| self.log_chain_dropped(c));
708        if !self.emitted.is_empty() {
709            self.log_chain_dropped(&self.emitted);
710        }
711        self.update_metrics_inner(Default::default(), 0);
712    }
713}
714
715/// A bucket of `Chain`s, for use in a [`BucketChain`].
716///
717/// All chains are individually sorted by (time, data) and consolidated, but updates can appear in
718/// multiple chains, so consumers must merge the chains to obtain consolidated updates.
719struct ChainBucket<D: Data> {
720    /// The contained chains.
721    ///
722    /// Maintained with the chain invariant on pushes; splits can leave it violated until the next
723    /// push restores it.
724    chains: Vec<Chain<D>>,
725    /// The size factor of subsequent chains required by the chain invariant.
726    chain_proportionality: f64,
727    /// Introspection logging.
728    logging: Option<ChannelLogging>,
729}
730
731impl<D: Data> fmt::Debug for ChainBucket<D> {
732    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
733        f.debug_struct("ChainBucket")
734            .field("chains", &self.chains)
735            .finish_non_exhaustive()
736    }
737}
738
739impl<D: Data> ChainBucket<D> {
740    /// Construct a new, empty `ChainBucket`.
741    fn new(chain_proportionality: f64, logging: Option<ChannelLogging>) -> Self {
742        Self {
743            chains: Vec::new(),
744            chain_proportionality,
745            logging,
746        }
747    }
748
749    /// Push a chain onto the bucket, restoring the chain invariant.
750    fn push_chain(&mut self, chain: Chain<D>) {
751        if chain.is_empty() {
752            return;
753        }
754        if let Some(logging) = &self.logging {
755            logging.chain_created(chain.update_count);
756        }
757        self.chains.push(chain);
758
759        // Restore the chain invariant.
760        let prop = self.chain_proportionality;
761        let merge_needed = |chains: &[Chain<_>]| match chains {
762            [.., prev, last] => {
763                let last_len = f64::cast_lossy(last.update_count);
764                let prev_len = f64::cast_lossy(prev.update_count);
765                last_len * prop > prev_len
766            }
767            _ => false,
768        };
769
770        while merge_needed(&self.chains) {
771            let a = self.chains.pop().unwrap();
772            let b = self.chains.pop().unwrap();
773            if let Some(logging) = &self.logging {
774                logging.chain_dropped(a.update_count);
775                logging.chain_dropped(b.update_count);
776            }
777
778            let cursors = [a, b].into_iter().filter_map(Chain::into_cursor).collect();
779            let merged = merge_cursors(cursors);
780            if !merged.is_empty() {
781                if let Some(logging) = &self.logging {
782                    logging.chain_created(merged.update_count);
783                }
784                self.chains.push(merged);
785            }
786        }
787    }
788
789    /// Convert the bucket into its contained chains.
790    fn into_chains(self) -> Vec<Chain<D>> {
791        self.chains
792    }
793}
794
795impl<D: Data> Bucket for ChainBucket<D> {
796    type Timestamp = Timestamp;
797
798    fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
799        let mut lower = Self::new(self.chain_proportionality, self.logging.clone());
800        let mut upper = Self::new(self.chain_proportionality, self.logging.clone());
801
802        for chain in self.chains {
803            // Whole chunks are reused; at most one chunk straddling the timestamp is copied per
804            // chain. Account fuel at chunk granularity.
805            *fuel = fuel.saturating_sub(i64::try_from(chain.chunks.len()).expect("must fit"));
806
807            if let Some(logging) = &self.logging {
808                logging.chain_dropped(chain.update_count);
809            }
810            let (lo, hi) = chain.split_at_time(*timestamp);
811            for (part, target) in [(lo, &mut lower), (hi, &mut upper)] {
812                if !part.is_empty() {
813                    if let Some(logging) = &self.logging {
814                        logging.chain_created(part.update_count);
815                    }
816                    target.chains.push(part);
817                }
818            }
819        }
820
821        (lower, upper)
822    }
823}
824
825/// A chain of [`Chunk`]s containing updates.
826///
827/// All updates in a chain are sorted by (time, data) and consolidated.
828///
829/// Note that, in contrast to [`Chunk`]s, chains can be empty. Though we generally try to avoid
830/// keeping around empty chains.
831#[derive(Debug)]
832struct Chain<D: Data> {
833    /// The contained chunks.
834    chunks: Vec<Chunk<D>>,
835    /// The number of updates contained in all chunks.
836    update_count: usize,
837}
838
839impl<D: Data> Chain<D> {
840    /// Construct an empty chain.
841    fn new() -> Self {
842        Self {
843            chunks: Default::default(),
844            update_count: 0,
845        }
846    }
847
848    /// Return whether the chain is empty.
849    fn is_empty(&self) -> bool {
850        self.chunks.is_empty()
851    }
852
853    /// Push a chunk onto the chain.
854    ///
855    /// All updates in the chunk must sort after all updates already in the chain, in
856    /// (time, data)-order, to ensure the chain remains sorted.
857    fn push_chunk(&mut self, chunk: Chunk<D>) {
858        debug_assert!(self.can_accept_chunk(&chunk));
859
860        self.update_count += chunk.len();
861        self.chunks.push(chunk);
862    }
863
864    /// Return whether the chain can accept the given chunk at its end while preserving
865    /// (time, data)-order.
866    ///
867    /// Uses the cached boundary times and only materializes the boundary chunks when the times
868    /// tie (a single timestamp straddling the chunk boundary), so the common
869    /// strictly-increasing-time case checks the invariant without paging chunks in.
870    fn can_accept_chunk(&self, chunk: &Chunk<D>) -> bool {
871        match self.chunks.last() {
872            None => true,
873            Some(last) => match last.last_time().cmp(&chunk.first_time()) {
874                Ordering::Less => true,
875                Ordering::Greater => false,
876                Ordering::Equal => {
877                    let (dc, _, _) = last.last();
878                    let (d, _, _) = chunk.first();
879                    dc < d
880                }
881            },
882        }
883    }
884
885    /// Return the last update in the chain, if any.
886    fn last(&self) -> Option<Ref<'_, (D, Timestamp, Diff)>> {
887        self.chunks.last().map(|c| c.last())
888    }
889
890    /// Convert the chain into a cursor over the contained updates.
891    fn into_cursor(self) -> Option<Cursor<D>> {
892        let chunks = self.chunks.into_iter().map(Rc::new).collect();
893        Cursor::new(chunks)
894    }
895
896    /// Return an iterator over the contained updates.
897    fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
898        self.chunks.iter().flat_map(|c| {
899            (0..c.len()).map(move |i| {
900                let (d, t, r) = c.index(i);
901                (D::into_owned(d), t, r)
902            })
903        })
904    }
905
906    /// Count the distinct times of updates at times before `time`, up to the given cap.
907    ///
908    /// The scan uses one binary search per distinct time, so its cost is bounded by
909    /// O(cap log chunks).
910    fn distinct_times_before(&self, time: Timestamp, cap: usize) -> usize {
911        let mut count = 0;
912        let mut chunk_idx = 0;
913        let mut offset = 0;
914        while count < cap && chunk_idx < self.chunks.len() {
915            let chunk = &self.chunks[chunk_idx];
916            let current = chunk.index(offset).1;
917            if current >= time {
918                break;
919            }
920            count += 1;
921            // Skip to the first update at a time greater than `current`.
922            match chunk.find_time_greater_than(current) {
923                Some(idx) => offset = idx,
924                None => {
925                    // All later updates at `current` are in subsequent chunks.
926                    chunk_idx += 1;
927                    offset = 0;
928                    while chunk_idx < self.chunks.len() {
929                        match self.chunks[chunk_idx].find_time_greater_than(current) {
930                            Some(idx) => {
931                                offset = idx;
932                                break;
933                            }
934                            None => chunk_idx += 1,
935                        }
936                    }
937                }
938            }
939        }
940        count
941    }
942
943    /// Split the chain at the given time.
944    ///
945    /// Returns two chains, the first containing all updates at times < `time`, the second
946    /// containing all updates at times >= `time`. Chunks fully on either side of `time` are
947    /// reused; only a chunk straddling `time` is copied.
948    fn split_at_time(mut self, time: Timestamp) -> (Self, Self) {
949        let mut lower = Self::new();
950        let mut upper = Self::new();
951
952        let Some(skip_ts) = time.step_back() else {
953            // Nothing sorts before `time`.
954            return (lower, self);
955        };
956
957        for chunk in self.chunks.drain(..) {
958            // Route whole chunks by cached boundary times so a chunk that lands entirely on one
959            // side is moved without paging it in; only a straddling chunk is materialized.
960            if chunk.last_time() < time {
961                lower.push_chunk(chunk);
962            } else if chunk.first_time() >= time {
963                upper.push_chunk(chunk);
964            } else {
965                // The chunk straddles `time`; copy its two halves.
966                let idx = chunk
967                    .find_time_greater_than(skip_ts)
968                    .expect("straddles time");
969                let mut builder = ChainBuilder::default();
970                for i in 0..idx {
971                    builder.push_ref(chunk.index(i));
972                }
973                for part in builder.finish().chunks {
974                    lower.push_chunk(part);
975                }
976                let mut builder = ChainBuilder::default();
977                for i in idx..chunk.len() {
978                    builder.push_ref(chunk.index(i));
979                }
980                for part in builder.finish().chunks {
981                    upper.push_chunk(part);
982                }
983            }
984        }
985
986        (lower, upper)
987    }
988
989    /// Return the size of the chain, for use in metrics.
990    fn get_size(&self) -> SizeMetrics {
991        let mut metrics = SizeMetrics::default();
992        for chunk in &self.chunks {
993            metrics += chunk.get_size();
994        }
995        metrics
996    }
997}
998
999/// A builder that constructs a [`Chain`] from a stream of updates.
1000///
1001/// Wraps a [`ChunkBuilder`] and drains its minted chunks into a [`Chain`]. Pushed updates must
1002/// arrive in (time, data) sorted order.
1003struct ChainBuilder<D: Data> {
1004    builder: ChunkBuilder<D>,
1005    chain: Chain<D>,
1006}
1007
1008impl<D: Data> Default for ChainBuilder<D> {
1009    fn default() -> Self {
1010        Self {
1011            builder: Default::default(),
1012            chain: Chain::new(),
1013        }
1014    }
1015}
1016
1017impl<D: Data> ChainBuilder<D> {
1018    /// Push a reference-form update into the builder.
1019    fn push_ref(&mut self, update: Ref<'_, (D, Timestamp, Diff)>) {
1020        self.builder.push(update);
1021        self.drain();
1022    }
1023
1024    /// Push an owned-form update into the builder.
1025    fn push_owned(&mut self, update: &(D, Timestamp, Diff)) {
1026        self.builder.push(update);
1027        self.drain();
1028    }
1029
1030    /// Push the updates produced by a cursor into the builder.
1031    fn push_cursor(&mut self, cursor: Cursor<D>) {
1032        let mut rest = Some(cursor);
1033        while let Some(cursor) = rest.take() {
1034            let update = cursor.get();
1035            self.push_ref(update);
1036            rest = cursor.step();
1037        }
1038    }
1039
1040    /// Move any minted chunks from the builder into the chain.
1041    fn drain(&mut self) {
1042        while let Some(chunk) = self.builder.pop() {
1043            self.chain.push_chunk(chunk);
1044        }
1045    }
1046
1047    /// Finish building, returning the assembled [`Chain`].
1048    fn finish(self) -> Chain<D> {
1049        let Self { builder, mut chain } = self;
1050        for chunk in builder.finish() {
1051            if chunk.len() > 0 {
1052                chain.push_chunk(chunk);
1053            }
1054        }
1055        chain
1056    }
1057}
1058
1059impl<D: Data> Extend<(D, Timestamp, Diff)> for ChainBuilder<D> {
1060    fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
1061        for update in iter {
1062            self.push_owned(&update);
1063        }
1064    }
1065}
1066
1067/// A cursor over updates in a chain.
1068///
1069/// A cursor provides two guarantees:
1070///  * Produced updates are ordered and consolidated.
1071///  * A cursor always yields at least one update.
1072///
1073/// The second guarantee is enforced through the type system: Every method that steps a cursor
1074/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
1075/// over the last update.
1076///
1077/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
1078/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
1079/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
1080#[derive(Clone, Debug)]
1081struct Cursor<D: Data> {
1082    /// The chunks from which updates can still be produced.
1083    chunks: VecDeque<Rc<Chunk<D>>>,
1084    /// The current offset into `chunks.front()`.
1085    chunk_offset: usize,
1086    /// An optional limit for the number of updates the cursor will produce.
1087    limit: Option<usize>,
1088    /// An optional overwrite for the timestamp of produced updates.
1089    overwrite_ts: Option<Timestamp>,
1090}
1091
1092impl<D: Data> Cursor<D> {
1093    /// Construct a cursor over a list of chunks.
1094    ///
1095    /// Returns `None` if `chunks` is empty.
1096    fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
1097        if chunks.is_empty() {
1098            return None;
1099        }
1100
1101        Some(Self {
1102            chunks,
1103            chunk_offset: 0,
1104            limit: None,
1105            overwrite_ts: None,
1106        })
1107    }
1108
1109    /// Set a limit for the number of updates this cursor will produce.
1110    ///
1111    /// # Panics
1112    ///
1113    /// Panics if there is already a limit lower than the new one.
1114    fn set_limit(mut self, limit: usize) -> Option<Self> {
1115        assert!(self.limit.is_none_or(|l| l >= limit));
1116
1117        if limit == 0 {
1118            return None;
1119        }
1120
1121        // Release chunks made unreachable by the limit.
1122        let mut count = 0;
1123        let mut idx = 0;
1124        let mut offset = self.chunk_offset;
1125        while idx < self.chunks.len() && count < limit {
1126            let chunk = &self.chunks[idx];
1127            count += chunk.len() - offset;
1128            idx += 1;
1129            offset = 0;
1130        }
1131        self.chunks.truncate(idx);
1132
1133        if count > limit {
1134            self.limit = Some(limit);
1135        }
1136
1137        Some(self)
1138    }
1139
1140    /// Get a reference to the current update.
1141    fn get(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1142        let chunk = self.get_chunk();
1143        let (d, t, r) = chunk.index(self.chunk_offset);
1144        let t = self.overwrite_ts.unwrap_or(t);
1145        (d, t, r)
1146    }
1147
1148    /// Get a reference to the current chunk.
1149    fn get_chunk(&self) -> &Chunk<D> {
1150        &self.chunks[0]
1151    }
1152
1153    /// Step to the next update.
1154    ///
1155    /// Returns the stepped cursor, or `None` if the step was over the last update.
1156    fn step(mut self) -> Option<Self> {
1157        if self.chunk_offset == self.get_chunk().len() - 1 {
1158            return self.skip_chunk().map(|(c, _)| c);
1159        }
1160
1161        self.chunk_offset += 1;
1162
1163        if let Some(limit) = &mut self.limit {
1164            *limit -= 1;
1165            if *limit == 0 {
1166                return None;
1167            }
1168        }
1169
1170        Some(self)
1171    }
1172
1173    /// Skip the remainder of the current chunk.
1174    ///
1175    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
1176    /// left after the skip.
1177    fn skip_chunk(mut self) -> Option<(Self, usize)> {
1178        let chunk = self.chunks.pop_front().expect("cursor invariant");
1179
1180        if self.chunks.is_empty() {
1181            return None;
1182        }
1183
1184        let skipped = chunk.len() - self.chunk_offset;
1185        self.chunk_offset = 0;
1186
1187        if let Some(limit) = &mut self.limit {
1188            if skipped >= *limit {
1189                return None;
1190            }
1191            *limit -= skipped;
1192        }
1193
1194        Some((self, skipped))
1195    }
1196
1197    /// Skip all updates with times <= the given time.
1198    ///
1199    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
1200    /// left after the skip.
1201    fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
1202        if self.overwrite_ts.is_some_and(|ts| ts <= time) {
1203            return None;
1204        } else if self.get().1 > time {
1205            return Some((self, 0));
1206        }
1207
1208        let mut skipped = 0;
1209
1210        let new_offset = loop {
1211            let chunk = self.get_chunk();
1212            if let Some(index) = chunk.find_time_greater_than(time) {
1213                break index;
1214            }
1215
1216            let (cursor, count) = self.skip_chunk()?;
1217            self = cursor;
1218            skipped += count;
1219        };
1220
1221        skipped += new_offset - self.chunk_offset;
1222        self.chunk_offset = new_offset;
1223
1224        Some((self, skipped))
1225    }
1226
1227    /// Advance all updates in this cursor by the given `since_ts`.
1228    ///
1229    /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
1230    /// been advanced by `since_ts`.
1231    fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
1232        // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
1233        // only need to advance the `overwrite_ts` by the `since_ts`.
1234        if let Some(ts) = self.overwrite_ts {
1235            if ts < since_ts {
1236                self.overwrite_ts = Some(since_ts);
1237            }
1238            return vec![self];
1239        }
1240
1241        // Otherwise we need to split the cursor so that each new cursor only yields runs of
1242        // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
1243        // this by splitting the cursor at each time <= `since_ts`.
1244        let mut splits = Vec::new();
1245        let mut remaining = Some(self);
1246
1247        while let Some(cursor) = remaining.take() {
1248            let (_, time, _) = cursor.get();
1249            if time >= since_ts {
1250                splits.push(cursor);
1251                break;
1252            }
1253
1254            let mut current = cursor.clone();
1255            if let Some((cursor, skipped)) = cursor.skip_time(time) {
1256                remaining = Some(cursor);
1257                current = current.set_limit(skipped).expect("skipped at least 1");
1258            }
1259            current.overwrite_ts = Some(since_ts);
1260            splits.push(current);
1261        }
1262
1263        splits
1264    }
1265
1266    /// Drain the cursor into a [`Chain`].
1267    ///
1268    /// This reuses the underlying chunks if possible, and writes new ones otherwise.
1269    fn into_chain(self) -> Chain<D> {
1270        match self.try_unwrap() {
1271            Ok(chain) => chain,
1272            Err((_, cursor)) => {
1273                let mut builder = ChainBuilder::default();
1274                builder.push_cursor(cursor);
1275                builder.finish()
1276            }
1277        }
1278    }
1279
1280    /// Attempt to unwrap the cursor into a [`Chain`].
1281    ///
1282    /// This operation efficiently reuses chunks by directly inserting them into the output chain
1283    /// where possible.
1284    ///
1285    /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
1286    /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
1287    /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
1288    /// chain by copying chunks rather than reusing them.
1289    fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
1290        if self.limit.is_some() {
1291            return Err(("cursor with limit", self));
1292        }
1293        if self.overwrite_ts.is_some() {
1294            return Err(("cursor with overwrite_ts", self));
1295        }
1296        if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
1297            return Err(("cursor on shared chunks", self));
1298        }
1299
1300        let mut builder = ChainBuilder::default();
1301        let mut remaining = Some(self);
1302
1303        // We might be partway through the first chunk, in which case we can't reuse it but need to
1304        // allocate a new one to contain only the updates the cursor can still yield.
1305        while let Some(cursor) = remaining.take() {
1306            if cursor.chunk_offset == 0 {
1307                remaining = Some(cursor);
1308                break;
1309            }
1310            let update = cursor.get();
1311            builder.push_ref(update);
1312            remaining = cursor.step();
1313        }
1314
1315        let mut chain = builder.finish();
1316        if let Some(cursor) = remaining {
1317            for chunk in cursor.chunks {
1318                let chunk = Rc::into_inner(chunk).expect("checked above");
1319                chain.push_chunk(chunk);
1320            }
1321        }
1322
1323        Ok(chain)
1324    }
1325}
1326
1327/// A non-empty chunk of updates, backed by a columnar region.
1328///
1329/// All updates in a chunk are sorted by (time, data) and consolidated.
1330///
1331/// Chunks are immutable once created. They are produced by [`ChunkBuilder`], which mints a
1332/// new chunk whenever its in-progress columnar container reaches a fixed serialized byte
1333/// boundary (~2 MiB, matching the ship granularity used elsewhere in the codebase), so each
1334/// chunk corresponds to a single, predictably sized allocation.
1335struct Chunk<D: Data> {
1336    /// The paged-out form, taken on first materialization.
1337    ///
1338    /// A `Mutex` (not `RefCell`) keeps the chunk `Sync`: cursors hold chunks behind a shared
1339    /// `Rc`, and the iterator returned by [`CorrectionV2::updates_before`] borrows them across
1340    /// the persist writer's `await`, so `&Chunk` must be `Send`. The lock is taken once, at
1341    /// materialization, and is otherwise uncontended (the sink runs single-threaded per worker).
1342    paged: Mutex<Option<PagedColumn<(D, Timestamp, Diff)>>>,
1343    /// The materialized form, populated lazily by [`Chunk::column`] on first access.
1344    ///
1345    /// An `OnceLock` (not `OnceCell`) for the same `Sync` reason. Once set the slot is never
1346    /// cleared, so its address is stable and [`Chunk::index`] can hand out `Ref<'_>` borrows tied
1347    /// to `&self`. The allocation is freed when the chunk drops, which bounds resident memory to
1348    /// the chunks under an active merge front.
1349    resident: OnceLock<Column<(D, Timestamp, Diff)>>,
1350    /// Number of updates, cached so `len` and chain bookkeeping never page the chunk in.
1351    len: usize,
1352    /// Time of the first update, cached so boundary checks (`split_at_time`, `can_accept`) route
1353    /// a resting chunk without materializing it.
1354    first_time: Timestamp,
1355    /// Time of the last update, cached likewise.
1356    last_time: Timestamp,
1357}
1358
1359impl<D: Data> fmt::Debug for Chunk<D> {
1360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1361        write!(f, "Chunk(<{}>)", self.len())
1362    }
1363}
1364
1365impl<D: Data> Chunk<D> {
1366    /// Page the given non-empty column out into a chunk.
1367    ///
1368    /// Reads the cached metadata (length, boundary times) while the column is still resident, then
1369    /// hands it to the global column pager. The policy decides whether it actually spills; either
1370    /// way the chunk is born paged and materializes lazily on first read.
1371    ///
1372    /// # Panics
1373    ///
1374    /// Panics if the column is empty. Chunks are non-empty by construction; [`ChunkBuilder`] only
1375    /// ever builds a chunk from a populated column.
1376    fn from_column(mut data: Column<(D, Timestamp, Diff)>) -> Self {
1377        let (len, first_time, last_time) = {
1378            let borrowed = data.borrow();
1379            let len = borrowed.len();
1380            assert!(len > 0, "chunks are non-empty");
1381            (len, borrowed.get(0).1, borrowed.get(len - 1).1)
1382        };
1383
1384        let paged = column_pager::global_pager().page(&mut data);
1385        Self {
1386            paged: Mutex::new(Some(paged)),
1387            resident: OnceLock::new(),
1388            len,
1389            first_time,
1390            last_time,
1391        }
1392    }
1393
1394    /// Materialize the chunk's column, paging it in on first access.
1395    ///
1396    /// The returned reference is valid for as long as `&self`: the `OnceLock` slot is never
1397    /// cleared once populated, so its contents have a stable address.
1398    fn column(&self) -> &Column<(D, Timestamp, Diff)> {
1399        self.resident.get_or_init(|| {
1400            let paged = self
1401                .paged
1402                .lock()
1403                .expect("pager mutex poisoned")
1404                .take()
1405                .expect("paged form present until materialized");
1406            column_pager::global_pager().take(paged)
1407        })
1408    }
1409
1410    /// Return the number of updates in the chunk.
1411    fn len(&self) -> usize {
1412        self.len
1413    }
1414
1415    /// Return the update at the given index, paging the chunk in if necessary.
1416    ///
1417    /// # Panics
1418    ///
1419    /// Panics if the given index is not populated.
1420    fn index(&self, idx: usize) -> Ref<'_, (D, Timestamp, Diff)> {
1421        self.column().borrow().get(idx)
1422    }
1423
1424    /// Return the first update in the chunk, paging the chunk in if necessary.
1425    fn first(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1426        self.index(0)
1427    }
1428
1429    /// Return the last update in the chunk, paging the chunk in if necessary.
1430    fn last(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1431        self.index(self.len - 1)
1432    }
1433
1434    /// Return the time of the first update, without materializing the chunk.
1435    fn first_time(&self) -> Timestamp {
1436        self.first_time
1437    }
1438
1439    /// Return the time of the last update, without materializing the chunk.
1440    fn last_time(&self) -> Timestamp {
1441        self.last_time
1442    }
1443
1444    /// Return the index of the first update at a time greater than `time`, or `None` if no such
1445    /// update exists.
1446    ///
1447    /// The early-out uses the cached last time, so a chunk whose updates are all at or before
1448    /// `time` is skipped without paging it in.
1449    fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
1450        if self.last_time <= time {
1451            return None;
1452        }
1453
1454        let mut lower = 0;
1455        let mut upper = self.len;
1456        while lower < upper {
1457            let idx = (lower + upper) / 2;
1458            if self.index(idx).1 > time {
1459                upper = idx;
1460            } else {
1461                lower = idx + 1;
1462            }
1463        }
1464
1465        Some(lower)
1466    }
1467
1468    /// Return the size of the chunk, for use in metrics.
1469    ///
1470    /// Reports resident bytes only: a chunk still spilled (on swap or in a pager file) is not part
1471    /// of RSS and contributes nothing, matching the accounting in
1472    /// [`mz_timely_util::columnar::merge_batcher`].
1473    fn get_size(&self) -> SizeMetrics {
1474        let resident = |col: &Column<(D, Timestamp, Diff)>| {
1475            let bytes = col.length_in_bytes();
1476            SizeMetrics {
1477                size: bytes,
1478                capacity: bytes,
1479                allocations: 1,
1480            }
1481        };
1482
1483        if let Some(col) = self.resident.get() {
1484            return resident(col);
1485        }
1486        // Not yet materialized: a policy that kept the column resident still occupies RSS, so
1487        // account for it; a genuinely spilled column does not.
1488        match &*self.paged.lock().expect("pager mutex poisoned") {
1489            Some(PagedColumn::Resident(col, _)) => resident(col),
1490            _ => SizeMetrics::default(),
1491        }
1492    }
1493}
1494
1495/// Builder that produces a stream of fixed-size [`Chunk`]s.
1496///
1497/// Wraps [`mz_timely_util::columnar::builder::ColumnBuilder`], which mints a new
1498/// [`Column::Align`] chunk whenever its in-progress columnar container reaches a fixed
1499/// serialized byte boundary (~2 MiB, matching the ship granularity used elsewhere in the
1500/// codebase). Each minted chunk is therefore a single, predictably-sized aligned allocation.
1501struct ChunkBuilder<D: Data> {
1502    inner: mz_timely_util::columnar::builder::ColumnBuilder<(D, Timestamp, Diff)>,
1503}
1504
1505impl<D: Data> Default for ChunkBuilder<D> {
1506    fn default() -> Self {
1507        Self {
1508            inner: Default::default(),
1509        }
1510    }
1511}
1512
1513impl<D: Data> ChunkBuilder<D> {
1514    /// Push an update into the builder.
1515    ///
1516    /// Accepts whatever the inner [`ColumnBuilder`]'s [`PushInto`] impl accepts — both the
1517    /// `Ref<'_, (D, T, R)>` refs produced by cursors and `&(D, T, R)` references to owned
1518    /// tuples drained from the staging buffer.
1519    ///
1520    /// [`ColumnBuilder`]: mz_timely_util::columnar::builder::ColumnBuilder
1521    /// [`PushInto`]: timely::container::PushInto
1522    #[inline]
1523    fn push<T>(&mut self, item: T)
1524    where
1525        mz_timely_util::columnar::builder::ColumnBuilder<(D, Timestamp, Diff)>:
1526            timely::container::PushInto<T>,
1527    {
1528        timely::container::PushInto::push_into(&mut self.inner, item);
1529    }
1530
1531    /// Pop a finished chunk, if one is available.
1532    fn pop(&mut self) -> Option<Chunk<D>> {
1533        use timely::container::ContainerBuilder;
1534        // `ColumnBuilder::extract` stashes the popped chunk in its `finished` slot so the
1535        // caller can read it through `&mut`; move it out with `mem::take` so we own it
1536        // (leaves `Column::Typed(Default::default())` behind, which the next `extract`
1537        // overwrites).
1538        self.inner
1539            .extract()
1540            .map(|c| Chunk::from_column(std::mem::take(c)))
1541    }
1542
1543    /// Finalize the builder: flush any in-progress updates as a typed chunk and drain pending.
1544    fn finish(mut self) -> impl Iterator<Item = Chunk<D>> {
1545        use timely::container::ContainerBuilder;
1546        // `ColumnBuilder::finish` flushes the in-progress container into the pending queue
1547        // (as `Column::Typed`) and returns the first pending entry. Subsequent calls drain
1548        // the rest until `None`. Translate that into an owning iterator.
1549        //
1550        // `finish` can hand back an empty column (e.g. when the last shipped chunk landed exactly
1551        // on the boundary). Skip those: `Chunk::from_column` requires a non-empty column, and an
1552        // empty chunk would needlessly engage the pager.
1553        std::iter::from_fn(move || {
1554            loop {
1555                let col = std::mem::take(self.inner.finish()?);
1556                if col.borrow().len() > 0 {
1557                    return Some(Chunk::from_column(col));
1558                }
1559            }
1560        })
1561    }
1562}
1563
1564/// A buffer for staging updates before they are inserted into the sorted chains.
1565#[derive(Debug)]
1566struct Stage<D> {
1567    /// The contained updates.
1568    ///
1569    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
1570    data: Vec<(D, Timestamp, Diff)>,
1571    /// Introspection logging.
1572    ///
1573    /// We want to report the number of records in the stage. To do so, we pretend that the stage
1574    /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
1575    /// re-created.
1576    logging: Option<ChannelLogging>,
1577}
1578
1579impl<D: Data> Stage<D> {
1580    fn new(logging: Option<ChannelLogging>, chunk_capacity: usize) -> Self {
1581        // For logging, we pretend the stage consists of a single chain.
1582        if let Some(logging) = &logging {
1583            logging.chain_created(0);
1584        }
1585
1586        Self {
1587            data: Vec::with_capacity(chunk_capacity),
1588            logging,
1589        }
1590    }
1591
1592    /// Insert a batch of updates, possibly producing a batch of sorted, consolidated updates
1593    /// ready to be stored.
1594    fn insert(
1595        &mut self,
1596        updates: &mut Vec<(D, Timestamp, Diff)>,
1597    ) -> Option<Vec<(D, Timestamp, Diff)>> {
1598        if updates.is_empty() {
1599            return None;
1600        }
1601
1602        let prev_length = self.ilen();
1603
1604        // Determine how many chunks we can fill with the available updates.
1605        let update_count = self.data.len() + updates.len();
1606        let chunk_capacity = self.data.capacity();
1607        let chunk_count = update_count / chunk_capacity;
1608
1609        let mut new_updates = updates.drain(..);
1610
1611        // If we have enough shipable updates, collect them and consolidate.
1612        let maybe_ready = if chunk_count > 0 {
1613            let ship_count = chunk_count * chunk_capacity;
1614            let mut buffer = Vec::with_capacity(ship_count);
1615
1616            buffer.append(&mut self.data);
1617            while buffer.len() < ship_count {
1618                let update = new_updates.next().unwrap();
1619                buffer.push(update);
1620            }
1621
1622            consolidate(&mut buffer);
1623
1624            Some(buffer)
1625        } else {
1626            None
1627        };
1628
1629        // Stage the remaining updates.
1630        Extend::extend(&mut self.data, new_updates);
1631
1632        self.log_length_diff(self.ilen() - prev_length);
1633
1634        maybe_ready
1635    }
1636
1637    /// Flush all currently staged updates, returning them sorted and consolidated.
1638    fn flush(&mut self) -> Option<Vec<(D, Timestamp, Diff)>> {
1639        self.log_length_diff(-self.ilen());
1640
1641        consolidate(&mut self.data);
1642
1643        if self.data.is_empty() {
1644            return None;
1645        }
1646
1647        let capacity = self.data.capacity();
1648        let data = std::mem::replace(&mut self.data, Vec::with_capacity(capacity));
1649        Some(data)
1650    }
1651
1652    /// Advance the times of staged updates by the given `since`.
1653    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
1654        let Some(since_ts) = since.as_option() else {
1655            // If the since is the empty frontier, discard all updates.
1656            self.log_length_diff(-self.ilen());
1657            self.data.clear();
1658            return;
1659        };
1660
1661        for (_, time, _) in &mut self.data {
1662            *time = std::cmp::max(*time, *since_ts);
1663        }
1664    }
1665
1666    /// Return the size of the stage, for use in metrics.
1667    ///
1668    /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
1669    /// under-estimates. That's fine as the stage should always be small.
1670    fn get_size(&self) -> SizeMetrics {
1671        SizeMetrics {
1672            size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1673            capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1674            allocations: 1,
1675        }
1676    }
1677
1678    /// Return the number of updates in the stage, as an `isize`.
1679    fn ilen(&self) -> isize {
1680        self.data.len().try_into().expect("must fit")
1681    }
1682
1683    fn log_length_diff(&self, diff: isize) {
1684        let Some(logging) = &self.logging else { return };
1685        if diff > 0 {
1686            let count = usize::try_from(diff).expect("must fit");
1687            logging.chain_created(count);
1688            logging.chain_dropped(0);
1689        } else if diff < 0 {
1690            let count = usize::try_from(-diff).expect("must fit");
1691            logging.chain_created(0);
1692            logging.chain_dropped(count);
1693        }
1694    }
1695}
1696
1697impl<D> Drop for Stage<D> {
1698    fn drop(&mut self) {
1699        if let Some(logging) = &self.logging {
1700            logging.chain_dropped(self.data.len());
1701        }
1702    }
1703}
1704
1705/// Sort and consolidate the given list of updates.
1706///
1707/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
1708/// except that it sorts updates by (time, data) instead of (data, time).
1709fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
1710    if updates.len() <= 1 {
1711        return;
1712    }
1713
1714    let diff = |update: &(_, _, Diff)| update.2;
1715
1716    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
1717
1718    let mut offset = 0;
1719    let mut accum = diff(&updates[0]);
1720
1721    for idx in 1..updates.len() {
1722        let this = &updates[idx];
1723        let prev = &updates[idx - 1];
1724        if this.0 == prev.0 && this.1 == prev.1 {
1725            accum += diff(&updates[idx]);
1726        } else {
1727            if accum != Diff::ZERO {
1728                updates.swap(offset, idx - 1);
1729                updates[offset].2 = accum;
1730                offset += 1;
1731            }
1732            accum = diff(&updates[idx]);
1733        }
1734    }
1735
1736    if accum != Diff::ZERO {
1737        let len = updates.len();
1738        updates.swap(offset, len - 1);
1739        updates[offset].2 = accum;
1740        offset += 1;
1741    }
1742
1743    updates.truncate(offset);
1744}
1745
1746/// Compare two columnar refs that have unrelated input lifetimes.
1747///
1748/// `<D::Container as Borrow>::Ref<'a>` is an associated-type projection through a trait, so
1749/// the compiler treats it as invariant in `'a` and won't auto-shorten the inputs by variance.
1750/// We instead explicitly reborrow both to a fresh, local lifetime `'x` via
1751/// [`Columnar::reborrow`] before letting the inner `==` pick up the `for<'a> Ref<'a>: Eq`
1752/// bound on [`Data`].
1753#[inline]
1754fn refs_eq<D: Data>(a: Ref<'_, D>, b: Ref<'_, D>) -> bool {
1755    #[inline]
1756    fn eq<'x, D: Data>(a: Ref<'x, D>, b: Ref<'x, D>) -> bool {
1757        a == b
1758    }
1759    eq::<D>(D::reborrow(a), D::reborrow(b))
1760}
1761
1762/// A binary heap specialized for merging [`Cursor`]s.
1763struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);
1764
1765impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
1766    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
1767        let inner = cursors.into_iter().map(MergeCursor).collect();
1768        Self(inner)
1769    }
1770}
1771
1772impl<D: Data> MergeHeap<D> {
1773    /// Pop the next cursor (the one yielding the least update) from the heap.
1774    fn pop(&mut self) -> Option<Cursor<D>> {
1775        self.0.pop().map(|MergeCursor(c)| c)
1776    }
1777
1778    /// Pop the next cursor from the heap, provided the data and time of its current update are
1779    /// equal to the given values.
1780    ///
1781    /// Returns both the cursor and the diff corresponding to `data` and `time`.
1782    fn pop_equal(&mut self, data: Ref<'_, D>, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
1783        let r = {
1784            let MergeCursor(cursor) = self.0.peek()?;
1785            let (d, t, r) = cursor.get();
1786            if t != time || !refs_eq::<D>(d, data) {
1787                return None;
1788            }
1789            r
1790        };
1791        let cursor = self.pop().expect("checked above");
1792        Some((cursor, r))
1793    }
1794
1795    /// Push a cursor onto the heap.
1796    fn push(&mut self, cursor: Cursor<D>) {
1797        self.0.push(MergeCursor(cursor));
1798    }
1799}
1800
1801/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
1802///
1803/// Implements the cursor ordering required for merging cursors.
1804struct MergeCursor<D: Data>(Cursor<D>);
1805
1806impl<D: Data> PartialEq for MergeCursor<D> {
1807    fn eq(&self, other: &Self) -> bool {
1808        self.cmp(other).is_eq()
1809    }
1810}
1811
1812impl<D: Data> Eq for MergeCursor<D> {}
1813
1814impl<D: Data> PartialOrd for MergeCursor<D> {
1815    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1816        Some(self.cmp(other))
1817    }
1818}
1819
1820impl<D: Data> Ord for MergeCursor<D> {
1821    fn cmp(&self, other: &Self) -> Ordering {
1822        let (d1, t1, _) = self.0.get();
1823        let (d2, t2, _) = other.0.get();
1824        (t1, d1).cmp(&(t2, d2)).reverse()
1825    }
1826}
1827
1828#[cfg(test)]
1829mod tests {
1830    use mz_ore::metrics::MetricsRegistry;
1831    use mz_persist_client::cfg::PersistConfig;
1832    use mz_persist_client::metrics::Metrics;
1833    use mz_repr::{Diff, Timestamp};
1834
1835    use super::*;
1836    use crate::sink::correction::CorrectionV1;
1837
1838    #[mz_ore::test]
1839    fn chain_builder_update_count_matches_items() {
1840        let mut builder = ChainBuilder::<i64>::default();
1841        for i in 0..10_u64 {
1842            let d = i64::try_from(i).expect("fits");
1843            builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
1844        }
1845        let chain = builder.finish();
1846        assert_eq!(chain.update_count, chain.iter().count());
1847    }
1848
1849    /// Push enough updates to cross at least one `mint()` boundary, forcing the
1850    /// `Align` encode -> `from_bytes` decode roundtrip (the spilling path this data
1851    /// structure exists to support), and assert `iter()` roundtrips values, order,
1852    /// and diffs across the spill boundary.
1853    #[mz_ore::test]
1854    #[cfg_attr(miri, ignore)] // too slow: crossing the ~2 MiB mint boundary needs ~200k updates
1855    fn chain_builder_roundtrips_across_mint_boundary() {
1856        // A single `mint()` fires near the ~2 MiB (`SHIP_WORDS`) serialized boundary. With
1857        // three 8-byte columns per update that's tens of thousands of updates; pushing 200k
1858        // comfortably forces multiple mints.
1859        let count = 200_000_u64;
1860
1861        let mut builder = ChainBuilder::<i64>::default();
1862        for i in 0..count {
1863            let d = i64::try_from(i).expect("fits");
1864            builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
1865        }
1866        let chain = builder.finish();
1867
1868        // Crossing the mint boundary must have produced more than one chunk; otherwise the spill
1869        // path (each minted chunk is paged out and read back through the pager) wouldn't be
1870        // exercised. The chunk payload itself is now behind the pager (see [`Chunk`]), so we
1871        // assert on chunk count rather than inspecting the column variant directly.
1872        assert!(
1873            chain.chunks.len() > 1,
1874            "expected multiple minted chunks, got {} chunk(s): {:?}",
1875            chain.chunks.len(),
1876            chain.chunks,
1877        );
1878
1879        // `iter()` must roundtrip every update, in order, with correct diffs.
1880        assert_eq!(chain.update_count, usize::try_from(count).expect("fits"));
1881        let mut expected = 0_u64;
1882        for (d, t, r) in chain.iter() {
1883            assert_eq!(d, i64::try_from(expected).expect("fits"));
1884            assert_eq!(t, Timestamp::new(expected));
1885            assert_eq!(r, Diff::ONE);
1886            expected += 1;
1887        }
1888        assert_eq!(expected, count);
1889    }
1890
1891    fn sink_metrics() -> SinkMetrics {
1892        let registry = MetricsRegistry::new();
1893        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &registry);
1894        metrics.sink.clone()
1895    }
1896
1897    /// Run the same stepwise-drain workload through `CorrectionV1` and `CorrectionV2` and assert
1898    /// that they emit the same updates at every step.
1899    ///
1900    /// Models the `write_batches` operator catching up through many distinct timestamps: the
1901    /// desired input runs ahead, batches are written one timestamp at a time, and written updates
1902    /// come back negated through the persist feedback.
1903    #[mz_ore::test]
1904    // Columnation regions are not Stacked Borrows compliant: later pushes invalidate the
1905    // provenance of previously stored items under Miri.
1906    #[cfg_attr(miri, ignore)]
1907    fn equivalence_with_v1() {
1908        let sink_metrics = sink_metrics();
1909
1910        let mut v1 =
1911            CorrectionV1::<String>::new(sink_metrics.clone(), sink_metrics.for_worker(0), 1);
1912        let mut v2 = CorrectionV2::<String>::new(
1913            sink_metrics.clone(),
1914            sink_metrics.for_worker(0),
1915            None,
1916            3.0,
1917            8 * 1024,
1918        );
1919
1920        let num_ts = 50;
1921        let keys = 4;
1922
1923        // Upsert-style input: every timestamp updates each key, retracting the previous value.
1924        let batch = |t: u64| -> Vec<(String, Timestamp, Diff)> {
1925            (0..keys)
1926                .flat_map(|k| {
1927                    let addition = (format!("{k}-{t}"), Timestamp::from(t), Diff::ONE);
1928                    let retraction = t
1929                        .checked_sub(1)
1930                        .map(|p| (format!("{k}-{p}"), Timestamp::from(t), -Diff::ONE));
1931                    std::iter::once(addition).chain(retraction)
1932                })
1933                .collect()
1934        };
1935
1936        // Pre-fill both with all batches, like a catch-up where the input runs ahead.
1937        for t in 0..num_ts {
1938            v1.insert(&mut batch(t));
1939            v2.insert(&mut batch(t));
1940        }
1941
1942        // Drain stepwise, with persist feedback, comparing emissions.
1943        for t in 0..num_ts {
1944            let upper = Antichain::from_elem(Timestamp::from(t + 1));
1945
1946            let mut out1: Vec<_> = v1.updates_before(&upper).collect();
1947            let mut out2: Vec<_> = v2.updates_before(&upper).collect();
1948            out1.sort();
1949            out2.sort();
1950            assert_eq!(out1, out2, "diverged at t={t}");
1951
1952            v1.insert_negated(&mut out1.clone());
1953            v2.insert_negated(&mut out2);
1954            v1.advance_since(upper.clone());
1955            v2.advance_since(upper);
1956        }
1957
1958        // Compare the final state at the since.
1959        let upper = Antichain::from_elem(Timestamp::from(num_ts + 1));
1960        v1.consolidate_at_since();
1961        v2.consolidate_at_since();
1962        let mut out1: Vec<_> = v1.updates_before(&upper).collect();
1963        let mut out2: Vec<_> = v2.updates_before(&upper).collect();
1964        out1.sort();
1965        out2.sort();
1966        assert_eq!(out1, out2);
1967    }
1968
1969    /// A since jump across many distinct buffered timestamps must collapse them onto the since.
1970    #[mz_ore::test]
1971    // Columnation regions are not Stacked Borrows compliant: later pushes invalidate the
1972    // provenance of previously stored items under Miri.
1973    #[cfg_attr(miri, ignore)]
1974    fn since_jump() {
1975        let sink_metrics = sink_metrics();
1976        let mut v2 = CorrectionV2::<String>::new(
1977            sink_metrics.clone(),
1978            sink_metrics.for_worker(0),
1979            None,
1980            3.0,
1981            8 * 1024,
1982        );
1983
1984        let num_ts = 100;
1985        for t in 0..num_ts {
1986            v2.insert(&mut vec![
1987                (format!("a-{t}"), Timestamp::from(t), Diff::ONE),
1988                (format!("a-{t}"), Timestamp::from(t), -Diff::ONE),
1989                (format!("b-{t}"), Timestamp::from(t), Diff::ONE),
1990            ]);
1991        }
1992
1993        v2.advance_since(Antichain::from_elem(Timestamp::from(num_ts)));
1994        v2.consolidate_at_since();
1995
1996        let upper = Antichain::from_elem(Timestamp::from(num_ts + 1));
1997        let out: Vec<_> = v2.updates_before(&upper).collect();
1998        assert_eq!(out.len(), usize::try_from(num_ts).unwrap());
1999        assert!(
2000            out.iter()
2001                .all(|(_, t, r)| *t == Timestamp::from(num_ts) && *r == Diff::ONE)
2002        );
2003    }
2004
2005    /// Reads must not observe updates at or beyond their `upper`, even when the `upper` is not
2006    /// beyond the `since`.
2007    #[mz_ore::test]
2008    // Columnation regions are not Stacked Borrows compliant: later pushes invalidate the
2009    // provenance of previously stored items under Miri.
2010    #[cfg_attr(miri, ignore)]
2011    fn upper_not_beyond_since() {
2012        let sink_metrics = sink_metrics();
2013        let mut v2 = CorrectionV2::<String>::new(
2014            sink_metrics.clone(),
2015            sink_metrics.for_worker(0),
2016            None,
2017            3.0,
2018            8 * 1024,
2019        );
2020
2021        v2.insert(&mut vec![(
2022            "a".to_owned(),
2023            Timestamp::from(5_u64),
2024            Diff::ONE,
2025        )]);
2026        v2.advance_since(Antichain::from_elem(Timestamp::from(10_u64)));
2027
2028        // The update logically lives at time 10 now, so a read before 7 must be empty.
2029        let upper = Antichain::from_elem(Timestamp::from(7_u64));
2030        assert_eq!(v2.updates_before(&upper).count(), 0);
2031
2032        // A read before 11 must emit it, advanced to the since.
2033        let upper = Antichain::from_elem(Timestamp::from(11_u64));
2034        let out: Vec<_> = v2.updates_before(&upper).collect();
2035        assert_eq!(
2036            out,
2037            vec![("a".to_owned(), Timestamp::from(10_u64), Diff::ONE)]
2038        );
2039    }
2040
2041    /// A [`PagingPolicy`] that always spills to the swap backend, uncompressed.
2042    ///
2043    /// The default global pager keeps every chunk resident; installing this drives the actual
2044    /// spill path so the tests exercise [`Chunk::column`]'s page-in through [`mz_ore::pager`].
2045    ///
2046    /// [`PagingPolicy`]: column_pager::PagingPolicy
2047    struct ForceSwap;
2048
2049    impl column_pager::PagingPolicy for ForceSwap {
2050        fn decide(&self, _hint: column_pager::PageHint) -> column_pager::PageDecision {
2051            column_pager::PageDecision::Page {
2052                backend: mz_ore::pager::Backend::Swap,
2053                codec: None,
2054            }
2055        }
2056        fn record(&self, _event: column_pager::PageEvent) {}
2057    }
2058
2059    /// Install a global pager that spills every chunk to swap for the duration of `f`, then
2060    /// restore the default (disabled) pager. The global pager is process-wide; concurrent tests
2061    /// only ever observe a correct round-trip regardless of backend, so racing on it is benign.
2062    fn with_swap_pager<R>(f: impl FnOnce() -> R) -> R {
2063        use std::sync::Arc;
2064        column_pager::set_global_pager(column_pager::ColumnPager::new(Arc::new(ForceSwap)));
2065        let result = f();
2066        column_pager::set_global_pager(column_pager::ColumnPager::disabled());
2067        result
2068    }
2069
2070    /// Build a chain crossing the mint boundary while every chunk is spilled to swap, then assert
2071    /// `iter()` (the read path behind `updates_before`) pages each chunk back in and roundtrips
2072    /// values, order, and diffs.
2073    #[mz_ore::test]
2074    #[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under miri
2075    fn iter_roundtrips_through_swap_backend() {
2076        let count = 200_000_u64;
2077        with_swap_pager(|| {
2078            let mut builder = ChainBuilder::<i64>::default();
2079            for i in 0..count {
2080                let d = i64::try_from(i).expect("fits");
2081                builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
2082            }
2083            let chain = builder.finish();
2084            assert!(chain.chunks.len() > 1, "expected multiple minted chunks");
2085            assert_eq!(chain.update_count, usize::try_from(count).expect("fits"));
2086
2087            let mut expected = 0_u64;
2088            for (d, t, r) in chain.iter() {
2089                assert_eq!(d, i64::try_from(expected).expect("fits"));
2090                assert_eq!(t, Timestamp::new(expected));
2091                assert_eq!(r, Diff::ONE);
2092                expected += 1;
2093            }
2094            assert_eq!(expected, count);
2095        });
2096    }
2097
2098    /// Drive a [`Cursor`] over a spilled, multi-chunk chain to completion (the access pattern
2099    /// merges use). Each step pages the front chunk back in via [`Chunk::column`]; assert the
2100    /// cursor yields every update in order.
2101    #[mz_ore::test]
2102    #[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under miri
2103    fn cursor_steps_through_swap_backend() {
2104        let count = 200_000_u64;
2105        with_swap_pager(|| {
2106            let mut builder = ChainBuilder::<i64>::default();
2107            for i in 0..count {
2108                let d = i64::try_from(i).expect("fits");
2109                builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
2110            }
2111            let chain = builder.finish();
2112            assert!(chain.chunks.len() > 1, "expected multiple minted chunks");
2113
2114            let mut rest = chain.into_cursor();
2115            let mut expected = 0_u64;
2116            while let Some(cursor) = rest.take() {
2117                let (d, t, r) = cursor.get();
2118                assert_eq!(i64::into_owned(d), i64::try_from(expected).expect("fits"));
2119                assert_eq!(t, Timestamp::new(expected));
2120                assert_eq!(r, Diff::ONE);
2121                expected += 1;
2122                rest = cursor.step();
2123            }
2124            assert_eq!(expected, count);
2125        });
2126    }
2127}