Skip to main content

mz_compute/sink/
correction_v2.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
11//! operator to stash updates before they are written.
12//!
13//! The `Correction` data structure provides methods to:
14//!  * insert new updates
15//!  * advance the compaction frontier (called `since`)
16//!  * obtain an iterator over consolidated updates before some `upper`
17//!  * force consolidation of updates before some `upper`
18//!
19//! The goal is to provide good performance for each of these operations, even in the presence of
20//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
21//! retractions for future times and we want those to be handled efficiently as well.
22//!
23//! Note that `Correction` does not provide a method to directly remove updates. Instead updates
24//! are removed by inserting their retractions so that they consolidate away to nothing.
25//!
26//! ## Storage of Updates
27//!
28//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
29//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
30//!
31//! [`CorrectionV2`] holds onto a list of [`Chain`]s containing [`Chunk`]s of stashed updates. Each
32//! [`Chunk`] is a columnation region containing a fixed maximum number of updates. All updates in
33//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
34//!
35//! ```text
36//!       chain[0]   |   chain[1]   |   chain[2]
37//!                  |              |
38//!     chunk[0]     | chunk[0]     | chunk[0]
39//!       (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
40//!       (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
41//!     chunk[1]     | chunk[1]     |
42//!       (c, 1, +1) |   (c, 2, -2) |
43//!       (a, 2, -1) |   (c, 4, -1) |
44//!     chunk[2]     |              |
45//!       (b, 2, +1) |              |
46//!       (c, 2, +1) |              |
47//!     chunk[3]     |              |
48//!       (b, 3, -1) |              |
49//!       (c, 3, +1) |              |
50//! ```
51//!
52//! The "chain invariant" states that each chain has at least `chain_proportionality` times as
53//! many updates as the next one. This means that chain sizes will often be powers of
54//! `chain_proportionality`, but they don't have to be. For example, for a proportionality of 2,
55//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
56//!
57//! Note that the invariant is maintained on update counts, not chunk counts. Chunks are
58//! byte-bounded (see [`ChunkBuilder`]), so chunk count is not proportional to update count and
59//! would be a poor proxy: any chain below the chunk byte boundary is a single chunk regardless
60//! of how many updates it holds, which would let the geometric invariant collapse and break the
61//! O(log N) amortization of inserts.
62//!
63//! Choosing the `chain_proportionality` value allows tuning the trade-off between memory and CPU
64//! resources required to maintain corrections. A higher proportionality forces more frequent chain
65//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
66//!
67//! ## Inserting Updates
68//!
69//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
70//! list until the chain invariant is restored.
71//!
72//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
73//! chunk, copying the update in, and then likely merging with an existing chain to restore the
74//! chain invariant. If updates trickle in in small batches, this can cause a considerable
75//! overhead. The amortize this overhead, new updates aren't immediately inserted into the sorted
76//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
77//! [`Chunk`], they are sorted an inserted into the chains.
78//!
79//! The insert operation has an amortized complexity of O(log N), with N being the current number
80//! of updates stored.
81//!
82//! ## Retrieving Consolidated Updates
83//!
84//! Retrieving consolidated updates before a given `upper` works by first consolidating all updates
85//! at times before the `upper`, merging them all into one chain, then returning an iterator over
86//! that chain.
87//!
88//! Because each chain contains updates ordered by time first, consolidation of all updates before
89//! an `upper` is possible without touching updates at future times. It works by merging the chains
90//! only up to the `upper`, producing a merged chain containing consolidated times before the
91//! `upper` and leaving behind the chain parts containing later times. The complexity of this
92//! operation is O(U log K), with U being the number of updates before `upper` and K the number
93//! of chains.
94//!
95//! Unfortunately, performing consolidation as described above can break the chain invariant and we
96//! might need to restore it by merging chains, including ones containing future updates. This is
97//! something that would be great to fix! In the meantime the hope is that in steady state it
98//! doesn't matter too much because either there are no future retractions and U is approximately
99//! equal to N, or the amount of future retractions is much larger than the amount of current
100//! changes, in which case removing the current changes has a good chance of leaving the chain
101//! invariant intact.
102//!
103//! ## Merging Chains
104//!
105//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
106//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
107//! complexity of a merge of K chains containing N updates is O(N log K).
108//!
109//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
110//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
111//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
112//!
113//! For example, consider these two chains, assuming `since = [2]`:
114//!   chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
115//!   chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
116//! After time advancement, the chains look like this:
117//!   chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
118//!   chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
119//! Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that's
120//! neither sorted nor consolidated.
121//!
122//! Instead we need to merge sub-chains, one for each distinct time that's before or at the
123//! `since`. Each of these sub-chains retains the (time, data) ordering after the time advancement
124//! to `since`, so merging those yields the expected result.
125//!
126//! For the above example, the chains we would merge are:
127//!   chain 1.a: [(c, 2, +1)]
128//!   chain 1.b: [(b, 2, -1), (a, 3, -1)]
129//!   chain 2.a: [(b, 2, +1)],
130//!   chain 2.b: [(a, 2, +1), (c, 2, -1)]
131
132use std::cmp::Ordering;
133use std::collections::{BinaryHeap, VecDeque};
134use std::fmt;
135use std::rc::Rc;
136
137use columnar::{Columnar, Index, Len, Ref};
138use mz_ore::cast::CastLossy;
139use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
140use mz_repr::{Diff, Timestamp};
141use mz_timely_util::columnar::Column;
142use timely::PartialOrder;
143use timely::dataflow::channels::ContainerBytes;
144use timely::progress::Antichain;
145
146use crate::sink::correction::{ChannelLogging, SizeMetrics};
147
148/// Convenient alias for use in data trait bounds.
149///
150/// `D` is constrained to be `Columnar`, so that updates can be stored in a single columnar
151/// region per chunk, and the variable-length payload (e.g. `Row` bytes) lives in the same
152/// allocation as the rest of the chunk. The `Ref`-level `Eq + Ord` bounds let the merge/heap
153/// code compare updates directly through the columnar borrow, avoiding `into_owned` clones
154/// on the hot path.
155pub trait Data:
156    differential_dataflow::Data
157    + Columnar<Container: Send + Sync + Clone + for<'a> columnar::Borrow<Ref<'a>: Eq + Ord>>
158    + Send
159    + Sync
160{
161}
162impl<D> Data for D where
163    D: differential_dataflow::Data
164        + Columnar<Container: Send + Sync + Clone + for<'a> columnar::Borrow<Ref<'a>: Eq + Ord>>
165        + Send
166        + Sync
167{
168}
169
170/// A data structure used to store corrections in the MV sink implementation.
171///
172/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
173/// allowing their memory to be transparently spilled to disk.
174#[derive(Debug)]
175pub(super) struct CorrectionV2<D: Data> {
176    /// Chains containing sorted updates.
177    chains: Vec<Chain<D>>,
178    /// A staging area for updates, to speed up small inserts.
179    stage: Stage<D>,
180    /// The frontier by which all contained times are advanced.
181    since: Antichain<Timestamp>,
182    /// The size factor of subsequent chains required by the chain invariant.
183    chain_proportionality: f64,
184
185    /// Total count of updates in the correction buffer.
186    ///
187    /// Tracked to compute deltas in `update_metrics`.
188    prev_update_count: usize,
189    /// Total heap size used by the correction buffer.
190    ///
191    /// Tracked to compute deltas in `update_metrics`.
192    prev_size: SizeMetrics,
193    /// Global persist sink metrics.
194    metrics: SinkMetrics,
195    /// Per-worker persist sink metrics.
196    worker_metrics: SinkWorkerMetrics,
197    /// Introspection logging.
198    logging: Option<ChannelLogging>,
199}
200
201impl<D: Data> CorrectionV2<D> {
202    /// Construct a new [`CorrectionV2`] instance.
203    pub fn new(
204        metrics: SinkMetrics,
205        worker_metrics: SinkWorkerMetrics,
206        logging: Option<ChannelLogging>,
207        chain_proportionality: f64,
208        chunk_size: usize,
209    ) -> Self {
210        let update_size = std::mem::size_of::<(D, Timestamp, Diff)>();
211        let chunk_capacity = std::cmp::max(chunk_size / update_size, 1);
212
213        Self {
214            chains: Default::default(),
215            stage: Stage::new(logging.clone(), chunk_capacity),
216            since: Antichain::from_elem(Timestamp::MIN),
217            chain_proportionality,
218            prev_update_count: 0,
219            prev_size: Default::default(),
220            metrics,
221            worker_metrics,
222            logging,
223        }
224    }
225
226    /// Insert a batch of updates.
227    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
228        let Some(since_ts) = self.since.as_option() else {
229            // If the since is the empty frontier, discard all updates.
230            updates.clear();
231            return;
232        };
233
234        for (_, time, _) in &mut *updates {
235            *time = std::cmp::max(*time, *since_ts);
236        }
237
238        self.insert_inner(updates);
239    }
240
241    /// Insert a batch of updates, after negating their diffs.
242    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
243        let Some(since_ts) = self.since.as_option() else {
244            // If the since is the empty frontier, discard all updates.
245            updates.clear();
246            return;
247        };
248
249        for (_, time, diff) in &mut *updates {
250            *time = std::cmp::max(*time, *since_ts);
251            *diff = -*diff;
252        }
253
254        self.insert_inner(updates);
255    }
256
257    /// Insert a batch of updates.
258    ///
259    /// All times are expected to be >= the `since`.
260    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
261        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));
262
263        if let Some(chain) = self.stage.insert(updates) {
264            self.log_chain_created(&chain);
265            self.chains.push(chain);
266
267            // Restore the chain invariant.
268            let prop = self.chain_proportionality;
269            let merge_needed = |chains: &[Chain<_>]| match chains {
270                [.., prev, last] => {
271                    let last_len = f64::cast_lossy(last.update_count);
272                    let prev_len = f64::cast_lossy(prev.update_count);
273                    last_len * prop > prev_len
274                }
275                _ => false,
276            };
277
278            while merge_needed(&self.chains) {
279                let a = self.chains.pop().unwrap();
280                let b = self.chains.pop().unwrap();
281                self.log_chain_dropped(&a);
282                self.log_chain_dropped(&b);
283
284                let merged = self.merge_chains([a, b]);
285                self.log_chain_created(&merged);
286                self.chains.push(merged);
287            }
288        };
289
290        self.update_metrics();
291    }
292
293    /// Return consolidated updates before the given `upper`.
294    pub fn updates_before<'a>(
295        &'a mut self,
296        upper: &Antichain<Timestamp>,
297    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + Send + 'a {
298        let mut result = None;
299
300        if !PartialOrder::less_than(&self.since, upper) {
301            // All contained updates are beyond the upper.
302            return result.into_iter().flatten();
303        }
304
305        self.consolidate_before(upper);
306
307        // There is at most one chain that contains updates before `upper` now.
308        result = self
309            .chains
310            .iter()
311            .find(|c| c.first().is_some_and(|(_, t, _)| !upper.less_equal(&t)))
312            .map(move |c| {
313                let upper = upper.clone();
314                c.iter().take_while(move |(_, t, _)| !upper.less_equal(t))
315            });
316
317        result.into_iter().flatten()
318    }
319
320    /// Consolidate all updates before the given `upper`.
321    ///
322    /// Once this method returns, all remaining updates before `upper` are contained in a single
323    /// chain. Note that this chain might also contain updates beyond `upper` though!
324    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
325        if self.chains.is_empty() && self.stage.is_empty() {
326            return;
327        }
328
329        let mut chains = std::mem::take(&mut self.chains);
330
331        // To keep things simple, we log the dropping of all chains here and log the creation of
332        // all remaining chains at the end. This causes more event churn than necessary, but the
333        // consolidated result is correct.
334        chains.iter().for_each(|c| self.log_chain_dropped(c));
335
336        Extend::extend(&mut chains, self.stage.flush());
337
338        if chains.is_empty() {
339            // We can only get here if the stage contained updates but they all got consolidated
340            // away by `flush`, so we need to update the metrics before we return.
341            self.update_metrics();
342            return;
343        }
344
345        let (merged, remains) = self.merge_chains_up_to(chains, upper);
346
347        self.chains = remains;
348        if !merged.is_empty() {
349            // We put the merged chain at the end, assuming that its contents are likely to
350            // consolidate with retractions that will arrive soon.
351            self.chains.push(merged);
352        }
353
354        // Restore the chain invariant.
355        //
356        // This part isn't great. We've taken great care so far to only look at updates with times
357        // before `upper`, but now we might end up merging all chains anyway in the worst case.
358        // There might be something smarter we could do to avoid merging as much as possible. For
359        // example, we could consider sorting chains by length first, or inspect the contained
360        // times and prefer merging chains that have a chance at consolidating with one another.
361        let mut i = self.chains.len().saturating_sub(1);
362        while i > 0 {
363            let needs_merge = self.chains.get(i).is_some_and(|a| {
364                let b = &self.chains[i - 1];
365                let a_len = f64::cast_lossy(a.update_count);
366                let b_len = f64::cast_lossy(b.update_count);
367                a_len * self.chain_proportionality > b_len
368            });
369            if needs_merge {
370                let a = self.chains.remove(i);
371                let b = std::mem::replace(&mut self.chains[i - 1], Chain::new());
372                let merged = self.merge_chains([a, b]);
373                self.chains[i - 1] = merged;
374            } else {
375                // Only advance the index if we didn't merge. A merge can reduce the size of the
376                // chain at `i - 1`, causing an violation of the chain invariant with the next
377                // chain, so we might need to merge the two before proceeding to lower indexes.
378                i -= 1;
379            }
380        }
381
382        self.chains.iter().for_each(|c| self.log_chain_created(c));
383        self.update_metrics();
384    }
385
386    /// Advance the since frontier.
387    ///
388    /// # Panics
389    ///
390    /// Panics if the given `since` is less than the current since frontier.
391    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
392        assert!(PartialOrder::less_equal(&self.since, &since));
393        self.stage.advance_times(&since);
394        self.since = since;
395    }
396
397    /// Consolidate all updates at the current `since`.
398    pub fn consolidate_at_since(&mut self) {
399        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
400        if let Some(upper_ts) = upper_ts {
401            let upper = Antichain::from_elem(upper_ts);
402            self.consolidate_before(&upper);
403        }
404    }
405
406    fn log_chain_created(&self, chain: &Chain<D>) {
407        if let Some(logging) = &self.logging {
408            logging.chain_created(chain.update_count);
409        }
410    }
411
412    fn log_chain_dropped(&self, chain: &Chain<D>) {
413        if let Some(logging) = &self.logging {
414            logging.chain_dropped(chain.update_count);
415        }
416    }
417
418    /// Update persist sink metrics.
419    fn update_metrics(&mut self) {
420        let mut new_size = self.stage.get_size();
421        let mut new_length = self.stage.data.len();
422        for chain in &self.chains {
423            new_size += chain.get_size();
424            new_length += chain.update_count;
425        }
426
427        self.update_metrics_inner(new_size, new_length);
428    }
429
430    /// Update persist sink metrics to the given new size and length.
431    fn update_metrics_inner(&mut self, new_size: SizeMetrics, new_length: usize) {
432        let old_size = self.prev_size;
433        let old_length = self.prev_update_count;
434        let len_delta = UpdateDelta::new(new_length, old_length);
435        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
436        self.metrics
437            .report_correction_update_deltas(len_delta, cap_delta);
438        self.worker_metrics
439            .report_correction_update_totals(new_length, new_size.capacity);
440
441        if let Some(logging) = &self.logging {
442            let i = |x: usize| isize::try_from(x).expect("must fit");
443            logging.report_size_diff(i(new_size.size) - i(old_size.size));
444            logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
445            logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
446        }
447
448        self.prev_size = new_size;
449        self.prev_update_count = new_length;
450    }
451
452    /// Merge the given chains, advancing times by the current `since` in the process.
453    fn merge_chains(&self, chains: impl IntoIterator<Item = Chain<D>>) -> Chain<D> {
454        let Some(&since_ts) = self.since.as_option() else {
455            return Chain::new();
456        };
457
458        let mut to_merge = Vec::new();
459        for chain in chains {
460            if let Some(cursor) = chain.into_cursor() {
461                let mut runs = cursor.advance_by(since_ts);
462                to_merge.append(&mut runs);
463            }
464        }
465
466        self.merge_cursors(to_merge)
467    }
468
469    /// Merge the given chains, advancing times by the current `since` in the process, but only up
470    /// to the given `upper`.
471    ///
472    /// Returns the merged chain and a list of non-empty remainders of the input chains.
473    fn merge_chains_up_to(
474        &self,
475        chains: Vec<Chain<D>>,
476        upper: &Antichain<Timestamp>,
477    ) -> (Chain<D>, Vec<Chain<D>>) {
478        let Some(&since_ts) = self.since.as_option() else {
479            return (Chain::new(), Vec::new());
480        };
481        let Some(&upper_ts) = upper.as_option() else {
482            let merged = self.merge_chains(chains);
483            return (merged, Vec::new());
484        };
485
486        if since_ts >= upper_ts {
487            // After advancing by `since` there will be no updates before `upper`.
488            return (Chain::new(), chains);
489        }
490
491        let mut to_merge = Vec::new();
492        let mut to_keep = Vec::new();
493        for chain in chains {
494            if let Some(cursor) = chain.into_cursor() {
495                let mut runs = cursor.advance_by(since_ts);
496                if let Some(last) = runs.pop() {
497                    let (before, beyond) = last.split_at_time(upper_ts);
498                    before.map(|c| runs.push(c));
499                    beyond.map(|c| to_keep.push(c));
500                }
501                to_merge.append(&mut runs);
502            }
503        }
504
505        let merged = self.merge_cursors(to_merge);
506        let remains = to_keep
507            .into_iter()
508            .map(|c| c.try_unwrap().expect("unwrapable"))
509            .collect();
510
511        (merged, remains)
512    }
513
514    /// Merge the given cursors into one chain.
515    fn merge_cursors(&self, cursors: Vec<Cursor<D>>) -> Chain<D> {
516        match cursors.len() {
517            0 => Chain::new(),
518            1 => {
519                let [cur] = cursors.try_into().unwrap();
520                cur.into_chain()
521            }
522            2 => {
523                let [a, b] = cursors.try_into().unwrap();
524                self.merge_2(a, b)
525            }
526            _ => self.merge_many(cursors),
527        }
528    }
529
530    /// Merge the given two cursors using a 2-way merge.
531    ///
532    /// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
533    fn merge_2(&self, cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
534        let mut rest1 = Some(cursor1);
535        let mut rest2 = Some(cursor2);
536        let mut merged = ChainBuilder::default();
537
538        loop {
539            match (rest1, rest2) {
540                (Some(c1), Some(c2)) => {
541                    let (d1, t1, r1) = c1.get();
542                    let (d2, t2, r2) = c2.get();
543
544                    match (t1, d1).cmp(&(t2, d2)) {
545                        Ordering::Less => {
546                            merged.push_ref((d1, t1, r1));
547                            rest1 = c1.step();
548                            rest2 = Some(c2);
549                        }
550                        Ordering::Greater => {
551                            merged.push_ref((d2, t2, r2));
552                            rest1 = Some(c1);
553                            rest2 = c2.step();
554                        }
555                        Ordering::Equal => {
556                            let r = r1 + r2;
557                            if r != Diff::ZERO {
558                                merged.push_ref((d1, t1, r));
559                            }
560                            rest1 = c1.step();
561                            rest2 = c2.step();
562                        }
563                    }
564                }
565                (Some(c), None) | (None, Some(c)) => {
566                    merged.push_cursor(c);
567                    break;
568                }
569                (None, None) => break,
570            }
571        }
572
573        merged.finish()
574    }
575
576    /// Merge the given cursors using a k-way merge with a binary heap.
577    fn merge_many(&self, cursors: Vec<Cursor<D>>) -> Chain<D> {
578        let mut heap = MergeHeap::from_iter(cursors);
579        let mut merged = ChainBuilder::default();
580        while let Some(cursor1) = heap.pop() {
581            let (data, time, mut diff) = cursor1.get();
582
583            while let Some((cursor2, r)) = heap.pop_equal(data, time) {
584                diff += r;
585                if let Some(cursor2) = cursor2.step() {
586                    heap.push(cursor2);
587                }
588            }
589
590            if diff != Diff::ZERO {
591                merged.push_ref((data, time, diff));
592            }
593            if let Some(cursor1) = cursor1.step() {
594                heap.push(cursor1);
595            }
596        }
597
598        merged.finish()
599    }
600}
601
602impl<D: Data> Drop for CorrectionV2<D> {
603    fn drop(&mut self) {
604        self.chains.iter().for_each(|c| self.log_chain_dropped(c));
605        self.update_metrics_inner(Default::default(), 0);
606    }
607}
608
609/// A chain of [`Chunk`]s containing updates.
610///
611/// All updates in a chain are sorted by (time, data) and consolidated.
612///
613/// Note that, in contrast to [`Chunk`]s, chains can be empty. Though we generally try to avoid
614/// keeping around empty chains.
615#[derive(Debug)]
616struct Chain<D: Data> {
617    /// The contained chunks.
618    chunks: Vec<Chunk<D>>,
619    /// The number of updates contained in all chunks.
620    update_count: usize,
621}
622
623impl<D: Data> Chain<D> {
624    /// Construct an empty chain.
625    fn new() -> Self {
626        Self {
627            chunks: Default::default(),
628            update_count: 0,
629        }
630    }
631
632    /// Return whether the chain is empty.
633    fn is_empty(&self) -> bool {
634        self.chunks.is_empty()
635    }
636
637    /// Push a chunk onto the chain.
638    ///
639    /// All updates in the chunk must sort after all updates already in the chain, in
640    /// (time, data)-order, to ensure the chain remains sorted.
641    fn push_chunk(&mut self, chunk: Chunk<D>) {
642        debug_assert!(self.can_accept(chunk.first()));
643
644        self.update_count += chunk.len();
645        self.chunks.push(chunk);
646    }
647
648    /// Return whether the chain can accept the given update.
649    ///
650    /// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
651    fn can_accept<'a>(&'a self, update: Ref<'a, (D, Timestamp, Diff)>) -> bool {
652        self.last().is_none_or(|(dc, tc, _)| {
653            let (d, t, _) = update;
654            (tc, dc) < (t, d)
655        })
656    }
657
658    /// Return the first update in the chain, if any.
659    fn first(&self) -> Option<Ref<'_, (D, Timestamp, Diff)>> {
660        self.chunks.first().map(|c| c.first())
661    }
662
663    /// Return the last update in the chain, if any.
664    fn last(&self) -> Option<Ref<'_, (D, Timestamp, Diff)>> {
665        self.chunks.last().map(|c| c.last())
666    }
667
668    /// Convert the chain into a cursor over the contained updates.
669    fn into_cursor(self) -> Option<Cursor<D>> {
670        let chunks = self.chunks.into_iter().map(Rc::new).collect();
671        Cursor::new(chunks)
672    }
673
674    /// Return an iterator over the contained updates.
675    fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
676        self.chunks.iter().flat_map(|c| {
677            (0..c.len()).map(move |i| {
678                let (d, t, r) = c.index(i);
679                (D::into_owned(d), t, r)
680            })
681        })
682    }
683
684    /// Return the size of the chain, for use in metrics.
685    fn get_size(&self) -> SizeMetrics {
686        let mut metrics = SizeMetrics::default();
687        for chunk in &self.chunks {
688            metrics += chunk.get_size();
689        }
690        metrics
691    }
692}
693
694/// A builder that constructs a [`Chain`] from a stream of updates.
695///
696/// Wraps a [`ChunkBuilder`] and drains its minted chunks into a [`Chain`]. Pushed updates must
697/// arrive in (time, data) sorted order.
698struct ChainBuilder<D: Data> {
699    builder: ChunkBuilder<D>,
700    chain: Chain<D>,
701}
702
703impl<D: Data> Default for ChainBuilder<D> {
704    fn default() -> Self {
705        Self {
706            builder: Default::default(),
707            chain: Chain::new(),
708        }
709    }
710}
711
712impl<D: Data> ChainBuilder<D> {
713    /// Push a reference-form update into the builder.
714    fn push_ref(&mut self, update: Ref<'_, (D, Timestamp, Diff)>) {
715        self.builder.push(update);
716        self.drain();
717    }
718
719    /// Push an owned-form update into the builder.
720    fn push_owned(&mut self, update: &(D, Timestamp, Diff)) {
721        self.builder.push(update);
722        self.drain();
723    }
724
725    /// Push the updates produced by a cursor into the builder.
726    fn push_cursor(&mut self, cursor: Cursor<D>) {
727        let mut rest = Some(cursor);
728        while let Some(cursor) = rest.take() {
729            let update = cursor.get();
730            self.push_ref(update);
731            rest = cursor.step();
732        }
733    }
734
735    /// Move any minted chunks from the builder into the chain.
736    fn drain(&mut self) {
737        while let Some(chunk) = self.builder.pop() {
738            self.chain.push_chunk(chunk);
739        }
740    }
741
742    /// Finish building, returning the assembled [`Chain`].
743    fn finish(self) -> Chain<D> {
744        let Self { builder, mut chain } = self;
745        for chunk in builder.finish() {
746            if chunk.len() > 0 {
747                chain.push_chunk(chunk);
748            }
749        }
750        chain
751    }
752}
753
754impl<D: Data> Extend<(D, Timestamp, Diff)> for ChainBuilder<D> {
755    fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
756        for update in iter {
757            self.push_owned(&update);
758        }
759    }
760}
761
762/// A cursor over updates in a chain.
763///
764/// A cursor provides two guarantees:
765///  * Produced updates are ordered and consolidated.
766///  * A cursor always yields at least one update.
767///
768/// The second guarantee is enforced through the type system: Every method that steps a cursor
769/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
770/// over the last update.
771///
772/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
773/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
774/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
775#[derive(Clone, Debug)]
776struct Cursor<D: Data> {
777    /// The chunks from which updates can still be produced.
778    chunks: VecDeque<Rc<Chunk<D>>>,
779    /// The current offset into `chunks.front()`.
780    chunk_offset: usize,
781    /// An optional limit for the number of updates the cursor will produce.
782    limit: Option<usize>,
783    /// An optional overwrite for the timestamp of produced updates.
784    overwrite_ts: Option<Timestamp>,
785}
786
787impl<D: Data> Cursor<D> {
788    /// Construct a cursor over a list of chunks.
789    ///
790    /// Returns `None` if `chunks` is empty.
791    fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
792        if chunks.is_empty() {
793            return None;
794        }
795
796        Some(Self {
797            chunks,
798            chunk_offset: 0,
799            limit: None,
800            overwrite_ts: None,
801        })
802    }
803
804    /// Set a limit for the number of updates this cursor will produce.
805    ///
806    /// # Panics
807    ///
808    /// Panics if there is already a limit lower than the new one.
809    fn set_limit(mut self, limit: usize) -> Option<Self> {
810        assert!(self.limit.is_none_or(|l| l >= limit));
811
812        if limit == 0 {
813            return None;
814        }
815
816        // Release chunks made unreachable by the limit.
817        let mut count = 0;
818        let mut idx = 0;
819        let mut offset = self.chunk_offset;
820        while idx < self.chunks.len() && count < limit {
821            let chunk = &self.chunks[idx];
822            count += chunk.len() - offset;
823            idx += 1;
824            offset = 0;
825        }
826        self.chunks.truncate(idx);
827
828        if count > limit {
829            self.limit = Some(limit);
830        }
831
832        Some(self)
833    }
834
835    /// Get a reference to the current update.
836    fn get(&self) -> Ref<'_, (D, Timestamp, Diff)> {
837        let chunk = self.get_chunk();
838        let (d, t, r) = chunk.index(self.chunk_offset);
839        let t = self.overwrite_ts.unwrap_or(t);
840        (d, t, r)
841    }
842
843    /// Get a reference to the current chunk.
844    fn get_chunk(&self) -> &Chunk<D> {
845        &self.chunks[0]
846    }
847
848    /// Step to the next update.
849    ///
850    /// Returns the stepped cursor, or `None` if the step was over the last update.
851    fn step(mut self) -> Option<Self> {
852        if self.chunk_offset == self.get_chunk().len() - 1 {
853            return self.skip_chunk().map(|(c, _)| c);
854        }
855
856        self.chunk_offset += 1;
857
858        if let Some(limit) = &mut self.limit {
859            *limit -= 1;
860            if *limit == 0 {
861                return None;
862            }
863        }
864
865        Some(self)
866    }
867
868    /// Skip the remainder of the current chunk.
869    ///
870    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
871    /// left after the skip.
872    fn skip_chunk(mut self) -> Option<(Self, usize)> {
873        let chunk = self.chunks.pop_front().expect("cursor invariant");
874
875        if self.chunks.is_empty() {
876            return None;
877        }
878
879        let skipped = chunk.len() - self.chunk_offset;
880        self.chunk_offset = 0;
881
882        if let Some(limit) = &mut self.limit {
883            if skipped >= *limit {
884                return None;
885            }
886            *limit -= skipped;
887        }
888
889        Some((self, skipped))
890    }
891
892    /// Skip all updates with times <= the given time.
893    ///
894    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
895    /// left after the skip.
896    fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
897        if self.overwrite_ts.is_some_and(|ts| ts <= time) {
898            return None;
899        } else if self.get().1 > time {
900            return Some((self, 0));
901        }
902
903        let mut skipped = 0;
904
905        let new_offset = loop {
906            let chunk = self.get_chunk();
907            if let Some(index) = chunk.find_time_greater_than(time) {
908                break index;
909            }
910
911            let (cursor, count) = self.skip_chunk()?;
912            self = cursor;
913            skipped += count;
914        };
915
916        skipped += new_offset - self.chunk_offset;
917        self.chunk_offset = new_offset;
918
919        Some((self, skipped))
920    }
921
922    /// Advance all updates in this cursor by the given `since_ts`.
923    ///
924    /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
925    /// been advanced by `since_ts`.
926    fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
927        // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
928        // only need to advance the `overwrite_ts` by the `since_ts`.
929        if let Some(ts) = self.overwrite_ts {
930            if ts < since_ts {
931                self.overwrite_ts = Some(since_ts);
932            }
933            return vec![self];
934        }
935
936        // Otherwise we need to split the cursor so that each new cursor only yields runs of
937        // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
938        // this by splitting the cursor at each time <= `since_ts`.
939        let mut splits = Vec::new();
940        let mut remaining = Some(self);
941
942        while let Some(cursor) = remaining.take() {
943            let (_, time, _) = cursor.get();
944            if time >= since_ts {
945                splits.push(cursor);
946                break;
947            }
948
949            let mut current = cursor.clone();
950            if let Some((cursor, skipped)) = cursor.skip_time(time) {
951                remaining = Some(cursor);
952                current = current.set_limit(skipped).expect("skipped at least 1");
953            }
954            current.overwrite_ts = Some(since_ts);
955            splits.push(current);
956        }
957
958        splits
959    }
960
961    /// Split the cursor at the given time.
962    ///
963    /// Returns two cursors, the first yielding all updates at times < `time`, the second yielding
964    /// all updates at times >= `time`. Both can be `None` if they would be empty.
965    fn split_at_time(self, time: Timestamp) -> (Option<Self>, Option<Self>) {
966        let Some(skip_ts) = time.step_back() else {
967            return (None, Some(self));
968        };
969
970        let before = self.clone();
971        match self.skip_time(skip_ts) {
972            Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
973            None => (Some(before), None),
974        }
975    }
976
977    /// Drain the cursor into a [`Chain`].
978    ///
979    /// This reuses the underlying chunks if possible, and writes new ones otherwise.
980    fn into_chain(self) -> Chain<D> {
981        match self.try_unwrap() {
982            Ok(chain) => chain,
983            Err((_, cursor)) => {
984                let mut builder = ChainBuilder::default();
985                builder.push_cursor(cursor);
986                builder.finish()
987            }
988        }
989    }
990
991    /// Attempt to unwrap the cursor into a [`Chain`].
992    ///
993    /// This operation efficiently reuses chunks by directly inserting them into the output chain
994    /// where possible.
995    ///
996    /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
997    /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
998    /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
999    /// chain by copying chunks rather than reusing them.
1000    fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
1001        if self.limit.is_some() {
1002            return Err(("cursor with limit", self));
1003        }
1004        if self.overwrite_ts.is_some() {
1005            return Err(("cursor with overwrite_ts", self));
1006        }
1007        if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
1008            return Err(("cursor on shared chunks", self));
1009        }
1010
1011        let mut builder = ChainBuilder::default();
1012        let mut remaining = Some(self);
1013
1014        // We might be partway through the first chunk, in which case we can't reuse it but need to
1015        // allocate a new one to contain only the updates the cursor can still yield.
1016        while let Some(cursor) = remaining.take() {
1017            if cursor.chunk_offset == 0 {
1018                remaining = Some(cursor);
1019                break;
1020            }
1021            let update = cursor.get();
1022            builder.push_ref(update);
1023            remaining = cursor.step();
1024        }
1025
1026        let mut chain = builder.finish();
1027        if let Some(cursor) = remaining {
1028            for chunk in cursor.chunks {
1029                let chunk = Rc::into_inner(chunk).expect("checked above");
1030                chain.push_chunk(chunk);
1031            }
1032        }
1033
1034        Ok(chain)
1035    }
1036}
1037
1038/// A non-empty chunk of updates, backed by a columnar region.
1039///
1040/// All updates in a chunk are sorted by (time, data) and consolidated.
1041///
1042/// Chunks are immutable once created. They are produced by [`ChunkBuilder`], which mints a
1043/// new chunk whenever its in-progress columnar container reaches a fixed serialized byte
1044/// boundary (~2 MiB, matching the ship granularity used elsewhere in the codebase), so each
1045/// chunk corresponds to a single, predictably sized allocation.
1046struct Chunk<D: Data> {
1047    /// The contained updates.
1048    data: Column<(D, Timestamp, Diff)>,
1049}
1050
1051impl<D: Data> fmt::Debug for Chunk<D> {
1052    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1053        write!(f, "Chunk(<{}>)", self.len())
1054    }
1055}
1056
1057impl<D: Data> Chunk<D> {
1058    /// Wrap the given column into a chunk.
1059    fn from_column(data: Column<(D, Timestamp, Diff)>) -> Self {
1060        Self { data }
1061    }
1062
1063    /// Return the number of updates in the chunk.
1064    fn len(&self) -> usize {
1065        self.data.borrow().len()
1066    }
1067
1068    /// Return the update at the given index.
1069    ///
1070    /// # Panics
1071    ///
1072    /// Panics if the given index is not populated.
1073    fn index(&self, idx: usize) -> Ref<'_, (D, Timestamp, Diff)> {
1074        self.data.borrow().get(idx)
1075    }
1076
1077    /// Return the first update in the chunk.
1078    fn first(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1079        self.index(0)
1080    }
1081
1082    /// Return the last update in the chunk.
1083    fn last(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1084        self.index(self.len() - 1)
1085    }
1086
1087    /// Return the index of the first update at a time greater than `time`, or `None` if no such
1088    /// update exists.
1089    fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
1090        if self.last().1 <= time {
1091            return None;
1092        }
1093
1094        let mut lower = 0;
1095        let mut upper = self.len();
1096        while lower < upper {
1097            let idx = (lower + upper) / 2;
1098            if self.index(idx).1 > time {
1099                upper = idx;
1100            } else {
1101                lower = idx + 1;
1102            }
1103        }
1104
1105        Some(lower)
1106    }
1107
1108    /// Return the size of the chunk, for use in metrics.
1109    fn get_size(&self) -> SizeMetrics {
1110        let bytes = self.data.length_in_bytes();
1111        SizeMetrics {
1112            size: bytes,
1113            capacity: bytes,
1114            allocations: 1,
1115        }
1116    }
1117}
1118
1119/// Builder that produces a stream of fixed-size [`Chunk`]s.
1120///
1121/// Wraps [`mz_timely_util::columnar::builder::ColumnBuilder`], which mints a new
1122/// [`Column::Align`] chunk whenever its in-progress columnar container reaches a fixed
1123/// serialized byte boundary (~2 MiB, matching the ship granularity used elsewhere in the
1124/// codebase). Each minted chunk is therefore a single, predictably-sized aligned allocation.
1125struct ChunkBuilder<D: Data> {
1126    inner: mz_timely_util::columnar::builder::ColumnBuilder<(D, Timestamp, Diff)>,
1127}
1128
1129impl<D: Data> Default for ChunkBuilder<D> {
1130    fn default() -> Self {
1131        Self {
1132            inner: Default::default(),
1133        }
1134    }
1135}
1136
1137impl<D: Data> ChunkBuilder<D> {
1138    /// Push an update into the builder.
1139    ///
1140    /// Accepts whatever the inner [`ColumnBuilder`]'s [`PushInto`] impl accepts — both the
1141    /// `Ref<'_, (D, T, R)>` refs produced by cursors and `&(D, T, R)` references to owned
1142    /// tuples drained from the staging buffer.
1143    ///
1144    /// [`ColumnBuilder`]: mz_timely_util::columnar::builder::ColumnBuilder
1145    /// [`PushInto`]: timely::container::PushInto
1146    #[inline]
1147    fn push<T>(&mut self, item: T)
1148    where
1149        mz_timely_util::columnar::builder::ColumnBuilder<(D, Timestamp, Diff)>:
1150            timely::container::PushInto<T>,
1151    {
1152        timely::container::PushInto::push_into(&mut self.inner, item);
1153    }
1154
1155    /// Pop a finished chunk, if one is available.
1156    fn pop(&mut self) -> Option<Chunk<D>> {
1157        use timely::container::ContainerBuilder;
1158        // `ColumnBuilder::extract` stashes the popped chunk in its `finished` slot so the
1159        // caller can read it through `&mut`; move it out with `mem::take` so we own it
1160        // (leaves `Column::Typed(Default::default())` behind, which the next `extract`
1161        // overwrites).
1162        self.inner
1163            .extract()
1164            .map(|c| Chunk::from_column(std::mem::take(c)))
1165    }
1166
1167    /// Finalize the builder: flush any in-progress updates as a typed chunk and drain pending.
1168    fn finish(mut self) -> impl Iterator<Item = Chunk<D>> {
1169        use timely::container::ContainerBuilder;
1170        // `ColumnBuilder::finish` flushes the in-progress container into the pending queue
1171        // (as `Column::Typed`) and returns the first pending entry. Subsequent calls drain
1172        // the rest until `None`. Translate that into an owning iterator.
1173        std::iter::from_fn(move || {
1174            self.inner
1175                .finish()
1176                .map(|c| Chunk::from_column(std::mem::take(c)))
1177        })
1178    }
1179}
1180
1181/// A buffer for staging updates before they are inserted into the sorted chains.
1182#[derive(Debug)]
1183struct Stage<D> {
1184    /// The contained updates.
1185    ///
1186    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
1187    data: Vec<(D, Timestamp, Diff)>,
1188    /// Introspection logging.
1189    ///
1190    /// We want to report the number of records in the stage. To do so, we pretend that the stage
1191    /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
1192    /// re-created.
1193    logging: Option<ChannelLogging>,
1194}
1195
1196impl<D: Data> Stage<D> {
1197    fn new(logging: Option<ChannelLogging>, chunk_capacity: usize) -> Self {
1198        // For logging, we pretend the stage consists of a single chain.
1199        if let Some(logging) = &logging {
1200            logging.chain_created(0);
1201        }
1202
1203        Self {
1204            data: Vec::with_capacity(chunk_capacity),
1205            logging,
1206        }
1207    }
1208
1209    fn is_empty(&self) -> bool {
1210        self.data.is_empty()
1211    }
1212
1213    /// Insert a batch of updates, possibly producing a ready [`Chain`].
1214    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
1215        if updates.is_empty() {
1216            return None;
1217        }
1218
1219        let prev_length = self.ilen();
1220
1221        // Determine how many chunks we can fill with the available updates.
1222        let update_count = self.data.len() + updates.len();
1223        let chunk_capacity = self.data.capacity();
1224        let chunk_count = update_count / chunk_capacity;
1225
1226        let mut new_updates = updates.drain(..);
1227
1228        // If we have enough shipable updates, collect them, consolidate, and build a chain.
1229        let maybe_chain = if chunk_count > 0 {
1230            let ship_count = chunk_count * chunk_capacity;
1231            let mut buffer = Vec::with_capacity(ship_count);
1232
1233            buffer.append(&mut self.data);
1234            while buffer.len() < ship_count {
1235                let update = new_updates.next().unwrap();
1236                buffer.push(update);
1237            }
1238
1239            consolidate(&mut buffer);
1240
1241            let mut chain = ChainBuilder::default();
1242            chain.extend(buffer);
1243            Some(chain.finish())
1244        } else {
1245            None
1246        };
1247
1248        // Stage the remaining updates.
1249        Extend::extend(&mut self.data, new_updates);
1250
1251        self.log_length_diff(self.ilen() - prev_length);
1252
1253        maybe_chain
1254    }
1255
1256    /// Flush all currently staged updates into a chain.
1257    fn flush(&mut self) -> Option<Chain<D>> {
1258        self.log_length_diff(-self.ilen());
1259
1260        consolidate(&mut self.data);
1261
1262        if self.data.is_empty() {
1263            return None;
1264        }
1265
1266        let mut chain = ChainBuilder::default();
1267        chain.extend(self.data.drain(..));
1268        Some(chain.finish())
1269    }
1270
1271    /// Advance the times of staged updates by the given `since`.
1272    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
1273        let Some(since_ts) = since.as_option() else {
1274            // If the since is the empty frontier, discard all updates.
1275            self.log_length_diff(-self.ilen());
1276            self.data.clear();
1277            return;
1278        };
1279
1280        for (_, time, _) in &mut self.data {
1281            *time = std::cmp::max(*time, *since_ts);
1282        }
1283    }
1284
1285    /// Return the size of the stage, for use in metrics.
1286    ///
1287    /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
1288    /// under-estimates. That's fine as the stage should always be small.
1289    fn get_size(&self) -> SizeMetrics {
1290        SizeMetrics {
1291            size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1292            capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1293            allocations: 1,
1294        }
1295    }
1296
1297    /// Return the number of updates in the stage, as an `isize`.
1298    fn ilen(&self) -> isize {
1299        self.data.len().try_into().expect("must fit")
1300    }
1301
1302    fn log_length_diff(&self, diff: isize) {
1303        let Some(logging) = &self.logging else { return };
1304        if diff > 0 {
1305            let count = usize::try_from(diff).expect("must fit");
1306            logging.chain_created(count);
1307            logging.chain_dropped(0);
1308        } else if diff < 0 {
1309            let count = usize::try_from(-diff).expect("must fit");
1310            logging.chain_created(0);
1311            logging.chain_dropped(count);
1312        }
1313    }
1314}
1315
1316impl<D> Drop for Stage<D> {
1317    fn drop(&mut self) {
1318        if let Some(logging) = &self.logging {
1319            logging.chain_dropped(self.data.len());
1320        }
1321    }
1322}
1323
1324/// Sort and consolidate the given list of updates.
1325///
1326/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
1327/// except that it sorts updates by (time, data) instead of (data, time).
1328fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
1329    if updates.len() <= 1 {
1330        return;
1331    }
1332
1333    let diff = |update: &(_, _, Diff)| update.2;
1334
1335    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
1336
1337    let mut offset = 0;
1338    let mut accum = diff(&updates[0]);
1339
1340    for idx in 1..updates.len() {
1341        let this = &updates[idx];
1342        let prev = &updates[idx - 1];
1343        if this.0 == prev.0 && this.1 == prev.1 {
1344            accum += diff(&updates[idx]);
1345        } else {
1346            if accum != Diff::ZERO {
1347                updates.swap(offset, idx - 1);
1348                updates[offset].2 = accum;
1349                offset += 1;
1350            }
1351            accum = diff(&updates[idx]);
1352        }
1353    }
1354
1355    if accum != Diff::ZERO {
1356        let len = updates.len();
1357        updates.swap(offset, len - 1);
1358        updates[offset].2 = accum;
1359        offset += 1;
1360    }
1361
1362    updates.truncate(offset);
1363}
1364
1365/// Compare two columnar refs that have unrelated input lifetimes.
1366///
1367/// `<D::Container as Borrow>::Ref<'a>` is an associated-type projection through a trait, so
1368/// the compiler treats it as invariant in `'a` and won't auto-shorten the inputs by variance.
1369/// We instead explicitly reborrow both to a fresh, local lifetime `'x` via
1370/// [`Columnar::reborrow`] before letting the inner `==` pick up the `for<'a> Ref<'a>: Eq`
1371/// bound on [`Data`].
1372#[inline]
1373fn refs_eq<D: Data>(a: Ref<'_, D>, b: Ref<'_, D>) -> bool {
1374    #[inline]
1375    fn eq<'x, D: Data>(a: Ref<'x, D>, b: Ref<'x, D>) -> bool {
1376        a == b
1377    }
1378    eq::<D>(D::reborrow(a), D::reborrow(b))
1379}
1380
1381/// A binary heap specialized for merging [`Cursor`]s.
1382struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);
1383
1384impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
1385    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
1386        let inner = cursors.into_iter().map(MergeCursor).collect();
1387        Self(inner)
1388    }
1389}
1390
1391impl<D: Data> MergeHeap<D> {
1392    /// Pop the next cursor (the one yielding the least update) from the heap.
1393    fn pop(&mut self) -> Option<Cursor<D>> {
1394        self.0.pop().map(|MergeCursor(c)| c)
1395    }
1396
1397    /// Pop the next cursor from the heap, provided the data and time of its current update are
1398    /// equal to the given values.
1399    ///
1400    /// Returns both the cursor and the diff corresponding to `data` and `time`.
1401    fn pop_equal(&mut self, data: Ref<'_, D>, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
1402        let r = {
1403            let MergeCursor(cursor) = self.0.peek()?;
1404            let (d, t, r) = cursor.get();
1405            if t != time || !refs_eq::<D>(d, data) {
1406                return None;
1407            }
1408            r
1409        };
1410        let cursor = self.pop().expect("checked above");
1411        Some((cursor, r))
1412    }
1413
1414    /// Push a cursor onto the heap.
1415    fn push(&mut self, cursor: Cursor<D>) {
1416        self.0.push(MergeCursor(cursor));
1417    }
1418}
1419
1420/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
1421///
1422/// Implements the cursor ordering required for merging cursors.
1423struct MergeCursor<D: Data>(Cursor<D>);
1424
1425impl<D: Data> PartialEq for MergeCursor<D> {
1426    fn eq(&self, other: &Self) -> bool {
1427        self.cmp(other).is_eq()
1428    }
1429}
1430
1431impl<D: Data> Eq for MergeCursor<D> {}
1432
1433impl<D: Data> PartialOrd for MergeCursor<D> {
1434    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1435        Some(self.cmp(other))
1436    }
1437}
1438
1439impl<D: Data> Ord for MergeCursor<D> {
1440    fn cmp(&self, other: &Self) -> Ordering {
1441        let (d1, t1, _) = self.0.get();
1442        let (d2, t2, _) = other.0.get();
1443        (t1, d1).cmp(&(t2, d2)).reverse()
1444    }
1445}
1446
1447#[cfg(test)]
1448mod tests {
1449    use mz_repr::{Diff, Timestamp};
1450
1451    use super::ChainBuilder;
1452
1453    #[mz_ore::test]
1454    fn chain_builder_update_count_matches_items() {
1455        let mut builder = ChainBuilder::<i64>::default();
1456        for i in 0..10_u64 {
1457            let d = i64::try_from(i).expect("fits");
1458            builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
1459        }
1460        let chain = builder.finish();
1461        assert_eq!(chain.update_count, chain.iter().count());
1462    }
1463
1464    /// Push enough updates to cross at least one `mint()` boundary, forcing the
1465    /// `Align` encode -> `from_bytes` decode roundtrip (the spilling path this data
1466    /// structure exists to support), and assert `iter()` roundtrips values, order,
1467    /// and diffs across the spill boundary.
1468    #[mz_ore::test]
1469    fn chain_builder_roundtrips_across_mint_boundary() {
1470        // A single `mint()` fires near the ~2 MiB (`SHIP_WORDS`) serialized boundary. With
1471        // three 8-byte columns per update that's tens of thousands of updates; pushing 200k
1472        // comfortably forces multiple mints.
1473        let count = 200_000_u64;
1474
1475        let mut builder = ChainBuilder::<i64>::default();
1476        for i in 0..count {
1477            let d = i64::try_from(i).expect("fits");
1478            builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
1479        }
1480        let chain = builder.finish();
1481
1482        // At least one chunk must have been minted into `Column::Align` form, otherwise the
1483        // spill path wouldn't be exercised.
1484        let align_chunks = chain
1485            .chunks
1486            .iter()
1487            .filter(|c| matches!(c.data, mz_timely_util::columnar::Column::Align(_)))
1488            .count();
1489        assert!(
1490            align_chunks > 0,
1491            "expected at least one spilled (Align) chunk, got chunks: {:?}",
1492            chain.chunks,
1493        );
1494
1495        // `iter()` must roundtrip every update, in order, with correct diffs.
1496        assert_eq!(chain.update_count, usize::try_from(count).expect("fits"));
1497        let mut expected = 0_u64;
1498        for (d, t, r) in chain.iter() {
1499            assert_eq!(d, i64::try_from(expected).expect("fits"));
1500            assert_eq!(t, Timestamp::new(expected));
1501            assert_eq!(r, Diff::ONE);
1502            expected += 1;
1503        }
1504        assert_eq!(expected, count);
1505    }
1506}