mz_compute/sink/correction_v2.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
//! operator to stash updates before they are written.
//!
//! The `Correction` data structure provides methods to:
//!  * insert new updates
//!  * advance the compaction frontier (called `since`)
//!  * obtain an iterator over consolidated updates before some `upper`
//!  * force consolidation of updates before some `upper`
//!
//! The goal is to provide good performance for each of these operations, even in the presence of
//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
//! retractions for future times and we want those to be handled efficiently as well.
//!
//! Note that `Correction` does not provide a method to directly remove updates. Instead, updates
//! are removed by inserting their retractions so that they consolidate away to nothing.
//!
//! ## Storage of Updates
//!
//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
//!
//! [`CorrectionV2`] holds onto a list of [`Chain`]s containing [`Chunk`]s of stashed updates. Each
//! [`Chunk`] is a columnation region containing a fixed maximum number of updates. All updates in
//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
//!
//! ```text
//!       chain[0]   |   chain[1]   |   chain[2]
//!                  |              |
//!     chunk[0]     | chunk[0]     | chunk[0]
//!       (a, 1, +1) |   (a, 1, +1) |   (d, 3, +1)
//!       (b, 1, +1) |   (b, 2, -1) |   (d, 4, -1)
//!     chunk[1]     | chunk[1]     |
//!       (c, 1, +1) |   (c, 2, -2) |
//!       (a, 2, -1) |   (c, 4, -1) |
//!     chunk[2]     |              |
//!       (b, 2, +1) |              |
//!       (c, 2, +1) |              |
//!     chunk[3]     |              |
//!       (b, 3, -1) |              |
//!       (c, 3, +1) |              |
//! ```
//!
//! The "chain invariant" states that each chain has at least `chain_proportionality` times as
//! many chunks as the next one. This means that chain sizes will often be powers of
//! `chain_proportionality`, but they don't have to be. For example, for a proportionality of 2,
//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
//!
//! Choosing the `chain_proportionality` value allows tuning the trade-off between memory and CPU
//! resources required to maintain corrections. A higher proportionality forces more frequent chain
//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
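The chain invariant described above can be sketched as a simple pass over the chain lengths. This is a hypothetical standalone helper (the real code only checks adjacent chains as it merges); it treats a chain list as a slice of chunk counts, largest first.

```rust
// Sketch: check whether a list of chain lengths (in chunks), ordered from
// largest to smallest, satisfies the chain invariant for a given
// proportionality factor. Names here are illustrative, not from the source.
fn chain_invariant_holds(chunk_counts: &[usize], proportionality: f64) -> bool {
    chunk_counts.windows(2).all(|w| {
        let (prev, next) = (w[0] as f64, w[1] as f64);
        // Each chain must have at least `proportionality` times as many
        // chunks as the next one.
        prev >= next * proportionality
    })
}
```

For proportionality 2, the example sizes `[11, 5, 2, 1]` pass this check, while `[3, 2]` would not.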
//!
//! ## Inserting Updates
//!
//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
//! list until the chain invariant is restored.
//!
//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
//! chunk, copying the update in, and then likely merging with an existing chain to restore the
//! chain invariant. If updates trickle in in small batches, this can cause considerable
//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
//! [`Chunk`], they are sorted and inserted into the chains.
//!
//! The insert operation has an amortized complexity of O(log N), with N being the current number
//! of updates stored.
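A minimal sketch of the staging idea, with simplified concrete types (`String` data, `u64` times, `i64` diffs) standing in for the generic `D`, `Timestamp`, and `Diff`: updates are buffered unsorted until a chunk's worth has accumulated, and only then sorted and consolidated. This hypothetical `Stage` is much simpler than the real one.

```rust
// Sketch of a staging buffer: absorb small batches cheaply, and emit a
// sorted, consolidated batch once a full chunk's worth has accumulated.
struct Stage {
    data: Vec<(String, u64, i64)>, // (data, time, diff)
    capacity: usize,
}

impl Stage {
    fn new(capacity: usize) -> Self {
        Stage { data: Vec::new(), capacity }
    }

    /// Buffer updates; return a sorted, consolidated batch once full.
    fn insert(
        &mut self,
        updates: &mut Vec<(String, u64, i64)>,
    ) -> Option<Vec<(String, u64, i64)>> {
        self.data.append(updates);
        if self.data.len() < self.capacity {
            return None;
        }
        let mut batch = std::mem::take(&mut self.data);
        // Sort by (time, data), then sum the diffs of equal updates and
        // drop those that consolidate away to nothing.
        batch.sort_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
        let mut out: Vec<(String, u64, i64)> = Vec::new();
        for (d, t, r) in batch {
            match out.last_mut() {
                Some((dl, tl, rl)) if *dl == d && *tl == t => *rl += r,
                _ => out.push((d, t, r)),
            }
        }
        out.retain(|(_, _, r)| *r != 0);
        Some(out)
    }
}
```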
//!
//! ## Retrieving Consolidated Updates
//!
//! Retrieving consolidated updates before a given `upper` works by first consolidating all updates
//! at times before the `upper`, merging them all into one chain, then returning an iterator over
//! that chain.
//!
//! Because each chain contains updates ordered by time first, consolidation of all updates before
//! an `upper` is possible without touching updates at future times. It works by merging the chains
//! only up to the `upper`, producing a merged chain containing consolidated times before the
//! `upper` and leaving behind the chain parts containing later times. The complexity of this
//! operation is O(U log K), with U being the number of updates before `upper` and K the number
//! of chains.
//!
//! Unfortunately, performing consolidation as described above can break the chain invariant and we
//! might need to restore it by merging chains, including ones containing future updates. This is
//! something that would be great to fix! In the meantime the hope is that in steady state it
//! doesn't matter too much because either there are no future retractions and U is approximately
//! equal to N, or the amount of future retractions is much larger than the amount of current
//! changes, in which case removing the current changes has a good chance of leaving the chain
//! invariant intact.
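"Merging only up to the `upper`" relies on each chain being sorted by time first, so a chain can be cut at `upper` with a single partition-point search and only the prefix needs merging. The helper below is a hypothetical sketch over plain vectors rather than chunked chains.

```rust
// Sketch: split a (time, data)-sorted chain into the part strictly before
// `upper` and the remainder. Because time is the primary sort key, a
// partition-point search finds the cut without scanning future updates.
type Update = (char, u64, i64); // (data, time, diff) with simplified types

fn split_at_upper(chain: Vec<Update>, upper: u64) -> (Vec<Update>, Vec<Update>) {
    let cut = chain.partition_point(|&(_, t, _)| t < upper);
    let mut before = chain;
    let beyond = before.split_off(cut);
    (before, beyond)
}
```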
//!
//! ## Merging Chains
//!
//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
//! complexity of a merge of K chains containing N updates is O(N log K).
//!
//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
//!
//! For example, consider these two chains, assuming `since = [2]`:
//!   chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
//!   chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
//! After time advancement, the chains look like this:
//!   chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
//!   chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
//! Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that's
//! neither sorted nor consolidated.
//!
//! Instead we need to merge sub-chains, one for each distinct time that's before or at the
//! `since`. Each of these sub-chains retains the (time, data) ordering after the time advancement
//! to `since`, so merging those yields the expected result.
//!
//! For the above example, the chains we would merge are:
//!   chain 1.a: [(c, 2, +1)]
//!   chain 1.b: [(b, 2, -1), (a, 3, -1)]
//!   chain 2.a: [(b, 2, +1)]
//!   chain 2.b: [(a, 2, +1), (c, 2, -1)]
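The sub-chain splitting in the example above can be sketched as follows. This is a hypothetical simplification (plain vectors, `char` data, `u64` times): each distinct time strictly before `since` becomes its own run, and the tail of unchanged times forms one final run, so every run stays (time, data)-sorted after advancement. The exact run boundaries in the real cursor-based implementation may differ.

```rust
type Update = (char, u64, i64); // (data, time, diff) with simplified types

// Advance all times in a (time, data)-sorted chain to `since` and cut the
// chain into runs that are each still (time, data)-sorted.
fn advance_into_runs(chain: Vec<Update>, since: u64) -> Vec<Vec<Update>> {
    let mut runs: Vec<Vec<Update>> = Vec::new();
    let mut tail: Vec<Update> = Vec::new();
    let mut prev = None;
    for (d, t, r) in chain {
        if t < since {
            // One run per distinct original time before `since`. Within
            // such a run updates remain sorted by data, so advancing the
            // time to `since` keeps the run (time, data)-sorted.
            if prev != Some(t) {
                runs.push(Vec::new());
                prev = Some(t);
            }
            runs.last_mut().unwrap().push((d, since, r));
        } else {
            // Times at or beyond `since` are unchanged, so they remain one
            // sorted run.
            tail.push((d, t, r));
        }
    }
    if !tail.is_empty() {
        runs.push(tail);
    }
    runs
}
```

Applied to chain 1 of the example with `since = 2`, this produces exactly the sub-chains 1.a and 1.b.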

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;

use columnation::Columnation;
use differential_dataflow::trace::implementations::BatchContainer;
use mz_ore::cast::CastLossy;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::columnation::ColumnationStack;
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::sink::correction::{ChannelLogging, SizeMetrics};

/// Convenient alias for use in data trait bounds.
pub trait Data:
    differential_dataflow::Data + Columnation<InnerRegion: Send + Sync> + Send + Sync
{
}
impl<D: differential_dataflow::Data + Columnation<InnerRegion: Send + Sync> + Send + Sync> Data
    for D
{
}

/// A data structure used to store corrections in the MV sink implementation.
///
/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
/// allowing their memory to be transparently spilled to disk.
#[derive(Debug)]
pub(super) struct CorrectionV2<D: Data> {
    /// Chains containing sorted updates.
    chains: Vec<Chain<D>>,
    /// A staging area for updates, to speed up small inserts.
    stage: Stage<D>,
    /// The frontier by which all contained times are advanced.
    since: Antichain<Timestamp>,
    /// The size factor of subsequent chains required by the chain invariant.
    chain_proportionality: f64,
    /// The capacity of each [`Chunk`].
    chunk_capacity: usize,

    /// Total count of updates in the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_update_count: usize,
    /// Total heap size used by the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_size: SizeMetrics,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Introspection logging.
    logging: Option<ChannelLogging>,
}

impl<D: Data> CorrectionV2<D> {
    /// Construct a new [`CorrectionV2`] instance.
    pub fn new(
        metrics: SinkMetrics,
        worker_metrics: SinkWorkerMetrics,
        logging: Option<ChannelLogging>,
        chain_proportionality: f64,
        chunk_size: usize,
    ) -> Self {
        let update_size = std::mem::size_of::<(D, Timestamp, Diff)>();
        let chunk_capacity = std::cmp::max(chunk_size / update_size, 1);

        Self {
            chains: Default::default(),
            stage: Stage::new(logging.clone(), chunk_capacity),
            since: Antichain::from_elem(Timestamp::MIN),
            chain_proportionality,
            chunk_capacity,
            prev_update_count: 0,
            prev_size: Default::default(),
            metrics,
            worker_metrics,
            logging,
        }
    }
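The capacity calculation in `new` can be read in isolation: the chunk byte budget is divided by the size of one update, clamped to at least one update per chunk so a zero or tiny budget still yields usable chunks. A hypothetical standalone version of the same arithmetic:

```rust
// Sketch: compute a chunk's update capacity from a byte budget and the
// size of a single (data, time, diff) update, with a floor of one.
fn chunk_capacity_for(chunk_size_bytes: usize, update_size_bytes: usize) -> usize {
    std::cmp::max(chunk_size_bytes / update_size_bytes, 1)
}
```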

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// All times are expected to be >= the `since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

        if let Some(chain) = self.stage.insert(updates) {
            self.log_chain_created(&chain);
            self.chains.push(chain);

            // Restore the chain invariant.
            let prop = self.chain_proportionality;
            let merge_needed = |chains: &[Chain<_>]| match chains {
                [.., prev, last] => {
                    let last_len = f64::cast_lossy(last.len());
                    let prev_len = f64::cast_lossy(prev.len());
                    last_len * prop > prev_len
                }
                _ => false,
            };

            while merge_needed(&self.chains) {
                let a = self.chains.pop().unwrap();
                let b = self.chains.pop().unwrap();
                self.log_chain_dropped(&a);
                self.log_chain_dropped(&b);

                let merged = self.merge_chains([a, b]);
                self.log_chain_created(&merged);
                self.chains.push(merged);
            }
        };

        self.update_metrics();
    }

    /// Return consolidated updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + Send + 'a {
        let mut result = None;

        if !PartialOrder::less_than(&self.since, upper) {
            // All contained updates are beyond the upper.
            return result.into_iter().flatten();
        }

        self.consolidate_before(upper);

        // There is at most one chain that contains updates before `upper` now.
        result = self
            .chains
            .iter()
            .find(|c| c.first().is_some_and(|(_, t, _)| !upper.less_equal(&t)))
            .map(move |c| {
                let upper = upper.clone();
                c.iter().take_while(move |(_, t, _)| !upper.less_equal(t))
            });

        result.into_iter().flatten()
    }

    /// Consolidate all updates before the given `upper`.
    ///
    /// Once this method returns, all remaining updates before `upper` are contained in a single
    /// chain. Note that this chain might also contain updates beyond `upper` though!
    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
        if self.chains.is_empty() && self.stage.is_empty() {
            return;
        }

        let mut chains = std::mem::take(&mut self.chains);

        // To keep things simple, we log the dropping of all chains here and log the creation of
        // all remaining chains at the end. This causes more event churn than necessary, but the
        // consolidated result is correct.
        chains.iter().for_each(|c| self.log_chain_dropped(c));

        chains.extend(self.stage.flush());

        if chains.is_empty() {
            // We can only get here if the stage contained updates but they all got consolidated
            // away by `flush`, so we need to update the metrics before we return.
            self.update_metrics();
            return;
        }

        let (merged, remains) = self.merge_chains_up_to(chains, upper);

        self.chains = remains;
        if !merged.is_empty() {
            // We put the merged chain at the end, assuming that its contents are likely to
            // consolidate with retractions that will arrive soon.
            self.chains.push(merged);
        }

        // Restore the chain invariant.
        //
        // This part isn't great. We've taken great care so far to only look at updates with times
        // before `upper`, but now we might end up merging all chains anyway in the worst case.
        // There might be something smarter we could do to avoid merging as much as possible. For
        // example, we could consider sorting chains by length first, or inspect the contained
        // times and prefer merging chains that have a chance at consolidating with one another.
        let mut i = self.chains.len().saturating_sub(1);
        while i > 0 {
            let needs_merge = self.chains.get(i).is_some_and(|a| {
                let b = &self.chains[i - 1];
                let a_len = f64::cast_lossy(a.len());
                let b_len = f64::cast_lossy(b.len());
                a_len * self.chain_proportionality > b_len
            });
            if needs_merge {
                let a = self.chains.remove(i);
                let b = std::mem::replace(&mut self.chains[i - 1], Chain::new(0));
                let merged = self.merge_chains([a, b]);
                self.chains[i - 1] = merged;
            } else {
                // Only advance the index if we didn't merge. A merge can reduce the size of the
                // chain at `i - 1`, causing a violation of the chain invariant with the next
                // chain, so we might need to merge the two before proceeding to lower indexes.
                i -= 1;
            }
        }

        self.chains.iter().for_each(|c| self.log_chain_created(c));
        self.update_metrics();
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));
        self.stage.advance_times(&since);
        self.since = since;
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
        if let Some(upper_ts) = upper_ts {
            let upper = Antichain::from_elem(upper_ts);
            self.consolidate_before(&upper);
        }
    }

    fn log_chain_created(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_created(chain.update_count);
        }
    }

    fn log_chain_dropped(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(chain.update_count);
        }
    }

    /// Update persist sink metrics.
    fn update_metrics(&mut self) {
        let mut new_size = self.stage.get_size();
        let mut new_length = self.stage.data.len();
        for chain in &mut self.chains {
            new_size += chain.get_size();
            new_length += chain.update_count;
        }

        self.update_metrics_inner(new_size, new_length);
    }

    /// Update persist sink metrics to the given new size and length.
    fn update_metrics_inner(&mut self, new_size: SizeMetrics, new_length: usize) {
        let old_size = self.prev_size;
        let old_length = self.prev_update_count;
        let len_delta = UpdateDelta::new(new_length, old_length);
        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
        self.metrics
            .report_correction_update_deltas(len_delta, cap_delta);
        self.worker_metrics
            .report_correction_update_totals(new_length, new_size.capacity);

        if let Some(logging) = &self.logging {
            let i = |x: usize| isize::try_from(x).expect("must fit");
            logging.report_size_diff(i(new_size.size) - i(old_size.size));
            logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
            logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
        }

        self.prev_size = new_size;
        self.prev_update_count = new_length;
    }

    /// Merge the given chains, advancing times by the current `since` in the process.
    fn merge_chains(&self, chains: impl IntoIterator<Item = Chain<D>>) -> Chain<D> {
        let Some(&since_ts) = self.since.as_option() else {
            return Chain::new(self.chunk_capacity);
        };

        let mut to_merge = Vec::new();
        for chain in chains {
            if let Some(cursor) = chain.into_cursor() {
                let mut runs = cursor.advance_by(since_ts);
                to_merge.append(&mut runs);
            }
        }

        self.merge_cursors(to_merge)
    }

    /// Merge the given chains, advancing times by the current `since` in the process, but only up
    /// to the given `upper`.
    ///
    /// Returns the merged chain and a list of non-empty remainders of the input chains.
    fn merge_chains_up_to(
        &self,
        chains: Vec<Chain<D>>,
        upper: &Antichain<Timestamp>,
    ) -> (Chain<D>, Vec<Chain<D>>) {
        let Some(&since_ts) = self.since.as_option() else {
            return (Chain::new(self.chunk_capacity), Vec::new());
        };
        let Some(&upper_ts) = upper.as_option() else {
            let merged = self.merge_chains(chains);
            return (merged, Vec::new());
        };

        if since_ts >= upper_ts {
            // After advancing by `since` there will be no updates before `upper`.
            return (Chain::new(self.chunk_capacity), chains);
        }

        let mut to_merge = Vec::new();
        let mut to_keep = Vec::new();
        for chain in chains {
            if let Some(cursor) = chain.into_cursor() {
                let mut runs = cursor.advance_by(since_ts);
                if let Some(last) = runs.pop() {
                    let (before, beyond) = last.split_at_time(upper_ts);
                    before.map(|c| runs.push(c));
                    beyond.map(|c| to_keep.push(c));
                }
                to_merge.append(&mut runs);
            }
        }

        let merged = self.merge_cursors(to_merge);
        let remains = to_keep
            .into_iter()
            .map(|c| c.try_unwrap(self.chunk_capacity).expect("unwrapable"))
            .collect();

        (merged, remains)
    }

    /// Merge the given cursors into one chain.
    fn merge_cursors(&self, cursors: Vec<Cursor<D>>) -> Chain<D> {
        match cursors.len() {
            0 => Chain::new(self.chunk_capacity),
            1 => {
                let [cur] = cursors.try_into().unwrap();
                cur.into_chain(self.chunk_capacity)
            }
            2 => {
                let [a, b] = cursors.try_into().unwrap();
                self.merge_2(a, b)
            }
            _ => self.merge_many(cursors),
        }
    }

    /// Merge the given two cursors using a 2-way merge.
    ///
    /// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
    fn merge_2(&self, cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
        let mut rest1 = Some(cursor1);
        let mut rest2 = Some(cursor2);
        let mut merged = Chain::new(self.chunk_capacity);

        loop {
            match (rest1, rest2) {
                (Some(c1), Some(c2)) => {
                    let (d1, t1, r1) = c1.get();
                    let (d2, t2, r2) = c2.get();

                    match (t1, d1).cmp(&(t2, d2)) {
                        Ordering::Less => {
                            merged.push((d1, t1, r1));
                            rest1 = c1.step();
                            rest2 = Some(c2);
                        }
                        Ordering::Greater => {
                            merged.push((d2, t2, r2));
                            rest1 = Some(c1);
                            rest2 = c2.step();
                        }
                        Ordering::Equal => {
                            let r = r1 + r2;
                            if r != Diff::ZERO {
                                merged.push((d1, t1, r));
                            }
                            rest1 = c1.step();
                            rest2 = c2.step();
                        }
                    }
                }
                (Some(c), None) | (None, Some(c)) => {
                    merged.push_cursor(c);
                    break;
                }
                (None, None) => break,
            }
        }

        merged
    }

    /// Merge the given cursors using a k-way merge with a binary heap.
    fn merge_many(&self, cursors: Vec<Cursor<D>>) -> Chain<D> {
        let mut heap = MergeHeap::from_iter(cursors);
        let mut merged = Chain::new(self.chunk_capacity);
        while let Some(cursor1) = heap.pop() {
            let (data, time, mut diff) = cursor1.get();

            while let Some((cursor2, r)) = heap.pop_equal(data, time) {
                diff += r;
                if let Some(cursor2) = cursor2.step() {
                    heap.push(cursor2);
                }
            }

            if diff != Diff::ZERO {
                merged.push((data, time, diff));
            }
            if let Some(cursor1) = cursor1.step() {
                heap.push(cursor1);
            }
        }

        merged
    }
}
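As a standalone illustration of the consolidating k-way merge in `merge_many` (the real version works over `Cursor`s and a `MergeHeap`), here is a sketch over plain sorted vectors using `std::collections::BinaryHeap` with `Reverse` for min-heap order. The types are simplified assumptions, not the crate's own.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

type Update = (char, u64, i64); // (data, time, diff) with simplified types

// K-way merge of (time, data)-sorted runs, consolidating equal updates and
// dropping those whose diffs sum to zero.
fn merge_runs(runs: Vec<Vec<Update>>) -> Vec<Update> {
    // Heap entries: Reverse((time, data, run index, offset)), so popping
    // yields the smallest (time, data) pair first.
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&(d, t, _)) = run.first() {
            heap.push(Reverse((t, d, i, 0usize)));
        }
    }
    let mut out: Vec<Update> = Vec::new();
    while let Some(Reverse((t, d, i, off))) = heap.pop() {
        let (_, _, r) = runs[i][off];
        // Equal (time, data) keys pop consecutively, so consolidating with
        // the previous output entry is sufficient.
        match out.last_mut() {
            Some((dl, tl, rl)) if *dl == d && *tl == t => *rl += r,
            _ => out.push((d, t, r)),
        }
        if let Some(&(d2, t2, _)) = runs[i].get(off + 1) {
            heap.push(Reverse((t2, d2, i, off + 1)));
        }
    }
    out.retain(|&(_, _, r)| r != 0);
    out
}
```

Like `merge_many`, this is O(N log K) for N updates across K runs, and updates that consolidate away (such as a matching retraction) never reach the output.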

impl<D: Data> Drop for CorrectionV2<D> {
    fn drop(&mut self) {
        self.chains.iter().for_each(|c| self.log_chain_dropped(c));
        self.update_metrics_inner(Default::default(), 0);
    }
}

/// A chain of [`Chunk`]s containing updates.
///
/// All updates in a chain are sorted by (time, data) and consolidated.
///
/// Note that, in contrast to [`Chunk`]s, chains can be empty, though we generally try to avoid
/// keeping empty chains around.
601#[derive(Debug)]
602struct Chain<D: Data> {
603    /// The contained chunks.
604    chunks: Vec<Chunk<D>>,
605    /// The number of updates contained in all chunks, for efficient updating of metrics.
606    update_count: usize,
607    /// Cached value of the current chain size, for efficient updating of metrics.
608    cached_size: Option<SizeMetrics>,
609    /// The capacity of each contained [`Chunk`].
610    chunk_capacity: usize,
611}
612
613impl<D: Data> Chain<D> {
614    /// Construct an empty chain whose chunks have the given capacity.
615    fn new(chunk_capacity: usize) -> Self {
616        Self {
617            chunks: Default::default(),
618            update_count: 0,
619            cached_size: None,
620            chunk_capacity,
621        }
622    }
623
624    /// Return whether the chain is empty.
625    fn is_empty(&self) -> bool {
626        self.chunks.is_empty()
627    }
628
629    /// Return the length of the chain, in chunks.
630    fn len(&self) -> usize {
631        self.chunks.len()
632    }
633
634    /// Push an update onto the chain.
635    ///
636    /// The update must sort after all updates already in the chain, in (time, data)-order, to
637    /// ensure the chain remains sorted.
638    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
639        let (d, t, r) = update;
640        let update = (d.borrow(), t, r);
641
642        debug_assert!(self.can_accept(update));
643
644        match self.chunks.last_mut() {
645            Some(c) if c.len() < self.chunk_capacity => c.push(update),
646            Some(_) | None => {
647                let chunk = Chunk::from_update(update, self.chunk_capacity);
648                self.push_chunk(chunk);
649            }
650        }
651
652        self.update_count += 1;
653        self.invalidate_cached_size();
654    }
655
656    /// Push a chunk onto the chain.
657    ///
658    /// All updates in the chunk must sort after all updates already in the chain, in
659    /// (time, data)-order, to ensure the chain remains sorted.
660    fn push_chunk(&mut self, chunk: Chunk<D>) {
661        debug_assert!(self.can_accept(chunk.first()));
662
663        self.update_count += chunk.len();
664        self.chunks.push(chunk);
665        self.invalidate_cached_size();
666    }
667
668    /// Push the updates produced by a cursor onto the chain.
669    ///
670    /// All updates produced by the cursor must sort after all updates already in the chain, in
671    /// (time, data)-order, to ensure the chain remains sorted.
672    fn push_cursor(&mut self, cursor: Cursor<D>) {
673        let mut rest = Some(cursor);
674        while let Some(cursor) = rest.take() {
675            let update = cursor.get();
676            self.push(update);
677            rest = cursor.step();
678        }
679    }
680
681    /// Return whether the chain can accept the given update.
682    ///
683    /// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
684    fn can_accept(&self, update: (&D, Timestamp, Diff)) -> bool {
685        self.last().is_none_or(|(dc, tc, _)| {
686            let (d, t, _) = update;
687            (tc, dc) < (t, d)
688        })
689    }
690
691    /// Return the first update in the chain, if any.
692    fn first(&self) -> Option<(&D, Timestamp, Diff)> {
693        self.chunks.first().map(|c| c.first())
694    }
695
696    /// Return the last update in the chain, if any.
697    fn last(&self) -> Option<(&D, Timestamp, Diff)> {
698        self.chunks.last().map(|c| c.last())
699    }
700
701    /// Convert the chain into a cursor over the contained updates.
702    fn into_cursor(self) -> Option<Cursor<D>> {
703        let chunks = self.chunks.into_iter().map(Rc::new).collect();
704        Cursor::new(chunks)
705    }
706
707    /// Return an iterator over the contained updates.
708    fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
709        self.chunks
710            .iter()
711            .flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
712    }
713
714    /// Return the size of the chain, for use in metrics.
715    fn get_size(&mut self) -> SizeMetrics {
716        // This operation can be expensive as it requires inspecting the individual chunks and
717        // their backing regions. We thus cache the result to hopefully avoid the cost most of the
718        // time.
719        if self.cached_size.is_none() {
720            let mut metrics = SizeMetrics::default();
721            for chunk in &mut self.chunks {
722                metrics += chunk.get_size();
723            }
724            self.cached_size = Some(metrics);
725        }
726
727        self.cached_size.unwrap()
728    }
729
730    /// Invalidate the cached chain size.
731    ///
732    /// This method must be called every time the size of the chain changed.
733    fn invalidate_cached_size(&mut self) {
734        self.cached_size = None;
735    }
736}
737
738impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
739    fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
740        for update in iter {
741            self.push(update);
742        }
743    }
744}

/// A cursor over updates in a chain.
///
/// A cursor provides two guarantees:
///  * Produced updates are ordered and consolidated.
///  * A cursor always yields at least one update.
///
/// The second guarantee is enforced through the type system: Every method that steps a cursor
/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
/// over the last update.
///
/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
#[derive(Clone, Debug)]
struct Cursor<D: Data> {
    /// The chunks from which updates can still be produced.
    chunks: VecDeque<Rc<Chunk<D>>>,
    /// The current offset into `chunks.front()`.
    chunk_offset: usize,
    /// An optional limit for the number of updates the cursor will produce.
    limit: Option<usize>,
    /// An optional overwrite for the timestamp of produced updates.
    overwrite_ts: Option<Timestamp>,
}

impl<D: Data> Cursor<D> {
    /// Construct a cursor over a list of chunks.
    ///
    /// Returns `None` if `chunks` is empty.
    fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
        if chunks.is_empty() {
            return None;
        }

        Some(Self {
            chunks,
            chunk_offset: 0,
            limit: None,
            overwrite_ts: None,
        })
    }

    /// Set a limit for the number of updates this cursor will produce.
    ///
    /// # Panics
    ///
    /// Panics if there is already a limit lower than the new one.
    fn set_limit(mut self, limit: usize) -> Option<Self> {
        assert!(self.limit.is_none_or(|l| l >= limit));

        if limit == 0 {
            return None;
        }

        // Release chunks made unreachable by the limit.
        let mut count = 0;
        let mut idx = 0;
        let mut offset = self.chunk_offset;
        while idx < self.chunks.len() && count < limit {
            let chunk = &self.chunks[idx];
            count += chunk.len() - offset;
            idx += 1;
            offset = 0;
        }
        self.chunks.truncate(idx);

        if count > limit {
            self.limit = Some(limit);
        }

        Some(self)
    }

    /// Get a reference to the current update.
    fn get(&self) -> (&D, Timestamp, Diff) {
        let chunk = self.get_chunk();
        let (d, t, r) = chunk.index(self.chunk_offset);
        let t = self.overwrite_ts.unwrap_or(t);
        (d, t, r)
    }

    /// Get a reference to the current chunk.
    fn get_chunk(&self) -> &Chunk<D> {
        &self.chunks[0]
    }

    /// Step to the next update.
    ///
    /// Returns the stepped cursor, or `None` if the step was over the last update.
    fn step(mut self) -> Option<Self> {
        if self.chunk_offset == self.get_chunk().len() - 1 {
            return self.skip_chunk().map(|(c, _)| c);
        }

        self.chunk_offset += 1;

        if let Some(limit) = &mut self.limit {
            *limit -= 1;
            if *limit == 0 {
                return None;
            }
        }

        Some(self)
    }

    /// Skip the remainder of the current chunk.
    ///
    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
    /// left after the skip.
    fn skip_chunk(mut self) -> Option<(Self, usize)> {
        let chunk = self.chunks.pop_front().expect("cursor invariant");

        if self.chunks.is_empty() {
            return None;
        }

        let skipped = chunk.len() - self.chunk_offset;
        self.chunk_offset = 0;

        if let Some(limit) = &mut self.limit {
            if skipped >= *limit {
                return None;
            }
            *limit -= skipped;
        }

        Some((self, skipped))
    }

    /// Skip all updates with times <= the given time.
    ///
    /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
    /// left after the skip.
    fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
        if self.overwrite_ts.is_some_and(|ts| ts <= time) {
            return None;
        } else if self.get().1 > time {
            return Some((self, 0));
        }

        let mut skipped = 0;

        let new_offset = loop {
            let chunk = self.get_chunk();
            if let Some(index) = chunk.find_time_greater_than(time) {
                break index;
            }

            let (cursor, count) = self.skip_chunk()?;
            self = cursor;
            skipped += count;
        };

        skipped += new_offset - self.chunk_offset;
        self.chunk_offset = new_offset;

        Some((self, skipped))
    }

    /// Advance all updates in this cursor by the given `since_ts`.
    ///
    /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
    /// been advanced by `since_ts`.
    fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
        // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
        // only need to advance the `overwrite_ts` by the `since_ts`.
        if let Some(ts) = self.overwrite_ts {
            if ts < since_ts {
                self.overwrite_ts = Some(since_ts);
            }
            return vec![self];
        }

        // Otherwise we need to split the cursor so that each new cursor only yields runs of
        // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
        // this by splitting the cursor at each time < `since_ts`.
        let mut splits = Vec::new();
        let mut remaining = Some(self);

        while let Some(cursor) = remaining.take() {
            let (_, time, _) = cursor.get();
            if time >= since_ts {
                splits.push(cursor);
                break;
            }

            let mut current = cursor.clone();
            if let Some((cursor, skipped)) = cursor.skip_time(time) {
                remaining = Some(cursor);
                current = current.set_limit(skipped).expect("skipped at least 1");
            }
            current.overwrite_ts = Some(since_ts);
            splits.push(current);
        }

        splits
    }

    /// Split the cursor at the given time.
    ///
    /// Returns two cursors, the first yielding all updates at times < `time`, the second yielding
    /// all updates at times >= `time`. Both can be `None` if they would be empty.
    fn split_at_time(self, time: Timestamp) -> (Option<Self>, Option<Self>) {
        let Some(skip_ts) = time.step_back() else {
            return (None, Some(self));
        };

        let before = self.clone();
        match self.skip_time(skip_ts) {
            Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
            None => (Some(before), None),
        }
    }

    /// Drain the cursor into a [`Chain`].
    ///
    /// This reuses the underlying chunks if possible, and writes new ones otherwise.
    fn into_chain(self, chunk_capacity: usize) -> Chain<D> {
        match self.try_unwrap(chunk_capacity) {
            Ok(chain) => chain,
            Err((_, cursor)) => {
                let mut chain = Chain::new(chunk_capacity);
                chain.push_cursor(cursor);
                chain
            }
        }
    }

    /// Attempt to unwrap the cursor into a [`Chain`].
    ///
    /// This operation efficiently reuses chunks by directly inserting them into the output chain
    /// where possible.
    ///
    /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
    /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
    /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
    /// chain by copying chunks rather than reusing them.
    fn try_unwrap(self, chunk_capacity: usize) -> Result<Chain<D>, (&'static str, Self)> {
        if self.limit.is_some() {
            return Err(("cursor with limit", self));
        }
        if self.overwrite_ts.is_some() {
            return Err(("cursor with overwrite_ts", self));
        }
        if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
            return Err(("cursor on shared chunks", self));
        }

        let mut chain = Chain::new(chunk_capacity);
        let mut remaining = Some(self);

        // We might be partway through the first chunk, in which case we can't reuse it but need to
        // allocate a new one to contain only the updates the cursor can still yield.
        while let Some(cursor) = remaining.take() {
            if cursor.chunk_offset == 0 {
                remaining = Some(cursor);
                break;
            }
            let update = cursor.get();
            chain.push(update);
            remaining = cursor.step();
        }

        if let Some(cursor) = remaining {
            for chunk in cursor.chunks {
                let chunk = Rc::into_inner(chunk).expect("checked above");
                chain.push_chunk(chunk);
            }
        }

        Ok(chain)
    }
}
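As a standalone illustration of the uniqueness check `try_unwrap` relies on (a minimal sketch using a plain `Rc<Vec<i32>>` in place of the real `Rc<Chunk>`): `BinaryHeap`-style sharing aside, an `Rc`'s inner value can only be moved out and reused when the holder is its sole owner.

```rust
use std::rc::Rc;

fn main() {
    let chunk = Rc::new(vec![1, 2, 3]);
    let shared = Rc::clone(&chunk);

    // Two owners: the allocation cannot be reclaimed, `into_inner` returns `None`.
    assert_eq!(Rc::strong_count(&chunk), 2);
    assert!(Rc::into_inner(chunk).is_none());

    // Sole owner: the inner value can be moved out and reused without copying.
    assert_eq!(Rc::into_inner(shared), Some(vec![1, 2, 3]));
}
```

This is why `try_unwrap` checks `Rc::strong_count` up front: only chunks no other cursor still references can be moved into the output chain instead of copied.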

/// A non-empty chunk of updates, backed by a columnation region.
///
/// All updates in a chunk are sorted by (time, data) and consolidated.
///
/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
/// re-use chunk allocations. Unfortunately, the current `ColumnationStack` API doesn't provide a
/// convenient way to pre-size regions, so chunks are currently only fixed-size in spirit.
struct Chunk<D: Data> {
    /// The contained updates.
    data: ColumnationStack<(D, Timestamp, Diff)>,
    /// Cached value of the current chunk size, for efficient updating of metrics.
    cached_size: Option<SizeMetrics>,
}

impl<D: Data> fmt::Debug for Chunk<D> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Chunk(<{}>)", self.len())
    }
}

impl<D: Data> Chunk<D> {
    /// Create a new chunk containing a single update.
    fn from_update<DT: Borrow<D>>(update: (DT, Timestamp, Diff), chunk_capacity: usize) -> Self {
        let (d, t, r) = update;

        let mut chunk = Self {
            data: ColumnationStack::with_capacity(chunk_capacity),
            cached_size: None,
        };
        chunk.data.copy_destructured(d.borrow(), &t, &r);

        chunk
    }

    /// Return the number of updates in the chunk.
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Return the update at the given index.
    ///
    /// # Panics
    ///
    /// Panics if the given index is not populated.
    fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
        let (d, t, r) = self.data.index(idx);
        (d, *t, *r)
    }

    /// Return the first update in the chunk.
    fn first(&self) -> (&D, Timestamp, Diff) {
        self.index(0)
    }

    /// Return the last update in the chunk.
    fn last(&self) -> (&D, Timestamp, Diff) {
        self.index(self.len() - 1)
    }

    /// Push an update onto the chunk.
    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
        let (d, t, r) = update;
        self.data.copy_destructured(d.borrow(), &t, &r);

        self.invalidate_cached_size();
    }

    /// Return the index of the first update at a time greater than `time`, or `None` if no such
    /// update exists.
    fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
        if self.last().1 <= time {
            return None;
        }

        let mut lower = 0;
        let mut upper = self.len();
        while lower < upper {
            let idx = (lower + upper) / 2;
            if self.index(idx).1 > time {
                upper = idx;
            } else {
                lower = idx + 1;
            }
        }

        Some(lower)
    }
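A standalone model of the binary search above (using a plain sorted `&[u64]` of times in place of the real `Chunk` type): it returns the index of the first element strictly greater than `time`, which is what lets `skip_time` jump over whole runs of updates in one step.

```rust
// Hypothetical standalone version; `times` stands in for a chunk's sorted time column.
fn find_time_greater_than(times: &[u64], time: u64) -> Option<usize> {
    // `times` is sorted, so if even the last time is <= `time`, no match exists.
    if *times.last()? <= time {
        return None;
    }
    let (mut lower, mut upper) = (0, times.len());
    while lower < upper {
        let idx = (lower + upper) / 2;
        if times[idx] > time {
            upper = idx;
        } else {
            lower = idx + 1;
        }
    }
    Some(lower)
}

fn main() {
    assert_eq!(find_time_greater_than(&[1, 1, 2, 3], 1), Some(2));
    assert_eq!(find_time_greater_than(&[1, 2, 3], 5), None);
    assert_eq!(find_time_greater_than(&[4], 0), Some(0));
}
```

On a plain slice the loop is equivalent to `times.partition_point(|&t| t <= time)`; the hand-rolled version is shown because `Chunk` indexes through its columnation region rather than a slice.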

    /// Return the size of the chunk, for use in metrics.
    fn get_size(&mut self) -> SizeMetrics {
        if self.cached_size.is_none() {
            let mut size = 0;
            let mut capacity = 0;
            self.data.heap_size(|sz, cap| {
                size += sz;
                capacity += cap;
            });
            self.cached_size = Some(SizeMetrics {
                size,
                capacity,
                allocations: 1,
            });
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chunk size.
    ///
    /// This method must be called every time the size of the chunk changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}

/// A buffer for staging updates before they are inserted into the sorted chains.
#[derive(Debug)]
struct Stage<D> {
    /// The contained updates.
    ///
    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
    data: Vec<(D, Timestamp, Diff)>,
    /// Introspection logging.
    ///
    /// We want to report the number of records in the stage. To do so, we pretend that the stage
    /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
    /// re-created.
    logging: Option<ChannelLogging>,
}

impl<D: Data> Stage<D> {
    fn new(logging: Option<ChannelLogging>, chunk_capacity: usize) -> Self {
        // For logging, we pretend the stage consists of a single chain.
        if let Some(logging) = &logging {
            logging.chain_created(0);
        }

        Self {
            data: Vec::with_capacity(chunk_capacity),
            logging,
        }
    }

    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Insert a batch of updates, possibly producing a ready [`Chain`].
    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
        if updates.is_empty() {
            return None;
        }

        let prev_length = self.ilen();

        // Determine how many chunks we can fill with the available updates.
        let update_count = self.data.len() + updates.len();
        let chunk_capacity = self.data.capacity();
        let chunk_count = update_count / chunk_capacity;

        let mut new_updates = updates.drain(..);

        // If we have enough shippable updates, collect them, consolidate, and build a chain.
        let maybe_chain = if chunk_count > 0 {
            let ship_count = chunk_count * chunk_capacity;
            let mut buffer = Vec::with_capacity(ship_count);

            buffer.append(&mut self.data);
            while buffer.len() < ship_count {
                let update = new_updates.next().unwrap();
                buffer.push(update);
            }

            consolidate(&mut buffer);

            let mut chain = Chain::new(chunk_capacity);
            chain.extend(buffer);
            Some(chain)
        } else {
            None
        };

        // Stage the remaining updates.
        self.data.extend(new_updates);

        self.log_length_diff(self.ilen() - prev_length);

        maybe_chain
    }

    /// Flush all currently staged updates into a chain.
    fn flush(&mut self) -> Option<Chain<D>> {
        self.log_length_diff(-self.ilen());

        consolidate(&mut self.data);

        if self.data.is_empty() {
            return None;
        }

        let chunk_capacity = self.data.capacity();
        let mut chain = Chain::new(chunk_capacity);
        chain.extend(self.data.drain(..));
        Some(chain)
    }

    /// Advance the times of staged updates by the given `since`.
    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
        let Some(since_ts) = since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            self.log_length_diff(-self.ilen());
            self.data.clear();
            return;
        };

        for (_, time, _) in &mut self.data {
            *time = std::cmp::max(*time, *since_ts);
        }
    }

    /// Return the size of the stage, for use in metrics.
    ///
    /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
    /// under-estimates. That's fine as the stage should always be small.
    fn get_size(&self) -> SizeMetrics {
        SizeMetrics {
            size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
            capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
            allocations: 1,
        }
    }

    /// Return the number of updates in the stage, as an `isize`.
    fn ilen(&self) -> isize {
        self.data.len().try_into().expect("must fit")
    }

    fn log_length_diff(&self, diff: isize) {
        let Some(logging) = &self.logging else { return };
        if diff > 0 {
            let count = usize::try_from(diff).expect("must fit");
            logging.chain_created(count);
            logging.chain_dropped(0);
        } else if diff < 0 {
            let count = usize::try_from(-diff).expect("must fit");
            logging.chain_created(0);
            logging.chain_dropped(count);
        }
    }
}

impl<D> Drop for Stage<D> {
    fn drop(&mut self) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(self.data.len());
        }
    }
}

/// Sort and consolidate the given list of updates.
///
/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
/// except that it sorts updates by (time, data) instead of (data, time).
fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
    if updates.len() <= 1 {
        return;
    }

    let diff = |update: &(_, _, Diff)| update.2;

    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut offset = 0;
    let mut accum = diff(&updates[0]);

    for idx in 1..updates.len() {
        let this = &updates[idx];
        let prev = &updates[idx - 1];
        if this.0 == prev.0 && this.1 == prev.1 {
            accum += diff(&updates[idx]);
        } else {
            if accum != Diff::ZERO {
                updates.swap(offset, idx - 1);
                updates[offset].2 = accum;
                offset += 1;
            }
            accum = diff(&updates[idx]);
        }
    }

    if accum != Diff::ZERO {
        let len = updates.len();
        updates.swap(offset, len - 1);
        updates[offset].2 = accum;
        offset += 1;
    }

    updates.truncate(offset);
}
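A standalone sketch of the same (time, data)-ordered consolidation, using concrete `(String, u64, i64)` updates in place of the generic `D`/`Timestamp`/`Diff` types. It trades the in-place swapping above for an output vector, which makes the accumulate-and-drop-zeros logic easier to follow:

```rust
// Simplified model: sort by (time, data), merge equal updates, drop zero accumulations.
fn consolidate(updates: &mut Vec<(String, u64, i64)>) {
    // Sort by (time, data) so equal updates become adjacent.
    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut out: Vec<(String, u64, i64)> = Vec::new();
    for (d, t, r) in updates.drain(..) {
        match out.last_mut() {
            // Same (data, time) as the previous update: accumulate the diffs.
            Some((pd, pt, pr)) if *pd == d && *pt == t => *pr += r,
            _ => {
                // A run that accumulated to zero consolidates away to nothing.
                if matches!(out.last(), Some((_, _, 0))) {
                    out.pop();
                }
                out.push((d, t, r));
            }
        }
    }
    if matches!(out.last(), Some((_, _, 0))) {
        out.pop();
    }
    *updates = out;
}

fn main() {
    let mut v = vec![
        ("a".to_string(), 2, 1),
        ("a".to_string(), 1, 1),
        ("a".to_string(), 2, -1), // retraction: cancels ("a", 2, 1)
        ("b".to_string(), 1, 3),
    ];
    consolidate(&mut v);
    // The ("a", 2) updates cancel; the rest come out (time, data)-sorted.
    assert_eq!(v, vec![("a".to_string(), 1, 1), ("b".to_string(), 1, 3)]);
}
```

The production version above avoids the second allocation by compacting within the input vector; the behavior is the same.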

/// A binary heap specialized for merging [`Cursor`]s.
struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);

impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
        let inner = cursors.into_iter().map(MergeCursor).collect();
        Self(inner)
    }
}

impl<D: Data> MergeHeap<D> {
    /// Pop the next cursor (the one yielding the least update) from the heap.
    fn pop(&mut self) -> Option<Cursor<D>> {
        self.0.pop().map(|MergeCursor(c)| c)
    }

    /// Pop the next cursor from the heap, provided the data and time of its current update are
    /// equal to the given values.
    ///
    /// Returns both the cursor and the diff corresponding to `data` and `time`.
    fn pop_equal(&mut self, data: &D, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
        let MergeCursor(cursor) = self.0.peek()?;
        let (d, t, r) = cursor.get();
        if d == data && t == time {
            let cursor = self.pop().expect("checked above");
            Some((cursor, r))
        } else {
            None
        }
    }

    /// Push a cursor onto the heap.
    fn push(&mut self, cursor: Cursor<D>) {
        self.0.push(MergeCursor(cursor));
    }
}

/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
///
/// Implements the cursor ordering required for merging cursors.
struct MergeCursor<D: Data>(Cursor<D>);

impl<D: Data> PartialEq for MergeCursor<D> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl<D: Data> Eq for MergeCursor<D> {}

impl<D: Data> PartialOrd for MergeCursor<D> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<D: Data> Ord for MergeCursor<D> {
    fn cmp(&self, other: &Self) -> Ordering {
        let (d1, t1, _) = self.0.get();
        let (d2, t2, _) = other.0.get();
        (t1, d1).cmp(&(t2, d2)).reverse()
    }
}
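The reversed `cmp` in `MergeCursor` exists because `std::collections::BinaryHeap` is a max-heap. A minimal sketch of the same trick (using a simplified `(time, data)` entry in place of a real cursor) shows that reversing the comparison makes `pop` yield the least element:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Hypothetical stand-in for a cursor's current (time, data) update.
#[derive(PartialEq, Eq)]
struct MergeEntry(u64, String);

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare by (time, data), then reverse so the max-heap behaves as a min-heap.
        (self.0, &self.1).cmp(&(other.0, &other.1)).reverse()
    }
}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap: BinaryHeap<MergeEntry> = [(2, "b"), (1, "c"), (1, "a")]
        .into_iter()
        .map(|(t, d)| MergeEntry(t, d.to_string()))
        .collect();

    // Entries come out ordered by (time, data), smallest first.
    let first = heap.pop().unwrap();
    assert_eq!((first.0, first.1.as_str()), (1, "a"));
    let second = heap.pop().unwrap();
    assert_eq!((second.0, second.1.as_str()), (1, "c"));
}
```

Popping always yields the heap's least (time, data) update, which is exactly the order a k-way merge of sorted chains needs.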