// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
//! operator to stash updates before they are written.
//!
//! The `Correction` data structure provides methods to:
//!  * insert new updates
//!  * advance the compaction frontier (called `since`)
//!  * obtain an iterator over consolidated updates before some `upper`
//!  * force consolidation of updates before some `upper`
//!
//! The goal is to provide good performance for each of these operations, even in the presence of
//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
//! retractions for future times and we want those to be handled efficiently as well.
//!
//! Note that `Correction` does not provide a method to directly remove updates. Instead updates
//! are removed by inserting their retractions so that they consolidate away to nothing.
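//!
//! The following minimal, self-contained sketch illustrates removal by retraction. It assumes
//! updates are plain `(char, u64, i64)` tuples rather than the columnar [`Chunk`]s used here. The
//! `consolidate` helper is hypothetical and not part of this module. It sorts updates by (time,
//! data), sums the diffs of identical `(data, time)` pairs, and drops pairs whose diffs cancel.
//!
//! ```rust
//! /// Hypothetical helper (not part of this module): sort by (time, data), sum the diffs of
//! /// identical `(data, time)` pairs and drop pairs that sum to zero.
//! fn consolidate(mut updates: Vec<(char, u64, i64)>) -> Vec<(char, u64, i64)> {
//!     updates.sort_by_key(|&(d, t, _)| (t, d));
//!     let mut out: Vec<(char, u64, i64)> = Vec::new();
//!     for (d, t, r) in updates {
//!         if matches!(out.last(), Some(&(ld, lt, _)) if ld == d && lt == t) {
//!             // Same `(data, time)` as the previous update: accumulate the diff.
//!             out.last_mut().unwrap().2 += r;
//!         } else {
//!             out.push((d, t, r));
//!         }
//!     }
//!     out.retain(|&(_, _, r)| r != 0);
//!     out
//! }
//! ```
//!
//! With this helper, an update `('a', 1, 1)` and its retraction `('a', 1, -1)` both disappear
//! once they are consolidated together.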
//!
//! ## Storage of Updates
//!
//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
//!
//! [`CorrectionV2`] holds onto a list of [`Chain`]s containing [`Chunk`]s of stashed updates. Each
//! [`Chunk`] is a columnation region containing a fixed maximum number of updates. All updates in
//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
//!
//! ```text
//!       chain[0]   |   chain[1]   |   chain[2]
//!                  |              |
//!     chunk[0]     | chunk[0]     | chunk[0]
//!       (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
//!       (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
//!     chunk[1]     | chunk[1]     |
//!       (c, 1, +1) |   (c, 2, -2) |
//!       (a, 2, -1) |   (c, 4, -1) |
//!     chunk[2]     |              |
//!       (b, 2, +1) |              |
//!       (c, 2, +1) |              |
//!     chunk[3]     |              |
//!       (b, 3, -1) |              |
//!       (c, 3, +1) |              |
//! ```
//!
//! The "chain invariant" states that each chain has at least [`CHAIN_PROPORTIONALITY`] times as
//! many chunks as the next one. This means that chain sizes will often be powers of
//! `CHAIN_PROPORTIONALITY`, but they don't have to be. For example, for a proportionality of 2,
//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
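//!
//! The chain invariant can be expressed as a check over chain sizes. In this minimal sketch,
//! the hypothetical `satisfies_chain_invariant` function takes the number of chunks in each
//! chain, ordered from the first chain to the last:
//!
//! ```rust
//! /// Hypothetical check: every chain must have at least `proportionality` times as many
//! /// chunks as the chain that follows it.
//! fn satisfies_chain_invariant(chain_sizes: &[usize], proportionality: usize) -> bool {
//!     chain_sizes
//!         .windows(2)
//!         .all(|pair| pair[0] >= pair[1] * proportionality)
//! }
//! ```
//!
//! For a proportionality of 2, `[11, 5, 2, 1]` passes this check. For a proportionality of 3,
//! it does not, because 11 < 3 * 5.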
//!
//! Choosing the `CHAIN_PROPORTIONALITY` value allows tuning the trade-off between memory and CPU
//! resources required to maintain corrections. A higher proportionality forces more frequent chain
//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
//!
//! ## Inserting Updates
//!
//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
//! list until the chain invariant is restored.
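//!
//! This insertion loop can be sketched under a simplifying assumption: each chain is represented
//! only by its length in chunks, and merging two chains yields the sum of their lengths (that is,
//! nothing consolidates away). `insert_chain` is a hypothetical name. Its merge condition mirrors
//! the one in `CorrectionV2::insert_inner`.
//!
//! ```rust
//! /// Hypothetical model: append a new chain, then merge the last two chains for as long as
//! /// the last one is too large relative to its predecessor.
//! fn insert_chain(chains: &mut Vec<usize>, new_chain: usize, proportionality: usize) {
//!     chains.push(new_chain);
//!     while let [.., prev, last] = chains.as_slice() {
//!         if last * proportionality <= *prev {
//!             break; // The chain invariant holds again.
//!         }
//!         let merged = prev + last;
//!         chains.truncate(chains.len() - 2);
//!         chains.push(merged);
//!     }
//! }
//! ```
//!
//! Only the tail of the list is inspected. All earlier pairs of chains already satisfied the
//! invariant before the new chain was appended.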
//!
//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
//! chunk, copying the update in, and then likely merging with an existing chain to restore the
//! chain invariant. If updates trickle in as small batches, this can cause considerable
//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
//! chains but are instead stored in a [`Stage`] buffer. Once enough updates have been staged to
//! fill a [`Chunk`], they are sorted and inserted into the chains.
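//!
//! Here is a minimal sketch of such a staging buffer. It assumes plain `(char, u64, i64)`
//! updates and a fixed update count as the capacity. `StageSketch` is hypothetical and simpler
//! than the actual [`Stage`]. It only reports a batch once the buffer is full, and then returns it
//! sorted by (time, data) and consolidated, ready to be appended as a new chain.
//!
//! ```rust
//! type Update = (char, u64, i64);
//!
//! /// Hypothetical staging buffer that emits sorted, consolidated batches of updates.
//! struct StageSketch {
//!     buffer: Vec<Update>,
//!     capacity: usize,
//! }
//!
//! impl StageSketch {
//!     fn new(capacity: usize) -> Self {
//!         Self { buffer: Vec::with_capacity(capacity), capacity }
//!     }
//!
//!     /// Stage `updates`; once `capacity` updates are buffered, return them as one batch that
//!     /// is sorted by (time, data) and consolidated.
//!     fn insert(&mut self, updates: &mut Vec<Update>) -> Option<Vec<Update>> {
//!         self.buffer.append(updates);
//!         if self.buffer.len() < self.capacity {
//!             return None;
//!         }
//!         let mut batch = std::mem::take(&mut self.buffer);
//!         batch.sort_by_key(|&(d, t, _)| (t, d));
//!         let mut out: Vec<Update> = Vec::new();
//!         for (d, t, r) in batch {
//!             if matches!(out.last(), Some(&(ld, lt, _)) if ld == d && lt == t) {
//!                 out.last_mut().unwrap().2 += r;
//!             } else {
//!                 out.push((d, t, r));
//!             }
//!         }
//!         out.retain(|&(_, _, r)| r != 0);
//!         Some(out)
//!     }
//! }
//! ```
//!
//! Small inserts only touch the unsorted buffer. Sorting and consolidation are paid once per
//! full batch.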
//!
//! The insert operation has an amortized complexity of O(log N), with N being the current number
//! of updates stored.
//!
//! ## Retrieving Consolidated Updates
//!
//! Retrieving consolidated updates before a given `upper` works by first consolidating all updates
//! at times before the `upper`, merging them all into one chain, then returning an iterator over
//! that chain.
//!
//! Because each chain contains updates ordered by time first, consolidation of all updates before
//! an `upper` is possible without touching updates at future times. It works by merging the chains
//! only up to the `upper`, producing a merged chain containing consolidated times before the
//! `upper` and leaving behind the chain parts containing later times. The complexity of this
//! operation is O(U log K), with U being the number of updates before `upper` and K the number
//! of chains.
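//!
//! Here is a minimal sketch of this split-then-merge step. It assumes chains are `Vec`s of
//! `(char, u64, i64)` updates sorted by (time, data), and that `upper` is a single `u64` time.
//! `split_and_consolidate` is a hypothetical stand-in, not a function of this module. For brevity
//! it merges the prefixes by sorting them rather than with a k-way merge.
//!
//! ```rust
//! type Update = (char, u64, i64);
//!
//! /// Hypothetical sketch: returns the consolidated updates before `upper`, plus the non-empty
//! /// chain suffixes holding updates at or beyond `upper`.
//! fn split_and_consolidate(
//!     chains: Vec<Vec<Update>>,
//!     upper: u64,
//! ) -> (Vec<Update>, Vec<Vec<Update>>) {
//!     let mut before: Vec<Update> = Vec::new();
//!     let mut remains = Vec::new();
//!     for mut chain in chains {
//!         // Chains are sorted by time first, so a binary search finds the split point.
//!         let split = chain.partition_point(|&(_, t, _)| t < upper);
//!         let beyond = chain.split_off(split);
//!         before.extend(chain);
//!         if !beyond.is_empty() {
//!             remains.push(beyond);
//!         }
//!     }
//!     // Simplification: sort the prefixes instead of merging them k-way.
//!     before.sort_by_key(|&(d, t, _)| (t, d));
//!     let mut merged: Vec<Update> = Vec::new();
//!     for (d, t, r) in before {
//!         if matches!(merged.last(), Some(&(ld, lt, _)) if ld == d && lt == t) {
//!             merged.last_mut().unwrap().2 += r;
//!         } else {
//!             merged.push((d, t, r));
//!         }
//!     }
//!     merged.retain(|&(_, _, r)| r != 0);
//!     (merged, remains)
//! }
//! ```
//!
//! The suffixes beyond `upper` are returned untouched. Only the U updates before `upper`
//! participate in the merge.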
//!
//! Unfortunately, performing consolidation as described above can break the chain invariant, and
//! we might need to restore it by merging chains, including ones containing future updates. This
//! is something that would be great to fix! In the meantime, the hope is that in steady state it
//! doesn't matter too much because either there are no future retractions and U is approximately
//! equal to N, or the amount of future retractions is much larger than the amount of current
//! changes, in which case removing the current changes has a good chance of leaving the chain
//! invariant intact.
//!
//! ## Merging Chains
//!
//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
//! complexity of a merge of K chains containing N updates is O(N log K).
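//!
//! The k-way merge can be sketched with a binary min-heap that holds the next update of each
//! input chain. The sketch assumes chains are `Vec`s of `(char, u64, i64)` updates, sorted by
//! (time, data) and consolidated. It illustrates the technique and is not this module's merge
//! code. It also ignores the `since` handling described below.
//!
//! ```rust
//! use std::cmp::Reverse;
//! use std::collections::BinaryHeap;
//!
//! type Update = (char, u64, i64);
//!
//! /// K-way merge of sorted, consolidated chains. Popping from the min-heap yields updates in
//! /// (time, data) order, so equal keys arrive back to back and their diffs can be summed.
//! fn merge_chains(chains: Vec<Vec<Update>>) -> Vec<Update> {
//!     let mut iters: Vec<_> = chains.into_iter().map(|c| c.into_iter()).collect();
//!     // Entries are `Reverse(((time, data), diff, chain index))`, turning the max-heap into a
//!     // min-heap. The heap never holds more than one entry per chain.
//!     let mut heap = BinaryHeap::new();
//!     for (i, it) in iters.iter_mut().enumerate() {
//!         if let Some((d, t, r)) = it.next() {
//!             heap.push(Reverse(((t, d), r, i)));
//!         }
//!     }
//!     let mut out: Vec<Update> = Vec::new();
//!     while let Some(Reverse(((t, d), r, i))) = heap.pop() {
//!         // Refill the heap from the chain the popped update came from.
//!         if let Some((nd, nt, nr)) = iters[i].next() {
//!             heap.push(Reverse(((nt, nd), nr, i)));
//!         }
//!         if matches!(out.last(), Some(&(ld, lt, _)) if ld == d && lt == t) {
//!             out.last_mut().unwrap().2 += r;
//!         } else {
//!             out.push((d, t, r));
//!         }
//!     }
//!     out.retain(|&(_, _, r)| r != 0);
//!     out
//! }
//! ```
//!
//! Each of the N updates is pushed and popped once on a heap of at most K entries, which gives
//! the O(N log K) bound stated above.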
//!
//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
//!
//! For example, consider these two chains, assuming `since = [2]`:
//!
//! ```text
//! chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
//! chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
//! ```
//!
//! After time advancement, the chains look like this:
//!
//! ```text
//! chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
//! chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
//! ```
//!
//! Merging them naively yields `[(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)]`, a chain that's
//! neither sorted nor consolidated.
//!
//! Instead we need to merge sub-chains: one for each distinct time before the `since`, plus one
//! for all remaining updates at or beyond it. Each of these sub-chains retains the (time, data)
//! ordering after the time advancement to `since`, so merging those yields the expected result.
//!
//! For the above example, the chains we would merge are:
//!
//! ```text
//! chain 1.a: [(c, 2, +1)]
//! chain 1.b: [(b, 2, -1), (a, 3, -1)]
//! chain 2.a: [(b, 2, +1)]
//! chain 2.b: [(a, 2, +1), (c, 2, -1)]
//! ```
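//!
//! The splitting step can be sketched as follows. The sketch assumes chains are slices of
//! `(char, u64, i64)` updates sorted by (time, data), and a `since` consisting of a single `u64`
//! time. `split_for_since` is a hypothetical helper. In this module, the corresponding logic
//! lives in `Cursor::advance_by`, which splits cursors instead of copying updates.
//!
//! ```rust
//! type Update = (char, u64, i64);
//!
//! /// Hypothetical helper: split a sorted chain into sub-chains that stay sorted once their
//! /// times are advanced to `since`. Each distinct time before `since` gets its own sub-chain,
//! /// and all updates at or beyond `since` form the final sub-chain.
//! fn split_for_since(chain: &[Update], since: u64) -> Vec<Vec<Update>> {
//!     let mut splits = Vec::new();
//!     let mut rest = chain;
//!     while let Some(&(_, time, _)) = rest.first() {
//!         if time >= since {
//!             // Advancement leaves these times unchanged, so their order is preserved.
//!             splits.push(rest.to_vec());
//!             break;
//!         }
//!         // Updates at a single `time` are sorted by data, and advancing them all to `since`
//!         // keeps them sorted by data, so they form one valid sub-chain.
//!         let end = rest.partition_point(|&(_, t, _)| t <= time);
//!         splits.push(rest[..end].iter().map(|&(d, _, r)| (d, since, r)).collect());
//!         rest = &rest[end..];
//!     }
//!     splits
//! }
//! ```
//!
//! Applied to the example chains above with a since of 2, this yields sub-chains 1.a and 1.b
//! for chain 1, and 2.a and 2.b for chain 2.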

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;

use differential_dataflow::containers::{Columnation, TimelyStack};
use differential_dataflow::trace::implementations::BatchContainer;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::container::SizableContainer;
use timely::progress::Antichain;
use timely::{Container, PartialOrder};

use crate::sink::correction::LengthAndCapacity;

/// Determines the size factor of subsequent chains required by the chain invariant.
const CHAIN_PROPORTIONALITY: usize = 3;

/// Convenient alias for use in data trait bounds.
pub trait Data: differential_dataflow::Data + Columnation {}
impl<D: differential_dataflow::Data + Columnation> Data for D {}

/// A data structure used to store corrections in the MV sink implementation.
///
/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
/// allowing their memory to be transparently spilled to disk.
#[derive(Debug)]
pub(super) struct CorrectionV2<D: Data> {
    /// Chains containing sorted updates.
    chains: Vec<Chain<D>>,
    /// A staging area for updates, to speed up small inserts.
    stage: Stage<D>,
    /// The frontier by which all contained times are advanced.
    since: Antichain<Timestamp>,

    /// Total length and capacity of chunks in `chains`.
    ///
    /// Tracked to maintain metrics.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
}

impl<D: Data> CorrectionV2<D> {
    /// Construct a new [`CorrectionV2`] instance.
    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
        Self {
            chains: Default::default(),
            stage: Default::default(),
            since: Antichain::from_elem(Timestamp::MIN),
            total_size: Default::default(),
            metrics,
            worker_metrics,
        }
    }

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// All times are expected to be >= the `since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

        if let Some(chain) = self.stage.insert(updates) {
            self.chains.push(chain);

            // Restore the chain invariant.
            let merge_needed = |chains: &[Chain<_>]| match chains {
                [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
                _ => false,
            };

            while merge_needed(&self.chains) {
                let a = self.chains.pop().unwrap();
                let b = self.chains.pop().unwrap();
                let merged = merge_chains([a, b], &self.since);
                self.chains.push(merged);
            }
        };

        self.update_metrics();
    }

    /// Return consolidated updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + 'a {
        let mut result = None;

        if !PartialOrder::less_than(&self.since, upper) {
            // All contained updates are beyond the upper.
            return result.into_iter().flatten();
        }

        self.consolidate_before(upper);

        // There is at most one chain that contains updates before `upper` now.
        result = self
            .chains
            .iter()
            .find(|c| c.first().is_some_and(|(_, t, _)| !upper.less_equal(&t)))
            .map(move |c| {
                let upper = upper.clone();
                c.iter().take_while(move |(_, t, _)| !upper.less_equal(t))
            });

        result.into_iter().flatten()
    }

    /// Consolidate all updates before the given `upper`.
    ///
    /// Once this method returns, all remaining updates before `upper` are contained in a single
    /// chain. Note that this chain might also contain updates beyond `upper` though!
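    ///
    /// The invariant restoration at the end of this method walks the chain list backwards. The
    /// following minimal sketch of that loop models chains only by their lengths. A
    /// caller-supplied `merge` function computes the length of a merged chain, which may be
    /// smaller than the sum of its inputs when updates consolidate away. `restore_invariant` is a
    /// hypothetical name.
    ///
    /// ```rust
    /// /// Hypothetical model of the backwards merge loop in `consolidate_before`.
    /// fn restore_invariant(
    ///     lens: &mut Vec<usize>,
    ///     proportionality: usize,
    ///     merge: impl Fn(usize, usize) -> usize,
    /// ) {
    ///     let mut i = lens.len().saturating_sub(1);
    ///     while i > 0 {
    ///         let needs_merge = lens.get(i).is_some_and(|&a| a * proportionality > lens[i - 1]);
    ///         if needs_merge {
    ///             let a = lens.remove(i);
    ///             lens[i - 1] = merge(a, lens[i - 1]);
    ///         } else {
    ///             // Only move towards the front if no merge happened at this position.
    ///             i -= 1;
    ///         }
    ///     }
    /// }
    /// ```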
    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
        if self.chains.is_empty() && self.stage.is_empty() {
            return;
        }

        let mut chains = std::mem::take(&mut self.chains);
        chains.extend(self.stage.flush());

        if chains.is_empty() {
            // We can only get here if the stage contained updates but they all got consolidated
            // away by `flush`, so we need to update the metrics before we return.
            self.update_metrics();
            return;
        }

        let (merged, remains) = merge_chains_up_to(chains, &self.since, upper);

        self.chains = remains;
        if !merged.is_empty() {
            // We put the merged chain at the end, assuming that its contents are likely to
            // consolidate with retractions that will arrive soon.
            self.chains.push(merged);
        }

        // Restore the chain invariant.
        //
        // This part isn't great. We've taken great care so far to only look at updates with times
        // before `upper`, but now we might end up merging all chains anyway in the worst case.
        // There might be something smarter we could do to avoid merging as much as possible. For
        // example, we could consider sorting chains by length first, or inspect the contained
        // times and prefer merging chains that have a chance at consolidating with one another.
        let mut i = self.chains.len().saturating_sub(1);
        while i > 0 {
            let needs_merge = self.chains.get(i).is_some_and(|a| {
                let b = &self.chains[i - 1];
                a.len() * CHAIN_PROPORTIONALITY > b.len()
            });
            if needs_merge {
                let a = self.chains.remove(i);
                let b = std::mem::take(&mut self.chains[i - 1]);
                let merged = merge_chains([a, b], &self.since);
                self.chains[i - 1] = merged;
            } else {
                // Only advance the index if we didn't merge. A merge can reduce the size of the
                // chain at `i - 1`, causing a violation of the chain invariant with the next
                // chain, so we might need to merge the two before proceeding to lower indexes.
                i -= 1;
            }
        }

        self.update_metrics();
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));
        self.stage.advance_times(&since);
        self.since = since;
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
        if let Some(upper_ts) = upper_ts {
            let upper = Antichain::from_elem(upper_ts);
            self.consolidate_before(&upper);
        }
    }

    /// Update persist sink metrics.
    fn update_metrics(&mut self) {
        let mut new_size = self.stage.get_size();
        for chain in &mut self.chains {
            new_size += chain.get_size();
        }

        let old_size = self.total_size;
        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
        self.metrics
            .report_correction_update_deltas(len_delta, cap_delta);
        self.worker_metrics
            .report_correction_update_totals(new_size.length, new_size.capacity);

        self.total_size = new_size;
    }
}

/// A chain of [`Chunk`]s containing updates.
///
/// All updates in a chain are sorted by (time, data) and consolidated.
///
/// Note that, in contrast to [`Chunk`]s, chains can be empty, though we generally try to avoid
/// keeping empty chains around.
#[derive(Debug)]
struct Chain<D: Data> {
    /// The contained chunks.
    chunks: Vec<Chunk<D>>,
    /// Cached value of the current chain size, for efficient updating of metrics.
    cached_size: Option<LengthAndCapacity>,
}

impl<D: Data> Default for Chain<D> {
    fn default() -> Self {
        Self {
            chunks: Default::default(),
            cached_size: None,
        }
    }
}

impl<D: Data> Chain<D> {
    /// Return whether the chain is empty.
    fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Return the length of the chain, in chunks.
    fn len(&self) -> usize {
        self.chunks.len()
    }

    /// Push an update onto the chain.
    ///
    /// The update must sort after all updates already in the chain, in (time, data)-order, to
    /// ensure the chain remains sorted.
    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
        let (d, t, r) = update;
        let update = (d.borrow(), t, r);

        debug_assert!(self.can_accept(update));

        match self.chunks.last_mut() {
            Some(c) if !c.at_capacity() => c.push(update),
            Some(_) | None => {
                let chunk = Chunk::from_update(update);
                self.push_chunk(chunk);
            }
        }

        self.invalidate_cached_size();
    }

    /// Push a chunk onto the chain.
    ///
    /// All updates in the chunk must sort after all updates already in the chain, in
    /// (time, data)-order, to ensure the chain remains sorted.
    fn push_chunk(&mut self, chunk: Chunk<D>) {
        debug_assert!(self.can_accept(chunk.first()));

        self.chunks.push(chunk);
        self.invalidate_cached_size();
    }

    /// Push the updates produced by a cursor onto the chain.
    ///
    /// All updates produced by the cursor must sort after all updates already in the chain, in
    /// (time, data)-order, to ensure the chain remains sorted.
    fn push_cursor(&mut self, cursor: Cursor<D>) {
        let mut rest = Some(cursor);
        while let Some(cursor) = rest.take() {
            let update = cursor.get();
            self.push(update);
            rest = cursor.step();
        }
    }

    /// Return whether the chain can accept the given update.
    ///
    /// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
    fn can_accept(&self, update: (&D, Timestamp, Diff)) -> bool {
        self.last().is_none_or(|(dc, tc, _)| {
            let (d, t, _) = update;
            (tc, dc) < (t, d)
        })
    }

    /// Return the first update in the chain, if any.
    fn first(&self) -> Option<(&D, Timestamp, Diff)> {
        self.chunks.first().map(|c| c.first())
    }

    /// Return the last update in the chain, if any.
    fn last(&self) -> Option<(&D, Timestamp, Diff)> {
        self.chunks.last().map(|c| c.last())
    }

    /// Convert the chain into a cursor over the contained updates.
    fn into_cursor(self) -> Option<Cursor<D>> {
        let chunks = self.chunks.into_iter().map(Rc::new).collect();
        Cursor::new(chunks)
    }

    /// Return an iterator over the contained updates.
    fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
        self.chunks
            .iter()
            .flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
    }

    /// Return the size of the chain, for use in metrics.
    fn get_size(&mut self) -> LengthAndCapacity {
        // This operation can be expensive as it requires inspecting the individual chunks and
        // their backing regions. We thus cache the result to hopefully avoid the cost most of the
        // time.
        if self.cached_size.is_none() {
            let mut size = LengthAndCapacity::default();
            for chunk in &mut self.chunks {
                size += chunk.get_size();
            }
            self.cached_size = Some(size);
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chain size.
    ///
    /// This method must be called every time the size of the chain changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}

impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
    fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
        for update in iter {
            self.push(update);
        }
    }
}

/// A cursor over updates in a chain.
///
/// A cursor provides two guarantees:
///  * Produced updates are ordered and consolidated.
///  * A cursor always yields at least one update.
///
/// The second guarantee is enforced through the type system: Every method that steps a cursor
/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
/// over the last update.
///
/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
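///
/// The ownership-based guarantee can be illustrated with a simplified, self-contained sketch.
/// It assumes a cursor over a plain `Vec<u64>` instead of `Rc<Chunk>`s. `VecCursor` and
/// `drain_cursor` are hypothetical names. The draining loop mirrors `Chain::push_cursor`.
///
/// ```rust
/// /// Hypothetical cursor that always points at a valid item.
/// struct VecCursor {
///     items: Vec<u64>,
///     offset: usize,
/// }
///
/// impl VecCursor {
///     /// Returns `None` for empty input, so every cursor yields at least one item.
///     fn new(items: Vec<u64>) -> Option<Self> {
///         if items.is_empty() {
///             None
///         } else {
///             Some(Self { items, offset: 0 })
///         }
///     }
///
///     fn get(&self) -> u64 {
///         self.items[self.offset]
///     }
///
///     /// Consumes the cursor and returns `None` after stepping over the last item.
///     fn step(mut self) -> Option<Self> {
///         self.offset += 1;
///         (self.offset < self.items.len()).then_some(self)
///     }
/// }
///
/// /// Collect all items by threading the cursor through an `Option`.
/// fn drain_cursor(cursor: Option<VecCursor>) -> Vec<u64> {
///     let mut out = Vec::new();
///     let mut rest = cursor;
///     while let Some(cursor) = rest.take() {
///         out.push(cursor.get());
///         rest = cursor.step();
///     }
///     out
/// }
/// ```
///
/// Because `get` is only reachable on a live cursor, no bounds or emptiness check is needed at
/// the call site.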
#[derive(Clone, Debug)]
struct Cursor<D: Data> {
    /// The chunks from which updates can still be produced.
    chunks: VecDeque<Rc<Chunk<D>>>,
    /// The current offset into `chunks.front()`.
    chunk_offset: usize,
    /// An optional limit for the number of updates the cursor will produce.
    limit: Option<usize>,
    /// An optional overwrite for the timestamp of produced updates.
    overwrite_ts: Option<Timestamp>,
}

impl<D: Data> Cursor<D> {
    /// Construct a cursor over a list of chunks.
    ///
    /// Returns `None` if `chunks` is empty.
    fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
        if chunks.is_empty() {
            return None;
        }

        Some(Self {
            chunks,
            chunk_offset: 0,
            limit: None,
            overwrite_ts: None,
        })
    }

    /// Set a limit for the number of updates this cursor will produce.
    ///
    /// # Panics
    ///
    /// Panics if there is already a limit lower than the new one.
    fn set_limit(mut self, limit: usize) -> Option<Self> {
        assert!(self.limit.is_none_or(|l| l >= limit));

        if limit == 0 {
            return None;
        }

        // Release chunks made unreachable by the limit.
        let mut count = 0;
        let mut idx = 0;
        let mut offset = self.chunk_offset;
        while idx < self.chunks.len() && count < limit {
            let chunk = &self.chunks[idx];
            count += chunk.len() - offset;
            idx += 1;
            offset = 0;
        }
        self.chunks.truncate(idx);

        if count > limit {
            self.limit = Some(limit);
        }

        Some(self)
    }

    /// Get a reference to the current update.
    fn get(&self) -> (&D, Timestamp, Diff) {
        let chunk = self.get_chunk();
        let (d, t, r) = chunk.index(self.chunk_offset);
        let t = self.overwrite_ts.unwrap_or(t);
        (d, t, r)
    }

    /// Get a reference to the current chunk.
    fn get_chunk(&self) -> &Chunk<D> {
        &self.chunks[0]
    }

    /// Step to the next update.
    ///
    /// Returns the stepped cursor, or `None` if the step was over the last update.
    fn step(mut self) -> Option<Self> {
        if self.chunk_offset == self.get_chunk().len() - 1 {
            return self.skip_chunk().map(|(c, _)| c);
        }

        self.chunk_offset += 1;

        if let Some(limit) = &mut self.limit {
            *limit -= 1;
            if *limit == 0 {
                return None;
            }
        }

        Some(self)
    }

    /// Skip the remainder of the current chunk.
    ///
    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
    /// left after the skip.
    fn skip_chunk(mut self) -> Option<(Self, usize)> {
        let chunk = self.chunks.pop_front().expect("cursor invariant");

        if self.chunks.is_empty() {
            return None;
        }

        let skipped = chunk.len() - self.chunk_offset;
        self.chunk_offset = 0;

        if let Some(limit) = &mut self.limit {
            if skipped >= *limit {
                return None;
            }
            *limit -= skipped;
        }

        Some((self, skipped))
    }

    /// Skip all updates with times <= the given time.
    ///
    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
    /// left after the skip.
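    ///
    /// The chunk-wise search can be sketched as follows. The sketch assumes chunks are plain
    /// `Vec<u64>`s of times sorted in ascending order, and that the search starts at the
    /// beginning of the first chunk (this method additionally respects `chunk_offset`).
    /// `skip_time_in` is a hypothetical helper that binary-searches each chunk with
    /// `partition_point`. Chunks that hold no greater time are skipped entirely.
    ///
    /// ```rust
    /// /// Hypothetical helper returning `(chunk index, offset, skipped)` for the first time
    /// /// greater than `time`, or `None` if there is no such time.
    /// fn skip_time_in(chunks: &[Vec<u64>], time: u64) -> Option<(usize, usize, usize)> {
    ///     let mut skipped = 0;
    ///     for (idx, chunk) in chunks.iter().enumerate() {
    ///         // Index of the first time > `time` within this chunk.
    ///         let offset = chunk.partition_point(|&t| t <= time);
    ///         if offset < chunk.len() {
    ///             return Some((idx, offset, skipped + offset));
    ///         }
    ///         skipped += chunk.len();
    ///     }
    ///     None
    /// }
    /// ```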
    fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
        if self.overwrite_ts.is_some_and(|ts| ts <= time) {
            return None;
        } else if self.get().1 > time {
            return Some((self, 0));
        }

        let mut skipped = 0;

        let new_offset = loop {
            let chunk = self.get_chunk();
            if let Some(index) = chunk.find_time_greater_than(time) {
                break index;
            }

            let (cursor, count) = self.skip_chunk()?;
            self = cursor;
            skipped += count;
        };

        skipped += new_offset - self.chunk_offset;
        self.chunk_offset = new_offset;

        Some((self, skipped))
    }

    /// Advance all updates in this cursor by the given `since_ts`.
    ///
    /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
    /// been advanced by `since_ts`.
    fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
        // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
        // only need to advance the `overwrite_ts` by the `since_ts`.
        if let Some(ts) = self.overwrite_ts {
            if ts < since_ts {
                self.overwrite_ts = Some(since_ts);
            }
            return vec![self];
        }

        // Otherwise we need to split the cursor so that each new cursor only yields runs of
        // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
        // this by giving each distinct time < `since_ts` its own split, with all remaining
        // updates (at times >= `since_ts`) forming the final split.
        let mut splits = Vec::new();
        let mut remaining = Some(self);

        while let Some(cursor) = remaining.take() {
            let (_, time, _) = cursor.get();
            if time >= since_ts {
                splits.push(cursor);
                break;
            }

            let mut current = cursor.clone();
            if let Some((cursor, skipped)) = cursor.skip_time(time) {
                remaining = Some(cursor);
                current = current.set_limit(skipped).expect("skipped at least 1");
            }
            current.overwrite_ts = Some(since_ts);
            splits.push(current);
        }

        splits
    }

    /// Split the cursor at the given time.
    ///
    /// Returns two cursors, the first yielding all updates at times < `time`, the second yielding
    /// all updates at times >= `time`. Both can be `None` if they would be empty.
    fn split_at_time(self, time: Timestamp) -> (Option<Self>, Option<Self>) {
        let Some(skip_ts) = time.step_back() else {
            return (None, Some(self));
        };

        let before = self.clone();
        match self.skip_time(skip_ts) {
            Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
            None => (Some(before), None),
        }
    }

    /// Attempt to unwrap the cursor into a [`Chain`].
    ///
    /// This operation efficiently reuses chunks by directly inserting them into the output chain
    /// where possible.
    ///
    /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
    /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
    /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
    /// chain by copying chunks rather than reusing them.
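    ///
    /// The reuse-or-copy decision can be sketched with plain `Rc<Vec<u64>>` chunks, assuming no
    /// limit or timestamp overwrite is involved. `unwrap_or_copy` is a hypothetical helper. It
    /// reuses the allocations only if every chunk is uniquely referenced. Otherwise it copies
    /// them, similar in spirit to the `From<Cursor>` fallback, which pushes updates one by one.
    ///
    /// ```rust
    /// use std::rc::Rc;
    ///
    /// /// Hypothetical helper: take ownership of uniquely referenced chunks without copying,
    /// /// falling back to copying their contents if any chunk is shared.
    /// fn unwrap_or_copy(chunks: Vec<Rc<Vec<u64>>>) -> Vec<Vec<u64>> {
    ///     if chunks.iter().all(|c| Rc::strong_count(c) == 1) {
    ///         chunks
    ///             .into_iter()
    ///             .map(|c| Rc::into_inner(c).expect("checked above"))
    ///             .collect()
    ///     } else {
    ///         chunks.iter().map(|c| c.to_vec()).collect()
    ///     }
    /// }
    /// ```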
733    fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
734        if self.limit.is_some() {
735            return Err(("cursor with limit", self));
736        }
737        if self.overwrite_ts.is_some() {
738            return Err(("cursor with overwrite_ts", self));
739        }
740        if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
741            return Err(("cursor on shared chunks", self));
742        }
743
744        let mut chain = Chain::default();
745        let mut remaining = Some(self);
746
747        // We might be partway through the first chunk, in which case we can't reuse it but need to
748        // allocate a new one to contain only the updates the cursor can still yield.
749        while let Some(cursor) = remaining.take() {
750            if cursor.chunk_offset == 0 {
751                remaining = Some(cursor);
752                break;
753            }
754            let update = cursor.get();
755            chain.push(update);
756            remaining = cursor.step();
757        }
758
759        if let Some(cursor) = remaining {
760            for chunk in cursor.chunks {
761                let chunk = Rc::into_inner(chunk).expect("checked above");
762                chain.push_chunk(chunk);
763            }
764        }
765
766        Ok(chain)
767    }
768}

impl<D: Data> From<Cursor<D>> for Chain<D> {
    fn from(cursor: Cursor<D>) -> Self {
        match cursor.try_unwrap() {
            Ok(chain) => chain,
            Err((_, cursor)) => {
                let mut chain = Chain::default();
                chain.push_cursor(cursor);
                chain
            }
        }
    }
}

/// A non-empty chunk of updates, backed by a columnation region.
///
/// All updates in a chunk are sorted by (time, data) and consolidated.
///
/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
/// re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API doesn't
/// provide a convenient way to pre-size regions, so chunks are currently only fixed-size in
/// spirit.
struct Chunk<D: Data> {
    /// The contained updates.
    data: TimelyStack<(D, Timestamp, Diff)>,
    /// Cached value of the current chunk size, for efficient updating of metrics.
    cached_size: Option<LengthAndCapacity>,
}

impl<D: Data> Default for Chunk<D> {
    fn default() -> Self {
        let mut data = TimelyStack::default();
        data.ensure_capacity(&mut None);

        Self {
            data,
            cached_size: None,
        }
    }
}

impl<D: Data> fmt::Debug for Chunk<D> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Chunk(<{}>)", self.len())
    }
}

impl<D: Data> Chunk<D> {
    /// Create a new chunk containing a single update.
    fn from_update<DT: Borrow<D>>(update: (DT, Timestamp, Diff)) -> Self {
        let (d, t, r) = update;

        let mut chunk = Self::default();
        chunk.data.copy_destructured(d.borrow(), &t, &r);

        chunk
    }

    /// Return the number of updates in the chunk.
    fn len(&self) -> usize {
        Container::len(&self.data)
    }

    /// Return the (local) capacity of the chunk.
    fn capacity(&self) -> usize {
        self.data.capacity()
    }

    /// Return whether the chunk is at capacity.
    fn at_capacity(&self) -> bool {
        self.data.at_capacity()
    }

    /// Return the update at the given index.
    ///
    /// # Panics
    ///
    /// Panics if the given index is not populated.
    fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
        let (d, t, r) = self.data.index(idx);
        (d, *t, *r)
    }

    /// Return the first update in the chunk.
    fn first(&self) -> (&D, Timestamp, Diff) {
        self.index(0)
    }

    /// Return the last update in the chunk.
    fn last(&self) -> (&D, Timestamp, Diff) {
        self.index(self.len() - 1)
    }

    /// Push an update onto the chunk.
    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
        let (d, t, r) = update;
        self.data.copy_destructured(d.borrow(), &t, &r);

        self.invalidate_cached_size();
    }

    /// Return the index of the first update at a time greater than `time`, or `None` if no such
    /// update exists.
    fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
        if self.last().1 <= time {
            return None;
        }

        let mut lower = 0;
        let mut upper = self.len();
        while lower < upper {
            let idx = (lower + upper) / 2;
            if self.index(idx).1 > time {
                upper = idx;
            } else {
                lower = idx + 1;
            }
        }

        Some(lower)
    }

    /// Return the size of the chunk, for use in metrics.
    fn get_size(&mut self) -> LengthAndCapacity {
        if self.cached_size.is_none() {
            let length = Container::len(&self.data);
            let mut capacity = 0;
            self.data.heap_size(|_, cap| capacity += cap);
            self.cached_size = Some(LengthAndCapacity { length, capacity });
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chunk size.
    ///
    /// This method must be called every time the size of the chunk changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}
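
/*
The binary search in `Chunk::find_time_greater_than` looks for the first update whose time
exceeds `time`. Updates in a chunk are sorted by (time, data), so their times are
non-decreasing. The same index can therefore be computed with the standard library's
`slice::partition_point`. The free function below is a minimal sketch of this, assuming the
chunk's times are available as a plain `&[u64]`. It is illustrative only.

```rust
/// Return the index of the first time greater than `time`, or `None` if there is none.
/// Assumes `times` is sorted in non-decreasing order.
fn find_time_greater_than(times: &[u64], time: u64) -> Option<usize> {
    // `partition_point` returns the index of the first element for which the predicate is
    // false, which for sorted input is the first time strictly greater than `time`.
    let idx = times.partition_point(|&t| t <= time);
    (idx < times.len()).then_some(idx)
}
```

For the times `[1, 1, 3, 5, 5, 8]`, this returns `Some(2)` for `time = 1` and `None` for
`time = 8`. Unlike the chunk method, it also accepts an empty slice, for which it returns `None`.
*/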

/// A buffer for staging updates before they are inserted into the sorted chains.
#[derive(Debug)]
struct Stage<D> {
    /// The contained updates.
    ///
    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
    data: Vec<(D, Timestamp, Diff)>,
}

impl<D: Data> Default for Stage<D> {
    fn default() -> Self {
        // Make sure that the `Stage` has the same capacity as a `Chunk`.
        let chunk = Chunk::<D>::default();
        let data = Vec::with_capacity(chunk.capacity());
        Self { data }
    }
}

impl<D: Data> Stage<D> {
    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Insert a batch of updates, possibly producing a ready [`Chain`].
    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
        if updates.is_empty() {
            return None;
        }

        // Determine how many chunks we can fill with the available updates.
        let update_count = self.data.len() + updates.len();
        let chunk_size = self.data.capacity();
        let chunk_count = update_count / chunk_size;

        let mut new_updates = updates.drain(..);

        // If we have enough shippable updates, collect them, consolidate, and build a chain.
        let maybe_chain = if chunk_count > 0 {
            let ship_count = chunk_count * chunk_size;
            let mut buffer = Vec::with_capacity(ship_count);

            buffer.append(&mut self.data);
            while buffer.len() < ship_count {
                let update = new_updates.next().unwrap();
                buffer.push(update);
            }

            consolidate(&mut buffer);

            let mut chain = Chain::default();
            chain.extend(buffer);
            Some(chain)
        } else {
            None
        };

        // Stage the remaining updates.
        self.data.extend(new_updates);

        maybe_chain
    }

    /// Flush all currently staged updates into a chain.
    fn flush(&mut self) -> Option<Chain<D>> {
        consolidate(&mut self.data);

        if self.data.is_empty() {
            return None;
        }

        let mut chain = Chain::default();
        chain.extend(self.data.drain(..));
        Some(chain)
    }

    /// Advance the times of staged updates by the given `since`.
    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
        let Some(since_ts) = since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            self.data.clear();
            return;
        };

        for (_, time, _) in &mut self.data {
            *time = std::cmp::max(*time, *since_ts);
        }
    }

    /// Return the size of the stage, for use in metrics.
    fn get_size(&self) -> LengthAndCapacity {
        LengthAndCapacity {
            length: self.data.len(),
            capacity: self.data.capacity(),
        }
    }
}
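
/*
`Stage::insert` only ships whole chunks. It ships the largest multiple of the chunk size that
the staged and new updates can fill, and keeps the remainder staged. For example, with a chunk
size of 3, two staged updates, and five new ones, it ships six updates and stages one. Below
is a minimal sketch of that bookkeeping over plain vectors, with consolidation and chain
building left out. `stage_insert` is a hypothetical stand-in, not this module's API.

```rust
/// Move a whole number of chunks' worth of updates out of `staged` and `updates`, returning
/// them, and leave the rest in `staged`. Assumes `staged.len() < chunk_size` on entry, which
/// `Stage` maintains.
fn stage_insert<T>(staged: &mut Vec<T>, updates: &mut Vec<T>, chunk_size: usize) -> Option<Vec<T>> {
    assert!(chunk_size > 0, "chunk size must be positive");
    debug_assert!(staged.len() < chunk_size);

    let update_count = staged.len() + updates.len();
    let ship_count = (update_count / chunk_size) * chunk_size;

    let mut new_updates = updates.drain(..);
    let shipped = if ship_count > 0 {
        let mut buffer = Vec::with_capacity(ship_count);
        // Previously staged updates go first, then new ones until the buffer is full.
        buffer.append(staged);
        let missing = ship_count - buffer.len();
        buffer.extend(new_updates.by_ref().take(missing));
        Some(buffer)
    } else {
        None
    };

    // Fewer than `chunk_size` updates remain; stage them.
    staged.extend(new_updates);
    shipped
}
```
*/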

/// Sort and consolidate the given list of updates.
///
/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
/// except that it sorts updates by (time, data) instead of (data, time).
fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
    if updates.len() <= 1 {
        return;
    }

    let diff = |update: &(_, _, Diff)| update.2;

    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut offset = 0;
    let mut accum = diff(&updates[0]);

    for idx in 1..updates.len() {
        let this = &updates[idx];
        let prev = &updates[idx - 1];
        if this.0 == prev.0 && this.1 == prev.1 {
            accum += diff(&updates[idx]);
        } else {
            if accum != Diff::ZERO {
                updates.swap(offset, idx - 1);
                updates[offset].2 = accum;
                offset += 1;
            }
            accum = diff(&updates[idx]);
        }
    }

    if accum != Diff::ZERO {
        let len = updates.len();
        updates.swap(offset, len - 1);
        updates[offset].2 = accum;
        offset += 1;
    }

    updates.truncate(offset);
}
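
/*
`consolidate` works in place. After sorting, it walks each run of equal (data, time) pairs,
moves the last element of every run with a non-zero diff sum into the next output slot, and
finally truncates. A simpler, allocating version can serve as a reference for what the result
should be. Below is a minimal sketch of one, using `String`, `u64`, and `i64` as stand-ins for
`D`, `Timestamp`, and `Diff`. `consolidate_by_time` is a hypothetical name.

```rust
/// Sort updates by (time, data), sum the diffs of equal (data, time) pairs, and drop pairs
/// whose diffs sum to zero.
fn consolidate_by_time(mut updates: Vec<(String, u64, i64)>) -> Vec<(String, u64, i64)> {
    // Sort by (time, data) rather than the usual (data, time).
    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut out: Vec<(String, u64, i64)> = Vec::with_capacity(updates.len());
    for (data, time, diff) in updates {
        let same_key = out.last().is_some_and(|(d, t, _)| *d == data && *t == time);
        if same_key {
            // Same (data, time) as the previous update: accumulate the diff.
            out.last_mut().expect("checked above").2 += diff;
        } else {
            // New key: discard the previous key if its diffs canceled out.
            if out.last().is_some_and(|u| u.2 == 0) {
                out.pop();
            }
            out.push((data, time, diff));
        }
    }
    // The final key may have canceled out as well.
    if out.last().is_some_and(|u| u.2 == 0) {
        out.pop();
    }
    out
}
```

Sorting by time first keeps all updates at a given time contiguous. The binary search in
`Chunk::find_time_greater_than` relies on that ordering.
*/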

/// Merge the given chains, advancing times by the given `since` in the process.
fn merge_chains<D: Data>(
    chains: impl IntoIterator<Item = Chain<D>>,
    since: &Antichain<Timestamp>,
) -> Chain<D> {
    let Some(&since_ts) = since.as_option() else {
        return Chain::default();
    };

    let mut to_merge = Vec::new();
    for chain in chains {
        if let Some(cursor) = chain.into_cursor() {
            let mut runs = cursor.advance_by(since_ts);
            to_merge.append(&mut runs);
        }
    }

    merge_cursors(to_merge)
}

/// Merge the given chains, advancing times by the given `since` in the process, but only up to the
/// given `upper`.
///
/// Returns the merged chain and a list of non-empty remainders of the input chains.
fn merge_chains_up_to<D: Data>(
    chains: Vec<Chain<D>>,
    since: &Antichain<Timestamp>,
    upper: &Antichain<Timestamp>,
) -> (Chain<D>, Vec<Chain<D>>) {
    let Some(&since_ts) = since.as_option() else {
        return (Chain::default(), Vec::new());
    };
    let Some(&upper_ts) = upper.as_option() else {
        let merged = merge_chains(chains, since);
        return (merged, Vec::new());
    };

    if since_ts >= upper_ts {
        // After advancing by `since` there will be no updates before `upper`.
        return (Chain::default(), chains);
    }

    let mut to_merge = Vec::new();
    let mut to_keep = Vec::new();
    for chain in chains {
        if let Some(cursor) = chain.into_cursor() {
            let mut runs = cursor.advance_by(since_ts);
            if let Some(last) = runs.pop() {
                let (before, beyond) = last.split_at_time(upper_ts);
                runs.extend(before);
                to_keep.extend(beyond);
            }
            to_merge.append(&mut runs);
        }
    }

    let merged = merge_cursors(to_merge);
    let remains = to_keep
        .into_iter()
        .map(|c| c.try_unwrap().expect("unwrappable"))
        .collect();

    (merged, remains)
}
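
/*
Setting aside the chain and cursor machinery, `merge_chains_up_to` first advances every time
to at least `since`. It then merges only the updates whose advanced time is before `upper` and
keeps the rest for later. It follows that nothing is merged when `since >= upper`. Below is a
minimal sketch of that time handling over a flat vector, assuming `u64` times and using `None`
for the empty frontier. `advance_and_split` is hypothetical and leaves out the merging itself.

```rust
/// Advance the time of each update by `since` and split the updates into those before `upper`
/// and the rest. `None` stands for the empty frontier.
fn advance_and_split(
    updates: Vec<(char, u64, i64)>,
    since: Option<u64>,
    upper: Option<u64>,
) -> (Vec<(char, u64, i64)>, Vec<(char, u64, i64)>) {
    // An empty `since` frontier discards all updates.
    let Some(since) = since else {
        return (Vec::new(), Vec::new());
    };
    updates
        .into_iter()
        // Advancing by `since` moves every earlier time up to `since`.
        .map(|(d, t, r)| (d, t.max(since), r))
        // Every time is before an empty `upper` frontier.
        .partition(|&(_, t, _)| upper.map_or(true, |u| t < u))
}
```

Take `since = 3` and `upper = 5`. The updates `('a', 1, 1)` and `('b', 4, -1)` end up before
`upper` (the first one now at time 3), while `('c', 7, 1)` is kept for later. One difference
from the real function: when `since >= upper`, it returns the input chains as they are, whereas
this sketch advances their times anyway.
*/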

/// Merge the given cursors into one chain.
fn merge_cursors<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
    match cursors.len() {
        0 => Chain::default(),
        1 => {
            let [cur] = cursors.try_into().unwrap();
            Chain::from(cur)
        }
        2 => {
            let [a, b] = cursors.try_into().unwrap();
            merge_2(a, b)
        }
        _ => merge_many(cursors),
    }
}

/// Merge the given two cursors using a 2-way merge.
///
/// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
fn merge_2<D: Data>(cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
    let mut rest1 = Some(cursor1);
    let mut rest2 = Some(cursor2);
    let mut merged = Chain::default();

    loop {
        match (rest1, rest2) {
            (Some(c1), Some(c2)) => {
                let (d1, t1, r1) = c1.get();
                let (d2, t2, r2) = c2.get();

                match (t1, d1).cmp(&(t2, d2)) {
                    Ordering::Less => {
                        merged.push((d1, t1, r1));
                        rest1 = c1.step();
                        rest2 = Some(c2);
                    }
                    Ordering::Greater => {
                        merged.push((d2, t2, r2));
                        rest1 = Some(c1);
                        rest2 = c2.step();
                    }
                    Ordering::Equal => {
                        let r = r1 + r2;
                        if r != Diff::ZERO {
                            merged.push((d1, t1, r));
                        }
                        rest1 = c1.step();
                        rest2 = c2.step();
                    }
                }
            }
            (Some(c), None) | (None, Some(c)) => {
                merged.push_cursor(c);
                break;
            }
            (None, None) => break,
        }
    }

    merged
}
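
/*
`merge_2` is a classic two-way merge of consolidated runs. It repeatedly emits the smaller of
the two current updates by (time, data), and sums the diffs when both sides are at the same
(data, time). Once one side is exhausted, it appends whatever the other side has left. Below is
a minimal sketch of the same algorithm over slices, using `(char, u64, i64)` as a stand-in for
`(D, Timestamp, Diff)`. `merge_two` is a hypothetical name.

```rust
use std::cmp::Ordering;

/// Merge two slices that are each sorted by (time, data) and consolidated.
fn merge_two(a: &[(char, u64, i64)], b: &[(char, u64, i64)]) -> Vec<(char, u64, i64)> {
    let (mut i, mut j) = (0, 0);
    let mut merged = Vec::with_capacity(a.len() + b.len());
    while i < a.len() && j < b.len() {
        let (d1, t1, r1) = a[i];
        let (d2, t2, r2) = b[j];
        match (t1, d1).cmp(&(t2, d2)) {
            Ordering::Less => {
                merged.push(a[i]);
                i += 1;
            }
            Ordering::Greater => {
                merged.push(b[j]);
                j += 1;
            }
            Ordering::Equal => {
                // Same (data, time) on both sides: keep the sum unless it cancels out.
                let r = r1 + r2;
                if r != 0 {
                    merged.push((d1, t1, r));
                }
                i += 1;
                j += 1;
            }
        }
    }
    // At most one side has updates left, and they are already sorted and consolidated.
    merged.extend_from_slice(&a[i..]);
    merged.extend_from_slice(&b[j..]);
    merged
}
```

Neither input contains a (data, time) pair twice. The `Equal` case therefore consumes exactly
one update from each side.
*/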

/// Merge the given cursors using a k-way merge with a binary heap.
fn merge_many<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
    let mut heap = MergeHeap::from_iter(cursors);
    let mut merged = Chain::default();
    while let Some(cursor1) = heap.pop() {
        let (data, time, mut diff) = cursor1.get();

        while let Some((cursor2, r)) = heap.pop_equal(data, time) {
            diff += r;
            if let Some(cursor2) = cursor2.step() {
                heap.push(cursor2);
            }
        }

        if diff != Diff::ZERO {
            merged.push((data, time, diff));
        }
        if let Some(cursor1) = cursor1.step() {
            heap.push(cursor1);
        }
    }

    merged
}

/// A binary heap specialized for merging [`Cursor`]s.
struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);

impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
        let inner = cursors.into_iter().map(MergeCursor).collect();
        Self(inner)
    }
}

impl<D: Data> MergeHeap<D> {
    /// Pop the next cursor (the one yielding the least update) from the heap.
    fn pop(&mut self) -> Option<Cursor<D>> {
        self.0.pop().map(|MergeCursor(c)| c)
    }

    /// Pop the next cursor from the heap, provided the data and time of its current update are
    /// equal to the given values.
    ///
    /// Returns both the cursor and the diff corresponding to `data` and `time`.
    fn pop_equal(&mut self, data: &D, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
        let MergeCursor(cursor) = self.0.peek()?;
        let (d, t, r) = cursor.get();
        if d == data && t == time {
            let cursor = self.pop().expect("checked above");
            Some((cursor, r))
        } else {
            None
        }
    }

    /// Push a cursor onto the heap.
    fn push(&mut self, cursor: Cursor<D>) {
        self.0.push(MergeCursor(cursor));
    }
}

/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
///
/// Implements the cursor ordering required for merging cursors.
struct MergeCursor<D: Data>(Cursor<D>);

impl<D: Data> PartialEq for MergeCursor<D> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl<D: Data> Eq for MergeCursor<D> {}

impl<D: Data> PartialOrd for MergeCursor<D> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<D: Data> Ord for MergeCursor<D> {
    fn cmp(&self, other: &Self) -> Ordering {
        let (d1, t1, _) = self.0.get();
        let (d2, t2, _) = other.0.get();
        (t1, d1).cmp(&(t2, d2)).reverse()
    }
}
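
/*
`BinaryHeap` is a max-heap. `MergeCursor` therefore reverses the (time, data) ordering, so
that `MergeHeap::pop` returns the cursor with the least update. The standard library's
`std::cmp::Reverse` wrapper has the same effect. The sketch below builds the k-way merge from
`merge_many` on top of it. It represents runs as vectors of `(char, u64, i64)`, and heap
entries have the form `((time, data), run, position)`. `merge_k`, `push_next`, and `Entry` are
hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Heap entry: the current (time, data) key of a run, the run index, and the position in it.
/// `Reverse` turns the max-heap into a min-heap on these entries.
type Entry = Reverse<((u64, char), usize, usize)>;

/// Push the update at `pos` of run `run` onto the heap, if the run has one.
fn push_next(heap: &mut BinaryHeap<Entry>, runs: &[Vec<(char, u64, i64)>], run: usize, pos: usize) {
    if let Some(&(d, t, _)) = runs[run].get(pos) {
        heap.push(Reverse(((t, d), run, pos)));
    }
}

/// Merge runs that are each sorted by (time, data) and consolidated.
fn merge_k(runs: &[Vec<(char, u64, i64)>]) -> Vec<(char, u64, i64)> {
    let mut heap = BinaryHeap::new();
    for run in 0..runs.len() {
        push_next(&mut heap, runs, run, 0);
    }

    let mut merged = Vec::new();
    while let Some(Reverse((key, run, pos))) = heap.pop() {
        let mut diff = runs[run][pos].2;
        // Like `pop_equal`: absorb every other run currently positioned at the same key.
        while let Some(&Reverse((k, other, other_pos))) = heap.peek() {
            if k != key {
                break;
            }
            heap.pop();
            diff += runs[other][other_pos].2;
            push_next(&mut heap, runs, other, other_pos + 1);
        }
        if diff != 0 {
            let (time, data) = key;
            merged.push((data, time, diff));
        }
        push_next(&mut heap, runs, run, pos + 1);
    }
    merged
}
```

Breaking ties on the run index only makes the pop order deterministic. It does not affect the
merged result, because all diffs at one key are summed.
*/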