mz_compute/sink/correction_v2.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
//! operator to stash updates before they are written.
//!
//! The `Correction` data structure provides methods to:
//!  * insert new updates
//!  * advance the compaction frontier (called `since`)
//!  * obtain an iterator over consolidated updates before some `upper`
//!  * force consolidation of updates before some `upper`
//!
//! The goal is to provide good performance for each of these operations, even in the presence of
//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
//! retractions for future times and we want those to be handled efficiently as well.
//!
//! Note that `Correction` does not provide a method to directly remove updates. Instead updates
//! are removed by inserting their retractions so that they consolidate away to nothing.
//!
//! ## Storage of Updates
//!
//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
//!
//! [`CorrectionV2`] holds onto a list of [`Chain`]s containing [`Chunk`]s of stashed updates. Each
//! [`Chunk`] is a columnation region containing a fixed maximum number of updates. All updates in
//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
//!
//! ```text
//!       chain[0]   |   chain[1]   |   chain[2]
//!                  |              |
//!     chunk[0]     | chunk[0]     | chunk[0]
//!       (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
//!       (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
//!     chunk[1]     | chunk[1]     |
//!       (c, 1, +1) |   (c, 2, -2) |
//!       (a, 2, -1) |   (c, 4, -1) |
//!     chunk[2]     |              |
//!       (b, 2, +1) |              |
//!       (c, 2, +1) |              |
//!     chunk[3]     |              |
//!       (b, 3, -1) |              |
//!       (c, 3, +1) |              |
//! ```
//!
//! The "chain invariant" states that each chain has at least [`CHAIN_PROPORTIONALITY`] times as
//! many chunks as the next one. This means that chain sizes will often be powers of
//! `CHAIN_PROPORTIONALITY`, but they don't have to be. For example, for a proportionality of 2,
//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
//!
//! Choosing the `CHAIN_PROPORTIONALITY` value allows tuning the trade-off between memory and CPU
//! resources required to maintain corrections. A higher proportionality forces more frequent chain
//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
//!
//! ## Inserting Updates
//!
//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
//! list until the chain invariant is restored.
//!
//! Inserting an update into the correction buffer can be expensive: it involves allocating a new
//! chunk, copying the update in, and then likely merging with an existing chain to restore the
//! chain invariant. If updates trickle in in small batches, this can cause considerable
//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
//! [`Chunk`], they are sorted and inserted into the chains.
//!
//! The insert operation has an amortized complexity of O(log N), with N being the current number
//! of updates stored.
//!
//! ## Retrieving Consolidated Updates
//!
//! Retrieving consolidated updates before a given `upper` works by first consolidating all updates
//! at times before the `upper`, merging them all into one chain, then returning an iterator over
//! that chain.
//!
//! Because each chain contains updates ordered by time first, consolidation of all updates before
//! an `upper` is possible without touching updates at future times. It works by merging the chains
//! only up to the `upper`, producing a merged chain containing consolidated times before the
//! `upper` and leaving behind the chain parts containing later times. The complexity of this
//! operation is O(U log K), with U being the number of updates before `upper` and K the number
//! of chains.
//!
//! Unfortunately, performing consolidation as described above can break the chain invariant and we
//! might need to restore it by merging chains, including ones containing future updates. This is
//! something that would be great to fix! In the meantime the hope is that in steady state it
//! doesn't matter too much because either there are no future retractions and U is approximately
//! equal to N, or the amount of future retractions is much larger than the amount of current
//! changes, in which case removing the current changes has a good chance of leaving the chain
//! invariant intact.
//!
//! ## Merging Chains
//!
//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
//! complexity of a merge of K chains containing N updates is O(N log K).
//!
//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
//!
//! For example, consider these two chains, assuming `since = [2]`:
//!   chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
//!   chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
//! After time advancement, the chains look like this:
//!   chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
//!   chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
//! Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that's
//! neither sorted nor consolidated.
//!
//! Instead we need to merge sub-chains, one for each distinct time that's before or at the
//! `since`. Each of these sub-chains retains the (time, data) ordering after the time advancement
//! to `since`, so merging those yields the expected result.
//!
//! For the above example, the chains we would merge are:
//!   chain 1.a: [(c, 2, +1)]
//!   chain 1.b: [(b, 2, -1), (a, 3, -1)]
//!   chain 2.a: [(b, 2, +1)]
//!   chain 2.b: [(a, 2, +1), (c, 2, -1)]
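The chain invariant and the merge-on-insert loop from the module docs can be sketched on plain chunk counts. This is a simplified model, not the actual `Chain` type: `satisfies_invariant` and `insert_chain` are illustrative names, and a merged chain is modeled as the sum of its inputs' lengths (the worst case, when nothing consolidates away).

```rust
/// Check the chain invariant: every chain must have at least `prop` times as
/// many chunks as the chain after it.
fn satisfies_invariant(sizes: &[usize], prop: usize) -> bool {
    sizes.windows(2).all(|w| w[0] >= w[1] * prop)
}

/// Append a new chain of `len` chunks, then merge trailing chains until the
/// invariant is restored, mirroring the loop in `insert_inner`.
fn insert_chain(sizes: &mut Vec<usize>, len: usize, prop: usize) {
    sizes.push(len);
    while let [.., prev, last] = sizes[..] {
        if last * prop > prev {
            // Merge the last two chains; the result replaces both.
            sizes.pop();
            sizes.pop();
            sizes.push(prev + last);
        } else {
            break;
        }
    }
}
```

With a proportionality of 2, the sizes `[11, 5, 2, 1]` from the docs satisfy the invariant; with a proportionality of 3, appending a 1-chunk chain to `[9, 3, 1]` cascades into a single merged chain.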
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;

use differential_dataflow::containers::{Columnation, TimelyStack};
use differential_dataflow::trace::implementations::BatchContainer;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::PartialOrder;
use timely::container::SizableContainer;
use timely::progress::Antichain;

use crate::sink::correction::{Logging, SizeMetrics};

/// Determines the size factor of subsequent chains required by the chain invariant.
const CHAIN_PROPORTIONALITY: usize = 3;

/// Convenient alias for use in data trait bounds.
pub trait Data: differential_dataflow::Data + Columnation {}
impl<D: differential_dataflow::Data + Columnation> Data for D {}

/// A data structure used to store corrections in the MV sink implementation.
///
/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
/// allowing their memory to be transparently spilled to disk.
#[derive(Debug)]
pub(super) struct CorrectionV2<D: Data> {
    /// Chains containing sorted updates.
    chains: Vec<Chain<D>>,
    /// A staging area for updates, to speed up small inserts.
    stage: Stage<D>,
    /// The frontier by which all contained times are advanced.
    since: Antichain<Timestamp>,

    /// Total count of updates in the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_update_count: usize,
    /// Total heap size used by the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_size: SizeMetrics,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Introspection logging.
    logging: Option<Logging>,
}

impl<D: Data> CorrectionV2<D> {
    /// Construct a new [`CorrectionV2`] instance.
    pub fn new(
        metrics: SinkMetrics,
        worker_metrics: SinkWorkerMetrics,
        logging: Option<Logging>,
    ) -> Self {
        Self {
            chains: Default::default(),
            stage: Stage::new(logging.clone()),
            since: Antichain::from_elem(Timestamp::MIN),
            prev_update_count: 0,
            prev_size: Default::default(),
            metrics,
            worker_metrics,
            logging,
        }
    }

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }

        self.insert_inner(updates);
    }
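For illustration, the preprocessing that both methods apply before staging can be modeled on a plain vector. This is a sketch with `u64` timestamps and `i64` diffs standing in for the `mz_repr` types; `prepare` is a hypothetical name.

```rust
/// Mirror the preprocessing in `insert` / `insert_negated`: advance each time
/// to the since, and negate diffs for the retraction case.
fn prepare(updates: &mut Vec<(&'static str, u64, i64)>, since_ts: u64, negate: bool) {
    for (_, time, diff) in updates.iter_mut() {
        // Times before the since are advanced to it; later times are kept.
        *time = (*time).max(since_ts);
        if negate {
            *diff = -*diff;
        }
    }
}
```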

    /// Insert a batch of updates.
    ///
    /// All times are expected to be >= the `since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

        if let Some(chain) = self.stage.insert(updates) {
            self.log_chain_created(&chain);
            self.chains.push(chain);

            // Restore the chain invariant.
            let merge_needed = |chains: &[Chain<_>]| match chains {
                [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
                _ => false,
            };

            while merge_needed(&self.chains) {
                let a = self.chains.pop().unwrap();
                let b = self.chains.pop().unwrap();
                self.log_chain_dropped(&a);
                self.log_chain_dropped(&b);

                let merged = merge_chains([a, b], &self.since);
                self.log_chain_created(&merged);
                self.chains.push(merged);
            }
        };

        self.update_metrics();
    }

    /// Return consolidated updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + 'a {
        let mut result = None;

        if !PartialOrder::less_than(&self.since, upper) {
            // All contained updates are beyond the upper.
            return result.into_iter().flatten();
        }

        self.consolidate_before(upper);

        // There is at most one chain that contains updates before `upper` now.
        result = self
            .chains
            .iter()
            .find(|c| c.first().is_some_and(|(_, t, _)| !upper.less_equal(&t)))
            .map(move |c| {
                let upper = upper.clone();
                c.iter().take_while(move |(_, t, _)| !upper.less_equal(t))
            });

        result.into_iter().flatten()
    }
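As a reference model, the effect of `updates_before` can be reproduced on a plain update set by filtering and consolidating. This is a sketch only; the real implementation merges sorted chains instead of rebuilding a map, which is what makes it cheap for future updates.

```rust
use std::collections::BTreeMap;

/// Consolidate all updates at times strictly before `upper` and return them
/// in (time, data) order, like the iterator handed out by `updates_before`.
fn updates_before(
    updates: &[(&'static str, u64, i64)],
    upper: u64,
) -> Vec<(&'static str, u64, i64)> {
    let mut acc: BTreeMap<(u64, &'static str), i64> = BTreeMap::new();
    for &(d, t, r) in updates.iter().filter(|&&(_, t, _)| t < upper) {
        *acc.entry((t, d)).or_insert(0) += r;
    }
    // Drop entries that consolidated away to nothing.
    acc.into_iter()
        .filter(|&(_, r)| r != 0)
        .map(|((t, d), r)| (d, t, r))
        .collect()
}
```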

    /// Consolidate all updates before the given `upper`.
    ///
    /// Once this method returns, all remaining updates before `upper` are contained in a single
    /// chain. Note that this chain might also contain updates beyond `upper` though!
    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
        if self.chains.is_empty() && self.stage.is_empty() {
            return;
        }

        let mut chains = std::mem::take(&mut self.chains);

        // To keep things simple, we log the dropping of all chains here and log the creation of
        // all remaining chains at the end. This causes more event churn than necessary, but the
        // consolidated result is correct.
        chains.iter().for_each(|c| self.log_chain_dropped(c));

        chains.extend(self.stage.flush());

        if chains.is_empty() {
            // We can only get here if the stage contained updates but they all got consolidated
            // away by `flush`, so we need to update the metrics before we return.
            self.update_metrics();
            return;
        }

        let (merged, remains) = merge_chains_up_to(chains, &self.since, upper);

        self.chains = remains;
        if !merged.is_empty() {
            // We put the merged chain at the end, assuming that its contents are likely to
            // consolidate with retractions that will arrive soon.
            self.chains.push(merged);
        }

        // Restore the chain invariant.
        //
        // This part isn't great. We've taken great care so far to only look at updates with times
        // before `upper`, but now we might end up merging all chains anyway in the worst case.
        // There might be something smarter we could do to avoid merging as much as possible. For
        // example, we could consider sorting chains by length first, or inspect the contained
        // times and prefer merging chains that have a chance at consolidating with one another.
        let mut i = self.chains.len().saturating_sub(1);
        while i > 0 {
            let needs_merge = self.chains.get(i).is_some_and(|a| {
                let b = &self.chains[i - 1];
                a.len() * CHAIN_PROPORTIONALITY > b.len()
            });
            if needs_merge {
                let a = self.chains.remove(i);
                let b = std::mem::take(&mut self.chains[i - 1]);
                let merged = merge_chains([a, b], &self.since);
                self.chains[i - 1] = merged;
            } else {
                // Only advance the index if we didn't merge. A merge can reduce the size of the
                // chain at `i - 1`, causing a violation of the chain invariant with the next
                // chain, so we might need to merge the two before proceeding to lower indexes.
                i -= 1;
            }
        }

        self.chains.iter().for_each(|c| self.log_chain_created(c));
        self.update_metrics();
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));
        self.stage.advance_times(&since);
        self.since = since;
    }
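The sub-chain merging described in the module docs can be sketched end to end: each sorted chain is cut into runs that stay (time, data)-ordered once times are advanced to the since, and the runs are then k-way merged and consolidated. This is a simplified model over `u64` times; `split_for_since` and `merge_runs` are illustrative names, not the crate's actual helpers.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

type Update = (&'static str, u64, i64);

/// Split a (time, data)-sorted chain into runs that stay sorted after all
/// times are advanced to `since`: one run per distinct time below the since,
/// plus one trailing run for everything at or beyond it.
fn split_for_since(chain: &[Update], since: u64) -> Vec<Vec<Update>> {
    let mut runs: Vec<Vec<Update>> = Vec::new();
    let mut current_time = None;
    for &(d, t, r) in chain {
        let advanced = (d, t.max(since), r);
        let start_new = match current_time {
            // Cut before every new time that is still below the since.
            Some(ct) => ct < since && ct != t,
            None => true,
        };
        if start_new {
            runs.push(vec![advanced]);
        } else {
            runs.last_mut().unwrap().push(advanced);
        }
        current_time = Some(t);
    }
    runs
}

/// K-way merge of sorted runs, consolidating updates with equal (time, data).
fn merge_runs(runs: Vec<Vec<Update>>) -> Vec<Update> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&(d, t, r)) = run.first() {
            heap.push(Reverse((t, d, i, 0usize, r)));
        }
    }
    let mut out: Vec<Update> = Vec::new();
    while let Some(Reverse((t, d, i, pos, r))) = heap.pop() {
        match out.last_mut() {
            // Equal (time, data) pairs pop consecutively, so they fold into
            // the previous output entry.
            Some((ld, lt, lr)) if *lt == t && *ld == d => *lr += r,
            _ => out.push((d, t, r)),
        }
        if let Some(&(d2, t2, r2)) = runs[i].get(pos + 1) {
            heap.push(Reverse((t2, d2, i, pos + 1, r2)));
        }
    }
    // Drop updates that consolidated away to nothing.
    out.retain(|&(_, _, r)| r != 0);
    out
}
```

Running this on the two example chains from the module docs, with `since = 2`, yields the four sub-chains shown there and the consolidated result `[(a, 2, +1), (a, 3, -1)]`.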

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
        if let Some(upper_ts) = upper_ts {
            let upper = Antichain::from_elem(upper_ts);
            self.consolidate_before(&upper);
        }
    }

    fn log_chain_created(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_created(chain.update_count);
        }
    }

    fn log_chain_dropped(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(chain.update_count);
        }
    }

    /// Update persist sink metrics.
    fn update_metrics(&mut self) {
        let mut new_size = self.stage.get_size();
        let mut new_length = self.stage.data.len();
        for chain in &mut self.chains {
            new_size += chain.get_size();
            new_length += chain.update_count;
        }

        self.update_metrics_inner(new_size, new_length);
    }

    /// Update persist sink metrics to the given new size and length.
    fn update_metrics_inner(&mut self, new_size: SizeMetrics, new_length: usize) {
        let old_size = self.prev_size;
        let old_length = self.prev_update_count;
        let len_delta = UpdateDelta::new(new_length, old_length);
        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
        self.metrics
            .report_correction_update_deltas(len_delta, cap_delta);
        self.worker_metrics
            .report_correction_update_totals(new_length, new_size.capacity);

        if let Some(logging) = &self.logging {
            let i = |x: usize| isize::try_from(x).expect("must fit");
            logging.report_size_diff(i(new_size.size) - i(old_size.size));
            logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
            logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
        }

        self.prev_size = new_size;
        self.prev_update_count = new_length;
    }
}

impl<D: Data> Drop for CorrectionV2<D> {
    fn drop(&mut self) {
        self.chains.iter().for_each(|c| self.log_chain_dropped(c));
        self.update_metrics_inner(Default::default(), 0);
    }
}

/// A chain of [`Chunk`]s containing updates.
///
/// All updates in a chain are sorted by (time, data) and consolidated.
///
/// Note that, in contrast to [`Chunk`]s, chains can be empty, though we generally try to avoid
/// keeping empty chains around.
#[derive(Debug)]
struct Chain<D: Data> {
    /// The contained chunks.
    chunks: Vec<Chunk<D>>,
    /// The number of updates contained in all chunks, for efficient updating of metrics.
    update_count: usize,
    /// Cached value of the current chain size, for efficient updating of metrics.
    cached_size: Option<SizeMetrics>,
}

impl<D: Data> Default for Chain<D> {
    fn default() -> Self {
        Self {
            chunks: Default::default(),
            update_count: 0,
            cached_size: None,
        }
    }
}

impl<D: Data> Chain<D> {
    /// Return whether the chain is empty.
    fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Return the length of the chain, in chunks.
    fn len(&self) -> usize {
        self.chunks.len()
    }

    /// Push an update onto the chain.
    ///
    /// The update must sort after all updates already in the chain, in (time, data)-order, to
    /// ensure the chain remains sorted.
    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
        let (d, t, r) = update;
        let update = (d.borrow(), t, r);

        debug_assert!(self.can_accept(update));

        match self.chunks.last_mut() {
            Some(c) if !c.at_capacity() => c.push(update),
            Some(_) | None => {
                let chunk = Chunk::from_update(update);
                self.push_chunk(chunk);
            }
        }

        self.update_count += 1;
        self.invalidate_cached_size();
    }

    /// Push a chunk onto the chain.
    ///
    /// All updates in the chunk must sort after all updates already in the chain, in
    /// (time, data)-order, to ensure the chain remains sorted.
    fn push_chunk(&mut self, chunk: Chunk<D>) {
        debug_assert!(self.can_accept(chunk.first()));

        self.update_count += chunk.len();
        self.chunks.push(chunk);
        self.invalidate_cached_size();
    }

    /// Push the updates produced by a cursor onto the chain.
    ///
    /// All updates produced by the cursor must sort after all updates already in the chain, in
    /// (time, data)-order, to ensure the chain remains sorted.
    fn push_cursor(&mut self, cursor: Cursor<D>) {
        let mut rest = Some(cursor);
        while let Some(cursor) = rest.take() {
            let update = cursor.get();
            self.push(update);
            rest = cursor.step();
        }
    }

    /// Return whether the chain can accept the given update.
    ///
    /// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
    fn can_accept(&self, update: (&D, Timestamp, Diff)) -> bool {
        self.last().is_none_or(|(dc, tc, _)| {
            let (d, t, _) = update;
            (tc, dc) < (t, d)
        })
    }

    /// Return the first update in the chain, if any.
    fn first(&self) -> Option<(&D, Timestamp, Diff)> {
        self.chunks.first().map(|c| c.first())
    }

    /// Return the last update in the chain, if any.
    fn last(&self) -> Option<(&D, Timestamp, Diff)> {
        self.chunks.last().map(|c| c.last())
    }

    /// Convert the chain into a cursor over the contained updates.
    fn into_cursor(self) -> Option<Cursor<D>> {
        let chunks = self.chunks.into_iter().map(Rc::new).collect();
        Cursor::new(chunks)
    }

    /// Return an iterator over the contained updates.
    fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
        self.chunks
            .iter()
            .flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
    }

    /// Return the size of the chain, for use in metrics.
    fn get_size(&mut self) -> SizeMetrics {
        // This operation can be expensive as it requires inspecting the individual chunks and
        // their backing regions. We thus cache the result to hopefully avoid the cost most of the
        // time.
        if self.cached_size.is_none() {
            let mut metrics = SizeMetrics::default();
            for chunk in &mut self.chunks {
                metrics += chunk.get_size();
            }
            self.cached_size = Some(metrics);
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chain size.
    ///
    /// This method must be called every time the size of the chain changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}

impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
    fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
        for update in iter {
            self.push(update);
        }
    }
}

/// A cursor over updates in a chain.
///
/// A cursor provides two guarantees:
///  * Produced updates are ordered and consolidated.
///  * A cursor always yields at least one update.
///
/// The second guarantee is enforced through the type system: Every method that steps a cursor
/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
/// over the last update.
///
/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
#[derive(Clone, Debug)]
struct Cursor<D: Data> {
    /// The chunks from which updates can still be produced.
    chunks: VecDeque<Rc<Chunk<D>>>,
    /// The current offset into `chunks.front()`.
    chunk_offset: usize,
    /// An optional limit for the number of updates the cursor will produce.
    limit: Option<usize>,
    /// An optional overwrite for the timestamp of produced updates.
    overwrite_ts: Option<Timestamp>,
}

impl<D: Data> Cursor<D> {
    /// Construct a cursor over a list of chunks.
    ///
    /// Returns `None` if `chunks` is empty.
    fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
        if chunks.is_empty() {
            return None;
        }

        Some(Self {
            chunks,
            chunk_offset: 0,
            limit: None,
            overwrite_ts: None,
        })
    }

    /// Set a limit for the number of updates this cursor will produce.
    ///
    /// # Panics
    ///
    /// Panics if there is already a limit lower than the new one.
    fn set_limit(mut self, limit: usize) -> Option<Self> {
        assert!(self.limit.is_none_or(|l| l >= limit));

        if limit == 0 {
            return None;
        }

        // Release chunks made unreachable by the limit.
        let mut count = 0;
        let mut idx = 0;
        let mut offset = self.chunk_offset;
        while idx < self.chunks.len() && count < limit {
            let chunk = &self.chunks[idx];
            count += chunk.len() - offset;
            idx += 1;
            offset = 0;
        }
        self.chunks.truncate(idx);

        if count > limit {
            self.limit = Some(limit);
        }

        Some(self)
    }

    /// Get a reference to the current update.
    fn get(&self) -> (&D, Timestamp, Diff) {
        let chunk = self.get_chunk();
        let (d, t, r) = chunk.index(self.chunk_offset);
        let t = self.overwrite_ts.unwrap_or(t);
        (d, t, r)
    }

    /// Get a reference to the current chunk.
    fn get_chunk(&self) -> &Chunk<D> {
        &self.chunks[0]
    }

    /// Step to the next update.
    ///
    /// Returns the stepped cursor, or `None` if the step was over the last update.
    fn step(mut self) -> Option<Self> {
        if self.chunk_offset == self.get_chunk().len() - 1 {
            return self.skip_chunk().map(|(c, _)| c);
        }

        self.chunk_offset += 1;

        if let Some(limit) = &mut self.limit {
            *limit -= 1;
            if *limit == 0 {
                return None;
            }
        }

        Some(self)
    }

    /// Skip the remainder of the current chunk.
    ///
    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
    /// left after the skip.
    fn skip_chunk(mut self) -> Option<(Self, usize)> {
        let chunk = self.chunks.pop_front().expect("cursor invariant");

        if self.chunks.is_empty() {
            return None;
        }

        let skipped = chunk.len() - self.chunk_offset;
        self.chunk_offset = 0;

        if let Some(limit) = &mut self.limit {
            if skipped >= *limit {
                return None;
            }
            *limit -= skipped;
        }

        Some((self, skipped))
    }

    /// Skip all updates with times <= the given time.
    ///
    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
    /// left after the skip.
    fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
        if self.overwrite_ts.is_some_and(|ts| ts <= time) {
            return None;
        } else if self.get().1 > time {
            return Some((self, 0));
        }

        let mut skipped = 0;

        let new_offset = loop {
            let chunk = self.get_chunk();
            if let Some(index) = chunk.find_time_greater_than(time) {
                break index;
            }

            let (cursor, count) = self.skip_chunk()?;
            self = cursor;
            skipped += count;
        };

        skipped += new_offset - self.chunk_offset;
        self.chunk_offset = new_offset;

        Some((self, skipped))
    }

    /// Advance all updates in this cursor by the given `since_ts`.
    ///
    /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
    /// been advanced by `since_ts`.
    fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
        // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
        // only need to advance the `overwrite_ts` by the `since_ts`.
        if let Some(ts) = self.overwrite_ts {
            if ts < since_ts {
                self.overwrite_ts = Some(since_ts);
            }
            return vec![self];
        }

        // Otherwise we need to split the cursor so that each new cursor only yields runs of
        // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
        // this by splitting the cursor at each time <= `since_ts`.
        let mut splits = Vec::new();
        let mut remaining = Some(self);

        while let Some(cursor) = remaining.take() {
            let (_, time, _) = cursor.get();
            if time >= since_ts {
                splits.push(cursor);
                break;
            }

            let mut current = cursor.clone();
            if let Some((cursor, skipped)) = cursor.skip_time(time) {
                remaining = Some(cursor);
                current = current.set_limit(skipped).expect("skipped at least 1");
            }
            current.overwrite_ts = Some(since_ts);
            splits.push(current);
        }

        splits
    }

    /// Split the cursor at the given time.
    ///
    /// Returns two cursors, the first yielding all updates at times < `time`, the second yielding
    /// all updates at times >= `time`. Both can be `None` if they would be empty.
    fn split_at_time(self, time: Timestamp) -> (Option<Self>, Option<Self>) {
        let Some(skip_ts) = time.step_back() else {
            return (None, Some(self));
        };

        let before = self.clone();
        match self.skip_time(skip_ts) {
            Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
            None => (Some(before), None),
        }
    }

    /// Attempt to unwrap the cursor into a [`Chain`].
    ///
    /// This operation efficiently reuses chunks by directly inserting them into the output chain
    /// where possible.
    ///
    /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
    /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
    /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
    /// chain by copying chunks rather than reusing them.
    fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
        if self.limit.is_some() {
            return Err(("cursor with limit", self));
        }
        if self.overwrite_ts.is_some() {
            return Err(("cursor with overwrite_ts", self));
        }
        if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
            return Err(("cursor on shared chunks", self));
        }

        let mut chain = Chain::default();
        let mut remaining = Some(self);

        // We might be partway through the first chunk, in which case we can't reuse it but need to
        // allocate a new one to contain only the updates the cursor can still yield.
        while let Some(cursor) = remaining.take() {
            if cursor.chunk_offset == 0 {
                remaining = Some(cursor);
                break;
            }
            let update = cursor.get();
            chain.push(update);
            remaining = cursor.step();
        }

        if let Some(cursor) = remaining {
            for chunk in cursor.chunks {
                let chunk = Rc::into_inner(chunk).expect("checked above");
                chain.push_chunk(chunk);
            }
        }

        Ok(chain)
    }
}
833
834impl<D: Data> From<Cursor<D>> for Chain<D> {
835    fn from(cursor: Cursor<D>) -> Self {
836        match cursor.try_unwrap() {
837            Ok(chain) => chain,
838            Err((_, cursor)) => {
839                let mut chain = Chain::default();
840                chain.push_cursor(cursor);
841                chain
842            }
843        }
844    }
845}
846
847/// A non-empty chunk of updates, backed by a columnation region.
848///
849/// All updates in a chunk are sorted by (time, data) and consolidated.
850///
851/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
852/// re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API doesn't
853/// provide a convenient way to pre-size regions, so chunks are currently only fixed-size in
854/// spirit.
855struct Chunk<D: Data> {
856    /// The contained updates.
857    data: TimelyStack<(D, Timestamp, Diff)>,
858    /// Cached value of the current chunk size, for efficient updating of metrics.
859    cached_size: Option<SizeMetrics>,
860}
861
862impl<D: Data> Default for Chunk<D> {
863    fn default() -> Self {
864        let mut data = TimelyStack::default();
865        data.ensure_capacity(&mut None);
866
867        Self {
868            data,
869            cached_size: None,
870        }
871    }
872}
873
874impl<D: Data> fmt::Debug for Chunk<D> {
875    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
876        write!(f, "Chunk(<{}>)", self.len())
877    }
878}
879
880impl<D: Data> Chunk<D> {
881    /// Create a new chunk containing a single update.
882    fn from_update<DT: Borrow<D>>(update: (DT, Timestamp, Diff)) -> Self {
883        let (d, t, r) = update;
884
885        let mut chunk = Self::default();
886        chunk.data.copy_destructured(d.borrow(), &t, &r);
887
888        chunk
889    }
890
891    /// Return the number of updates in the chunk.
892    fn len(&self) -> usize {
893        self.data.len()
894    }
895
896    /// Return the (local) capacity of the chunk.
897    fn capacity(&self) -> usize {
898        self.data.capacity()
899    }
900
901    /// Return whether the chunk is at capacity.
902    fn at_capacity(&self) -> bool {
903        self.data.at_capacity()
904    }
905
906    /// Return the update at the given index.
907    ///
908    /// # Panics
909    ///
910    /// Panics if the given index is not populated.
911    fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
912        let (d, t, r) = self.data.index(idx);
913        (d, *t, *r)
914    }
915
916    /// Return the first update in the chunk.
917    fn first(&self) -> (&D, Timestamp, Diff) {
918        self.index(0)
919    }
920
921    /// Return the last update in the chunk.
922    fn last(&self) -> (&D, Timestamp, Diff) {
923        self.index(self.len() - 1)
924    }
925
926    /// Push an update onto the chunk.
927    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
928        let (d, t, r) = update;
929        self.data.copy_destructured(d.borrow(), &t, &r);
930
931        self.invalidate_cached_size();
932    }
933
934    /// Return the index of the first update at a time greater than `time`, or `None` if no such
935    /// update exists.
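    ///
    /// A sketch on hypothetical update times (data and diff components elided):
    ///
    /// ```text
    /// chunk times: [1, 3, 3, 5]
    /// find_time_greater_than(3) == Some(3)  // the update at time 5
    /// find_time_greater_than(5) == None
    /// ```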
    fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
        if self.last().1 <= time {
            return None;
        }

        let mut lower = 0;
        let mut upper = self.len();
        while lower < upper {
            let idx = (lower + upper) / 2;
            if self.index(idx).1 > time {
                upper = idx;
            } else {
                lower = idx + 1;
            }
        }

        Some(lower)
    }

    /// Return the size of the chunk, for use in metrics.
    fn get_size(&mut self) -> SizeMetrics {
        if self.cached_size.is_none() {
            let mut size = 0;
            let mut capacity = 0;
            self.data.heap_size(|sz, cap| {
                size += sz;
                capacity += cap;
            });
            self.cached_size = Some(SizeMetrics {
                size,
                capacity,
                allocations: 1,
            });
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chunk size.
    ///
    /// This method must be called every time the size of the chunk changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}

/// A buffer for staging updates before they are inserted into the sorted chains.
#[derive(Debug)]
struct Stage<D> {
    /// The contained updates.
    ///
    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
    data: Vec<(D, Timestamp, Diff)>,
    /// Introspection logging.
    ///
    /// We want to report the number of records in the stage. To do so, we pretend that the stage
    /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
    /// re-created.
    logging: Option<Logging>,
}

impl<D: Data> Stage<D> {
    fn new(logging: Option<Logging>) -> Self {
        // Make sure that the `Stage` has the same capacity as a `Chunk`.
        let chunk = Chunk::<D>::default();
        let data = Vec::with_capacity(chunk.capacity());

        // For logging, we pretend the stage consists of a single chain.
        if let Some(logging) = &logging {
            logging.chain_created(0);
        }

        Self { data, logging }
    }

    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Insert a batch of updates, possibly producing a ready [`Chain`].
    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
        if updates.is_empty() {
            return None;
        }

        let prev_length = self.ilen();

        // Determine how many chunks we can fill with the available updates.
        let update_count = self.data.len() + updates.len();
        let chunk_size = self.data.capacity();
        let chunk_count = update_count / chunk_size;
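        // For example (hypothetical sizes): with a chunk size of 4, 3 staged updates, and 10 new
        // updates, `update_count` is 13 and `chunk_count` is 3, so 12 updates ship below and the
        // remaining one stays staged.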

        let mut new_updates = updates.drain(..);

        // If we have enough shippable updates, collect them, consolidate, and build a chain.
        let maybe_chain = if chunk_count > 0 {
            let ship_count = chunk_count * chunk_size;
            let mut buffer = Vec::with_capacity(ship_count);

            buffer.append(&mut self.data);
            while buffer.len() < ship_count {
                let update = new_updates.next().unwrap();
                buffer.push(update);
            }

            consolidate(&mut buffer);

            let mut chain = Chain::default();
            chain.extend(buffer);
            Some(chain)
        } else {
            None
        };

        // Stage the remaining updates.
        self.data.extend(new_updates);

        self.log_length_diff(self.ilen() - prev_length);

        maybe_chain
    }

    /// Flush all currently staged updates into a chain.
    fn flush(&mut self) -> Option<Chain<D>> {
        self.log_length_diff(-self.ilen());

        consolidate(&mut self.data);

        if self.data.is_empty() {
            return None;
        }

        let mut chain = Chain::default();
        chain.extend(self.data.drain(..));
        Some(chain)
    }

    /// Advance the times of staged updates by the given `since`.
    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
        let Some(since_ts) = since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            self.log_length_diff(-self.ilen());
            self.data.clear();
            return;
        };

        for (_, time, _) in &mut self.data {
            *time = std::cmp::max(*time, *since_ts);
        }
    }

    /// Return the size of the stage, for use in metrics.
    ///
    /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
    /// under-estimates. That's fine as the stage should always be small.
    fn get_size(&self) -> SizeMetrics {
        SizeMetrics {
            size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
            capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
            allocations: 1,
        }
    }

    /// Return the number of updates in the stage, as an `isize`.
    fn ilen(&self) -> isize {
        self.data.len().try_into().expect("must fit")
    }

    fn log_length_diff(&self, diff: isize) {
        let Some(logging) = &self.logging else { return };
        if diff > 0 {
            let count = usize::try_from(diff).expect("must fit");
            logging.chain_created(count);
            logging.chain_dropped(0);
        } else if diff < 0 {
            let count = usize::try_from(-diff).expect("must fit");
            logging.chain_created(0);
            logging.chain_dropped(count);
        }
    }
}

impl<D> Drop for Stage<D> {
    fn drop(&mut self) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(self.data.len());
        }
    }
}

/// Sort and consolidate the given list of updates.
///
/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
/// except that it sorts updates by (time, data) instead of (data, time).
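///
/// A sketch on hypothetical updates (`+1`/`-1` are diffs):
///
/// ```text
/// input:  [(b, 2, +1), (a, 1, +1), (a, 2, +1), (a, 2, -1)]
/// output: [(a, 1, +1), (b, 2, +1)]
/// ```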
fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
    if updates.len() <= 1 {
        return;
    }

    let diff = |update: &(_, _, Diff)| update.2;

    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut offset = 0;
    let mut accum = diff(&updates[0]);

    for idx in 1..updates.len() {
        let this = &updates[idx];
        let prev = &updates[idx - 1];
        if this.0 == prev.0 && this.1 == prev.1 {
            accum += diff(&updates[idx]);
        } else {
            if accum != Diff::ZERO {
                updates.swap(offset, idx - 1);
                updates[offset].2 = accum;
                offset += 1;
            }
            accum = diff(&updates[idx]);
        }
    }

    if accum != Diff::ZERO {
        let len = updates.len();
        updates.swap(offset, len - 1);
        updates[offset].2 = accum;
        offset += 1;
    }

    updates.truncate(offset);
}

/// Merge the given chains, advancing times by the given `since` in the process.
fn merge_chains<D: Data>(
    chains: impl IntoIterator<Item = Chain<D>>,
    since: &Antichain<Timestamp>,
) -> Chain<D> {
    let Some(&since_ts) = since.as_option() else {
        return Chain::default();
    };

    let mut to_merge = Vec::new();
    for chain in chains {
        if let Some(cursor) = chain.into_cursor() {
            let mut runs = cursor.advance_by(since_ts);
            to_merge.append(&mut runs);
        }
    }

    merge_cursors(to_merge)
}

/// Merge the given chains, advancing times by the given `since` in the process, but only up to the
/// given `upper`.
///
/// Returns the merged chain and a list of non-empty remainders of the input chains.
fn merge_chains_up_to<D: Data>(
    chains: Vec<Chain<D>>,
    since: &Antichain<Timestamp>,
    upper: &Antichain<Timestamp>,
) -> (Chain<D>, Vec<Chain<D>>) {
    let Some(&since_ts) = since.as_option() else {
        return (Chain::default(), Vec::new());
    };
    let Some(&upper_ts) = upper.as_option() else {
        let merged = merge_chains(chains, since);
        return (merged, Vec::new());
    };

    if since_ts >= upper_ts {
        // After advancing by `since` there will be no updates before `upper`.
        return (Chain::default(), chains);
    }

    let mut to_merge = Vec::new();
    let mut to_keep = Vec::new();
    for chain in chains {
        if let Some(cursor) = chain.into_cursor() {
            let mut runs = cursor.advance_by(since_ts);
            if let Some(last) = runs.pop() {
                let (before, beyond) = last.split_at_time(upper_ts);
                if let Some(c) = before {
                    runs.push(c);
                }
                if let Some(c) = beyond {
                    to_keep.push(c);
                }
            }
            to_merge.append(&mut runs);
        }
    }

    let merged = merge_cursors(to_merge);
    let remains = to_keep
        .into_iter()
        .map(|c| c.try_unwrap().expect("unwrappable"))
        .collect();

    (merged, remains)
}

/// Merge the given cursors into one chain.
fn merge_cursors<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
    match cursors.len() {
        0 => Chain::default(),
        1 => {
            let [cur] = cursors.try_into().unwrap();
            Chain::from(cur)
        }
        2 => {
            let [a, b] = cursors.try_into().unwrap();
            merge_2(a, b)
        }
        _ => merge_many(cursors),
    }
}

/// Merge the given two cursors using a 2-way merge.
///
/// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
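///
/// A sketch of the merge on hypothetical updates, with equal `(time, data)` pairs consolidated:
///
/// ```text
/// cursor1: [(a, 1, +1), (b, 2, +1)]
/// cursor2: [(b, 2, -1), (c, 2, +1)]
/// merged:  [(a, 1, +1), (c, 2, +1)]
/// ```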
fn merge_2<D: Data>(cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
    let mut rest1 = Some(cursor1);
    let mut rest2 = Some(cursor2);
    let mut merged = Chain::default();

    loop {
        match (rest1, rest2) {
            (Some(c1), Some(c2)) => {
                let (d1, t1, r1) = c1.get();
                let (d2, t2, r2) = c2.get();

                match (t1, d1).cmp(&(t2, d2)) {
                    Ordering::Less => {
                        merged.push((d1, t1, r1));
                        rest1 = c1.step();
                        rest2 = Some(c2);
                    }
                    Ordering::Greater => {
                        merged.push((d2, t2, r2));
                        rest1 = Some(c1);
                        rest2 = c2.step();
                    }
                    Ordering::Equal => {
                        let r = r1 + r2;
                        if r != Diff::ZERO {
                            merged.push((d1, t1, r));
                        }
                        rest1 = c1.step();
                        rest2 = c2.step();
                    }
                }
            }
            (Some(c), None) | (None, Some(c)) => {
                merged.push_cursor(c);
                break;
            }
            (None, None) => break,
        }
    }

    merged
}

/// Merge the given cursors using a k-way merge with a binary heap.
fn merge_many<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
    let mut heap = MergeHeap::from_iter(cursors);
    let mut merged = Chain::default();
    while let Some(cursor1) = heap.pop() {
        let (data, time, mut diff) = cursor1.get();

        while let Some((cursor2, r)) = heap.pop_equal(data, time) {
            diff += r;
            if let Some(cursor2) = cursor2.step() {
                heap.push(cursor2);
            }
        }

        if diff != Diff::ZERO {
            merged.push((data, time, diff));
        }
        if let Some(cursor1) = cursor1.step() {
            heap.push(cursor1);
        }
    }

    merged
}

/// A binary heap specialized for merging [`Cursor`]s.
struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);

impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
        let inner = cursors.into_iter().map(MergeCursor).collect();
        Self(inner)
    }
}

impl<D: Data> MergeHeap<D> {
    /// Pop the next cursor (the one yielding the least update) from the heap.
    fn pop(&mut self) -> Option<Cursor<D>> {
        self.0.pop().map(|MergeCursor(c)| c)
    }

    /// Pop the next cursor from the heap, provided the data and time of its current update are
    /// equal to the given values.
    ///
    /// Returns both the cursor and the diff corresponding to `data` and `time`.
    fn pop_equal(&mut self, data: &D, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
        let MergeCursor(cursor) = self.0.peek()?;
        let (d, t, r) = cursor.get();
        if d == data && t == time {
            let cursor = self.pop().expect("checked above");
            Some((cursor, r))
        } else {
            None
        }
    }

    /// Push a cursor onto the heap.
    fn push(&mut self, cursor: Cursor<D>) {
        self.0.push(MergeCursor(cursor));
    }
}

/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
///
/// Implements the cursor ordering required for merging cursors: the comparison is reversed, so
/// that the standard library's max-heap yields the cursor with the least (time, data) update
/// first.
struct MergeCursor<D: Data>(Cursor<D>);

impl<D: Data> PartialEq for MergeCursor<D> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl<D: Data> Eq for MergeCursor<D> {}

impl<D: Data> PartialOrd for MergeCursor<D> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<D: Data> Ord for MergeCursor<D> {
    fn cmp(&self, other: &Self) -> Ordering {
        let (d1, t1, _) = self.0.get();
        let (d2, t2, _) = other.0.get();
        (t1, d1).cmp(&(t2, d2)).reverse()
    }
}