mz_compute/sink/correction_v2.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
//! operator to stash updates before they are written.
//!
//! The `Correction` data structure provides methods to:
//! * insert new updates
//! * advance the compaction frontier (called `since`)
//! * obtain an iterator over consolidated updates before some `upper`
//! * force consolidation of updates before some `upper`
//!
//! The goal is to provide good performance for each of these operations, even in the presence of
//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
//! retractions for future times and we want those to be handled efficiently as well.
//!
//! Note that `Correction` does not provide a method to directly remove updates. Instead, updates
//! are removed by inserting their retractions so that they consolidate away to nothing.
//!
//! ## Storage of Updates
//!
//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
//!
//! [`CorrectionV2`] holds onto a list of [`Chain`]s containing [`Chunk`]s of stashed updates. Each
//! [`Chunk`] is a columnation region containing a fixed maximum number of updates. All updates in
//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
//!
//! ```text
//! chain[0]       | chain[1]       | chain[2]
//!                |                |
//! chunk[0]       | chunk[0]       | chunk[0]
//!   (a, 1, +1)   |   (a, 1, +1)   |   (d, 3, +1)
//!   (b, 1, +1)   |   (b, 2, -1)   |   (d, 4, -1)
//! chunk[1]       | chunk[1]       |
//!   (c, 1, +1)   |   (c, 2, -2)   |
//!   (a, 2, -1)   |   (c, 4, -1)   |
//! chunk[2]       |                |
//!   (b, 2, +1)   |                |
//!   (c, 2, +1)   |                |
//! chunk[3]       |                |
//!   (b, 3, -1)   |                |
//!   (c, 3, +1)   |                |
//! ```
//!
//! The "chain invariant" states that each chain has at least `chain_proportionality` times as
//! many chunks as the next one. This means that chain sizes will often be powers of
//! `chain_proportionality`, but they don't have to be. For example, for a proportionality of 2,
//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
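//!
//! As a quick check of the invariant (a hypothetical illustration, chain sizes in chunks):
//!
//! ```text
//! proportionality = 2
//! [11, 5, 2, 1]  ok:        5*2 <= 11,  2*2 <= 5,  1*2 <= 2
//! [8, 4, 3]      violated:  3*2 > 4, so the last two chains must be merged
//! ```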
//!
//! Choosing the `chain_proportionality` value allows tuning the trade-off between memory and CPU
//! resources required to maintain corrections. A higher proportionality forces more frequent chain
//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
//!
//! ## Inserting Updates
//!
//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
//! list until the chain invariant is restored.
//!
//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
//! chunk, copying the update in, and then likely merging with an existing chain to restore the
//! chain invariant. If updates trickle in in small batches, this can cause considerable
//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
//! [`Chunk`], they are sorted and inserted into the chains.
//!
//! The insert operation has an amortized complexity of O(log N), with N being the current number
//! of updates stored.
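//!
//! As an illustration of restoring the invariant after an insert (chain sizes in chunks,
//! proportionality 2, assuming no updates consolidate away during the merges):
//!
//! ```text
//! [4, 2, 1] + new chain [1]  =>  [4, 2, 1, 1]
//! merge last two (1*2 > 1)   =>  [4, 2, 2]
//! merge last two (2*2 > 2)   =>  [4, 4]
//! merge last two (4*2 > 4)   =>  [8]
//! ```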
//!
//! ## Retrieving Consolidated Updates
//!
//! Retrieving consolidated updates before a given `upper` works by first consolidating all updates
//! at times before the `upper`, merging them all into one chain, then returning an iterator over
//! that chain.
//!
//! Because each chain contains updates ordered by time first, consolidation of all updates before
//! an `upper` is possible without touching updates at future times. It works by merging the chains
//! only up to the `upper`, producing a merged chain containing consolidated times before the
//! `upper` and leaving behind the chain parts containing later times. The complexity of this
//! operation is O(U log K), with U being the number of updates before `upper` and K the number
//! of chains.
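//!
//! A small hypothetical example of such a merge, with `upper = [3]` (the `|` marks the split
//! point in each chain):
//!
//! ```text
//! chain 1: [(a, 1, +1), (b, 2, +1) | (b, 3, +1)]
//! chain 2: [(a, 1, -1), (c, 2, +1) | (c, 4, -1)]
//!
//! merged:  [(b, 2, +1), (c, 2, +1)]     -- the (a, 1, ..) updates consolidate away
//! remains: [(b, 3, +1)] and [(c, 4, -1)]
//! ```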
//!
//! Unfortunately, performing consolidation as described above can break the chain invariant and we
//! might need to restore it by merging chains, including ones containing future updates. This is
//! something that would be great to fix! In the meantime the hope is that in steady state it
//! doesn't matter too much because either there are no future retractions and U is approximately
//! equal to N, or the amount of future retractions is much larger than the amount of current
//! changes, in which case removing the current changes has a good chance of leaving the chain
//! invariant intact.
//!
//! ## Merging Chains
//!
//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
//! complexity of a merge of K chains containing N updates is O(N log K).
//!
//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
//!
//! For example, consider these two chains, assuming `since = [2]`:
//!
//! ```text
//! chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
//! chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
//! ```
//!
//! After time advancement, the chains look like this:
//!
//! ```text
//! chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
//! chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
//! ```
//!
//! Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that's
//! neither sorted nor consolidated.
//!
//! Instead we need to merge sub-chains, one for each distinct time that's before or at the
//! `since`. Each of these sub-chains retains the (time, data) ordering after the time advancement
//! to `since`, so merging those yields the expected result.
//!
//! For the above example, the chains we would merge are:
//!
//! ```text
//! chain 1.a: [(c, 2, +1)]
//! chain 1.b: [(b, 2, -1), (a, 3, -1)]
//! chain 2.a: [(b, 2, +1)]
//! chain 2.b: [(a, 2, +1), (c, 2, -1)]
//! ```

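The time-advancement splitting described above can be sketched with plain vectors. This is a simplified model for illustration only: the real implementation splits `Cursor`s using limits and timestamp overwrites, and `split_for_merge` with its tuple types is hypothetical, not part of this module.

```rust
// Split a (time, data)-sorted chain into runs that remain sorted after all
// times are advanced to at least `since`: every distinct original time at
// or below `since` starts a new run, while later times extend the last run.
fn split_for_merge(chain: &[(char, u64, i64)], since: u64) -> Vec<Vec<(char, u64, i64)>> {
    let mut runs: Vec<Vec<(char, u64, i64)>> = Vec::new();
    let mut last_cut: Option<u64> = None;
    for &(d, t, r) in chain {
        if runs.is_empty() || (t <= since && last_cut != Some(t)) {
            runs.push(Vec::new());
            last_cut = Some(t);
        }
        // Advance the time as part of the split.
        runs.last_mut().unwrap().push((d, t.max(since), r));
    }
    runs
}
```

Applied to the two chains from the example above with `since = 2`, this yields exactly the sub-chains 1.a/1.b and 2.a/2.b, each of which stays (time, data)-ordered and can then be fed to a regular k-way merge.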
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;

use columnation::Columnation;
use differential_dataflow::trace::implementations::BatchContainer;
use mz_ore::cast::CastLossy;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::columnation::ColumnationStack;
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::sink::correction::{Logging, SizeMetrics};

/// Convenient alias for use in data trait bounds.
pub trait Data: differential_dataflow::Data + Columnation {}
impl<D: differential_dataflow::Data + Columnation> Data for D {}

/// A data structure used to store corrections in the MV sink implementation.
///
/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
/// allowing their memory to be transparently spilled to disk.
#[derive(Debug)]
pub(super) struct CorrectionV2<D: Data> {
    /// Chains containing sorted updates.
    chains: Vec<Chain<D>>,
    /// A staging area for updates, to speed up small inserts.
    stage: Stage<D>,
    /// The frontier by which all contained times are advanced.
    since: Antichain<Timestamp>,
    /// The size factor of subsequent chains required by the chain invariant.
    chain_proportionality: f64,
    /// The capacity of each [`Chunk`].
    chunk_capacity: usize,

    /// Total count of updates in the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_update_count: usize,
    /// Total heap size used by the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_size: SizeMetrics,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Introspection logging.
    logging: Option<Logging>,
}

impl<D: Data> CorrectionV2<D> {
    /// Construct a new [`CorrectionV2`] instance.
    pub fn new(
        metrics: SinkMetrics,
        worker_metrics: SinkWorkerMetrics,
        logging: Option<Logging>,
        chain_proportionality: f64,
        chunk_size: usize,
    ) -> Self {
        let update_size = std::mem::size_of::<(D, Timestamp, Diff)>();
        let chunk_capacity = std::cmp::max(chunk_size / update_size, 1);

        Self {
            chains: Default::default(),
            stage: Stage::new(logging.clone(), chunk_capacity),
            since: Antichain::from_elem(Timestamp::MIN),
            chain_proportionality,
            chunk_capacity,
            prev_update_count: 0,
            prev_size: Default::default(),
            metrics,
            worker_metrics,
            logging,
        }
    }

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// All times are expected to be >= the `since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

        if let Some(chain) = self.stage.insert(updates) {
            self.log_chain_created(&chain);
            self.chains.push(chain);

            // Restore the chain invariant.
            let prop = self.chain_proportionality;
            let merge_needed = |chains: &[Chain<_>]| match chains {
                [.., prev, last] => {
                    let last_len = f64::cast_lossy(last.len());
                    let prev_len = f64::cast_lossy(prev.len());
                    last_len * prop > prev_len
                }
                _ => false,
            };

            while merge_needed(&self.chains) {
                let a = self.chains.pop().unwrap();
                let b = self.chains.pop().unwrap();
                self.log_chain_dropped(&a);
                self.log_chain_dropped(&b);

                let merged = self.merge_chains([a, b]);
                self.log_chain_created(&merged);
                self.chains.push(merged);
            }
        }

        self.update_metrics();
    }

    /// Return consolidated updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + 'a {
        let mut result = None;

        if !PartialOrder::less_than(&self.since, upper) {
            // All contained updates are beyond the upper.
            return result.into_iter().flatten();
        }

        self.consolidate_before(upper);

        // There is at most one chain that contains updates before `upper` now.
        result = self
            .chains
            .iter()
            .find(|c| c.first().is_some_and(|(_, t, _)| !upper.less_equal(&t)))
            .map(move |c| {
                let upper = upper.clone();
                c.iter().take_while(move |(_, t, _)| !upper.less_equal(t))
            });

        result.into_iter().flatten()
    }

    /// Consolidate all updates before the given `upper`.
    ///
    /// Once this method returns, all remaining updates before `upper` are contained in a single
    /// chain. Note that this chain might also contain updates beyond `upper` though!
    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
        if self.chains.is_empty() && self.stage.is_empty() {
            return;
        }

        let mut chains = std::mem::take(&mut self.chains);

        // To keep things simple, we log the dropping of all chains here and log the creation of
        // all remaining chains at the end. This causes more event churn than necessary, but the
        // consolidated result is correct.
        chains.iter().for_each(|c| self.log_chain_dropped(c));

        chains.extend(self.stage.flush());

        if chains.is_empty() {
            // We can only get here if the stage contained updates but they all got consolidated
            // away by `flush`, so we need to update the metrics before we return.
            self.update_metrics();
            return;
        }

        let (merged, remains) = self.merge_chains_up_to(chains, upper);

        self.chains = remains;
        if !merged.is_empty() {
            // We put the merged chain at the end, assuming that its contents are likely to
            // consolidate with retractions that will arrive soon.
            self.chains.push(merged);
        }

        // Restore the chain invariant.
        //
        // This part isn't great. We've taken great care so far to only look at updates with times
        // before `upper`, but now we might end up merging all chains anyway in the worst case.
        // There might be something smarter we could do to avoid merging as much as possible. For
        // example, we could consider sorting chains by length first, or inspect the contained
        // times and prefer merging chains that have a chance at consolidating with one another.
        let mut i = self.chains.len().saturating_sub(1);
        while i > 0 {
            let needs_merge = self.chains.get(i).is_some_and(|a| {
                let b = &self.chains[i - 1];
                let a_len = f64::cast_lossy(a.len());
                let b_len = f64::cast_lossy(b.len());
                a_len * self.chain_proportionality > b_len
            });
            if needs_merge {
                let a = self.chains.remove(i);
                let b = std::mem::replace(&mut self.chains[i - 1], Chain::new(0));
                let merged = self.merge_chains([a, b]);
                self.chains[i - 1] = merged;
            } else {
                // Only advance the index if we didn't merge. A merge can reduce the size of the
                // chain at `i - 1`, causing a violation of the chain invariant with the next
                // chain, so we might need to merge the two before proceeding to lower indexes.
                i -= 1;
            }
        }

        self.chains.iter().for_each(|c| self.log_chain_created(c));
        self.update_metrics();
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));
        self.stage.advance_times(&since);
        self.since = since;
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
        if let Some(upper_ts) = upper_ts {
            let upper = Antichain::from_elem(upper_ts);
            self.consolidate_before(&upper);
        }
    }

    fn log_chain_created(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_created(chain.update_count);
        }
    }

    fn log_chain_dropped(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(chain.update_count);
        }
    }

    /// Update persist sink metrics.
    fn update_metrics(&mut self) {
        let mut new_size = self.stage.get_size();
        let mut new_length = self.stage.data.len();
        for chain in &mut self.chains {
            new_size += chain.get_size();
            new_length += chain.update_count;
        }

        self.update_metrics_inner(new_size, new_length);
    }

    /// Update persist sink metrics to the given new size and length.
    fn update_metrics_inner(&mut self, new_size: SizeMetrics, new_length: usize) {
        let old_size = self.prev_size;
        let old_length = self.prev_update_count;
        let len_delta = UpdateDelta::new(new_length, old_length);
        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
        self.metrics
            .report_correction_update_deltas(len_delta, cap_delta);
        self.worker_metrics
            .report_correction_update_totals(new_length, new_size.capacity);

        if let Some(logging) = &self.logging {
            let i = |x: usize| isize::try_from(x).expect("must fit");
            logging.report_size_diff(i(new_size.size) - i(old_size.size));
            logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
            logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
        }

        self.prev_size = new_size;
        self.prev_update_count = new_length;
    }

    /// Merge the given chains, advancing times by the current `since` in the process.
    fn merge_chains(&self, chains: impl IntoIterator<Item = Chain<D>>) -> Chain<D> {
        let Some(&since_ts) = self.since.as_option() else {
            return Chain::new(self.chunk_capacity);
        };

        let mut to_merge = Vec::new();
        for chain in chains {
            if let Some(cursor) = chain.into_cursor() {
                let mut runs = cursor.advance_by(since_ts);
                to_merge.append(&mut runs);
            }
        }

        self.merge_cursors(to_merge)
    }

    /// Merge the given chains, advancing times by the current `since` in the process, but only up
    /// to the given `upper`.
    ///
    /// Returns the merged chain and a list of non-empty remainders of the input chains.
    fn merge_chains_up_to(
        &self,
        chains: Vec<Chain<D>>,
        upper: &Antichain<Timestamp>,
    ) -> (Chain<D>, Vec<Chain<D>>) {
        let Some(&since_ts) = self.since.as_option() else {
            return (Chain::new(self.chunk_capacity), Vec::new());
        };
        let Some(&upper_ts) = upper.as_option() else {
            let merged = self.merge_chains(chains);
            return (merged, Vec::new());
        };

        if since_ts >= upper_ts {
            // After advancing by `since` there will be no updates before `upper`.
            return (Chain::new(self.chunk_capacity), chains);
        }

        let mut to_merge = Vec::new();
        let mut to_keep = Vec::new();
        for chain in chains {
            if let Some(cursor) = chain.into_cursor() {
                let mut runs = cursor.advance_by(since_ts);
                if let Some(last) = runs.pop() {
                    let (before, beyond) = last.split_at_time(upper_ts);
                    before.map(|c| runs.push(c));
                    beyond.map(|c| to_keep.push(c));
                }
                to_merge.append(&mut runs);
            }
        }

        let merged = self.merge_cursors(to_merge);
        let remains = to_keep
            .into_iter()
            .map(|c| c.try_unwrap(self.chunk_capacity).expect("unwrapable"))
            .collect();

        (merged, remains)
    }

    /// Merge the given cursors into one chain.
    fn merge_cursors(&self, cursors: Vec<Cursor<D>>) -> Chain<D> {
        match cursors.len() {
            0 => Chain::new(self.chunk_capacity),
            1 => {
                let [cur] = cursors.try_into().unwrap();
                cur.into_chain(self.chunk_capacity)
            }
            2 => {
                let [a, b] = cursors.try_into().unwrap();
                self.merge_2(a, b)
            }
            _ => self.merge_many(cursors),
        }
    }

    /// Merge the given two cursors using a 2-way merge.
    ///
    /// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
    fn merge_2(&self, cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
        let mut rest1 = Some(cursor1);
        let mut rest2 = Some(cursor2);
        let mut merged = Chain::new(self.chunk_capacity);

        loop {
            match (rest1, rest2) {
                (Some(c1), Some(c2)) => {
                    let (d1, t1, r1) = c1.get();
                    let (d2, t2, r2) = c2.get();

                    match (t1, d1).cmp(&(t2, d2)) {
                        Ordering::Less => {
                            merged.push((d1, t1, r1));
                            rest1 = c1.step();
                            rest2 = Some(c2);
                        }
                        Ordering::Greater => {
                            merged.push((d2, t2, r2));
                            rest1 = Some(c1);
                            rest2 = c2.step();
                        }
                        Ordering::Equal => {
                            let r = r1 + r2;
                            if r != Diff::ZERO {
                                merged.push((d1, t1, r));
                            }
                            rest1 = c1.step();
                            rest2 = c2.step();
                        }
                    }
                }
                (Some(c), None) | (None, Some(c)) => {
                    merged.push_cursor(c);
                    break;
                }
                (None, None) => break,
            }
        }

        merged
    }

    /// Merge the given cursors using a k-way merge with a binary heap.
    fn merge_many(&self, cursors: Vec<Cursor<D>>) -> Chain<D> {
        let mut heap = MergeHeap::from_iter(cursors);
        let mut merged = Chain::new(self.chunk_capacity);
        while let Some(cursor1) = heap.pop() {
            let (data, time, mut diff) = cursor1.get();

            while let Some((cursor2, r)) = heap.pop_equal(data, time) {
                diff += r;
                if let Some(cursor2) = cursor2.step() {
                    heap.push(cursor2);
                }
            }

            if diff != Diff::ZERO {
                merged.push((data, time, diff));
            }
            if let Some(cursor1) = cursor1.step() {
                heap.push(cursor1);
            }
        }

        merged
    }
}
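The `MergeHeap` used by `merge_many` is not shown in this excerpt. A self-contained sketch of the same k-way merge strategy, using the standard library's `BinaryHeap` over plain sorted runs instead of `Cursor`s (the `kway_merge` name and tuple types are illustrative, not part of this module):

```rust
// Illustrative, std-only model of the k-way merge in `merge_many`:
// repeatedly pop the least (time, data) head across all runs, sum the
// diffs of equal updates, and drop updates that consolidate to zero.
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn kway_merge(runs: Vec<Vec<(u64, char, i64)>>) -> Vec<(u64, char, i64)> {
    // Min-heap over (time, data, run index, offset), via `Reverse`.
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&(t, d, _)) = run.first() {
            heap.push(Reverse((t, d, i, 0usize)));
        }
    }

    let mut out: Vec<(u64, char, i64)> = Vec::new();
    while let Some(Reverse((t, d, i, off))) = heap.pop() {
        // Refill the heap from the run we just consumed from.
        if let Some(&(t2, d2, _)) = runs[i].get(off + 1) {
            heap.push(Reverse((t2, d2, i, off + 1)));
        }
        // Consolidate with the previously emitted update on equal (time, data).
        let r = runs[i][off].2;
        match out.last_mut() {
            Some(last) if (last.0, last.1) == (t, d) => last.2 += r,
            _ => out.push((t, d, r)),
        }
    }

    // Updates whose diffs summed to zero consolidate away entirely.
    out.retain(|u| u.2 != 0);
    out
}
```

Since the input runs are (time, data)-sorted, equal updates are popped consecutively, which is why comparing against only the last emitted update suffices for consolidation.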

impl<D: Data> Drop for CorrectionV2<D> {
    fn drop(&mut self) {
        self.chains.iter().for_each(|c| self.log_chain_dropped(c));
        self.update_metrics_inner(Default::default(), 0);
    }
}

/// A chain of [`Chunk`]s containing updates.
///
/// All updates in a chain are sorted by (time, data) and consolidated.
///
/// Note that, in contrast to [`Chunk`]s, chains can be empty, though we generally try to avoid
/// keeping empty chains around.
#[derive(Debug)]
struct Chain<D: Data> {
    /// The contained chunks.
    chunks: Vec<Chunk<D>>,
    /// The number of updates contained in all chunks, for efficient updating of metrics.
    update_count: usize,
    /// Cached value of the current chain size, for efficient updating of metrics.
    cached_size: Option<SizeMetrics>,
    /// The capacity of each contained [`Chunk`].
    chunk_capacity: usize,
}

impl<D: Data> Chain<D> {
    /// Construct an empty chain whose chunks have the given capacity.
    fn new(chunk_capacity: usize) -> Self {
        Self {
            chunks: Default::default(),
            update_count: 0,
            cached_size: None,
            chunk_capacity,
        }
    }

    /// Return whether the chain is empty.
    fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Return the length of the chain, in chunks.
    fn len(&self) -> usize {
        self.chunks.len()
    }

    /// Push an update onto the chain.
    ///
    /// The update must sort after all updates already in the chain, in (time, data)-order, to
    /// ensure the chain remains sorted.
    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
        let (d, t, r) = update;
        let update = (d.borrow(), t, r);

        debug_assert!(self.can_accept(update));

        match self.chunks.last_mut() {
            Some(c) if c.len() < self.chunk_capacity => c.push(update),
            Some(_) | None => {
                // Push the new chunk directly rather than through `push_chunk`:
                // the bookkeeping below accounts for the new update, so going
                // through `push_chunk` would count it twice.
                let chunk = Chunk::from_update(update, self.chunk_capacity);
                self.chunks.push(chunk);
            }
        }

        self.update_count += 1;
        self.invalidate_cached_size();
    }

    /// Push a chunk onto the chain.
    ///
    /// All updates in the chunk must sort after all updates already in the chain, in
    /// (time, data)-order, to ensure the chain remains sorted.
    fn push_chunk(&mut self, chunk: Chunk<D>) {
        debug_assert!(self.can_accept(chunk.first()));

        self.update_count += chunk.len();
        self.chunks.push(chunk);
        self.invalidate_cached_size();
    }

    /// Push the updates produced by a cursor onto the chain.
    ///
    /// All updates produced by the cursor must sort after all updates already in the chain, in
    /// (time, data)-order, to ensure the chain remains sorted.
    fn push_cursor(&mut self, cursor: Cursor<D>) {
        let mut rest = Some(cursor);
        while let Some(cursor) = rest.take() {
            let update = cursor.get();
            self.push(update);
            rest = cursor.step();
        }
    }

    /// Return whether the chain can accept the given update.
    ///
    /// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
    fn can_accept(&self, update: (&D, Timestamp, Diff)) -> bool {
        self.last().is_none_or(|(dc, tc, _)| {
            let (d, t, _) = update;
            (tc, dc) < (t, d)
        })
    }

    /// Return the first update in the chain, if any.
    fn first(&self) -> Option<(&D, Timestamp, Diff)> {
        self.chunks.first().map(|c| c.first())
    }

    /// Return the last update in the chain, if any.
    fn last(&self) -> Option<(&D, Timestamp, Diff)> {
        self.chunks.last().map(|c| c.last())
    }

    /// Convert the chain into a cursor over the contained updates.
    fn into_cursor(self) -> Option<Cursor<D>> {
        let chunks = self.chunks.into_iter().map(Rc::new).collect();
        Cursor::new(chunks)
    }

    /// Return an iterator over the contained updates.
    fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
        self.chunks
            .iter()
            .flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
    }

    /// Return the size of the chain, for use in metrics.
    fn get_size(&mut self) -> SizeMetrics {
        // This operation can be expensive as it requires inspecting the individual chunks and
        // their backing regions. We thus cache the result to hopefully avoid the cost most of the
        // time.
        if self.cached_size.is_none() {
            let mut metrics = SizeMetrics::default();
            for chunk in &mut self.chunks {
                metrics += chunk.get_size();
            }
            self.cached_size = Some(metrics);
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chain size.
    ///
    /// This method must be called every time the size of the chain changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}

impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
    fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
        for update in iter {
            self.push(update);
        }
    }
}

/// A cursor over updates in a chain.
///
/// A cursor provides two guarantees:
/// * Produced updates are ordered and consolidated.
/// * A cursor always yields at least one update.
///
/// The second guarantee is enforced through the type system: Every method that steps a cursor
/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
/// over the last update.
///
/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
#[derive(Clone, Debug)]
struct Cursor<D: Data> {
    /// The chunks from which updates can still be produced.
    chunks: VecDeque<Rc<Chunk<D>>>,
    /// The current offset into `chunks.front()`.
    chunk_offset: usize,
    /// An optional limit for the number of updates the cursor will produce.
    limit: Option<usize>,
    /// An optional overwrite for the timestamp of produced updates.
    overwrite_ts: Option<Timestamp>,
}

impl<D: Data> Cursor<D> {
    /// Construct a cursor over a list of chunks.
    ///
    /// Returns `None` if `chunks` is empty.
    fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
        if chunks.is_empty() {
            return None;
        }

        Some(Self {
            chunks,
            chunk_offset: 0,
            limit: None,
            overwrite_ts: None,
        })
    }

    /// Set a limit for the number of updates this cursor will produce.
    ///
    /// # Panics
    ///
    /// Panics if there is already a limit lower than the new one.
    fn set_limit(mut self, limit: usize) -> Option<Self> {
        assert!(self.limit.is_none_or(|l| l >= limit));

        if limit == 0 {
            return None;
        }

        // Release chunks made unreachable by the limit.
        let mut count = 0;
        let mut idx = 0;
        let mut offset = self.chunk_offset;
        while idx < self.chunks.len() && count < limit {
            let chunk = &self.chunks[idx];
            count += chunk.len() - offset;
            idx += 1;
            offset = 0;
        }
        self.chunks.truncate(idx);

        if count > limit {
            self.limit = Some(limit);
        }

        Some(self)
    }

    /// Get a reference to the current update.
    fn get(&self) -> (&D, Timestamp, Diff) {
        let chunk = self.get_chunk();
        let (d, t, r) = chunk.index(self.chunk_offset);
        let t = self.overwrite_ts.unwrap_or(t);
        (d, t, r)
    }

    /// Get a reference to the current chunk.
    fn get_chunk(&self) -> &Chunk<D> {
        &self.chunks[0]
    }
825
826 /// Step to the next update.
827 ///
828 /// Returns the stepped cursor, or `None` if the step was over the last update.
829 fn step(mut self) -> Option<Self> {
830 if self.chunk_offset == self.get_chunk().len() - 1 {
831 return self.skip_chunk().map(|(c, _)| c);
832 }
833
834 self.chunk_offset += 1;
835
836 if let Some(limit) = &mut self.limit {
837 *limit -= 1;
838 if *limit == 0 {
839 return None;
840 }
841 }
842
843 Some(self)
844 }
845
846 /// Skip the remainder of the current chunk.
847 ///
848 /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
849 /// left after the skip.
850 fn skip_chunk(mut self) -> Option<(Self, usize)> {
851 let chunk = self.chunks.pop_front().expect("cursor invariant");
852
853 if self.chunks.is_empty() {
854 return None;
855 }
856
857 let skipped = chunk.len() - self.chunk_offset;
858 self.chunk_offset = 0;
859
860 if let Some(limit) = &mut self.limit {
861 if skipped >= *limit {
862 return None;
863 }
864 *limit -= skipped;
865 }
866
867 Some((self, skipped))
868 }
869
870 /// Skip all updates with times <= the given time.
871 ///
872 /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
873 /// left after the skip.
874 fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
875 if self.overwrite_ts.is_some_and(|ts| ts <= time) {
876 return None;
877 } else if self.get().1 > time {
878 return Some((self, 0));
879 }
880
881 let mut skipped = 0;
882
883 let new_offset = loop {
884 let chunk = self.get_chunk();
885 if let Some(index) = chunk.find_time_greater_than(time) {
886 break index;
887 }
888
889 let (cursor, count) = self.skip_chunk()?;
890 self = cursor;
891 skipped += count;
892 };
893
894 skipped += new_offset - self.chunk_offset;
895 self.chunk_offset = new_offset;
896
897 Some((self, skipped))
898 }

    /// Advance all updates in this cursor by the given `since_ts`.
    ///
    /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
    /// been advanced by `since_ts`.
    fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
        // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
        // only need to advance the `overwrite_ts` by the `since_ts`.
        if let Some(ts) = self.overwrite_ts {
            if ts < since_ts {
                self.overwrite_ts = Some(since_ts);
            }
            return vec![self];
        }

        // Otherwise we need to split the cursor so that each new cursor only yields runs of
        // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
        // this by splitting the cursor at each time <= `since_ts`.
        let mut splits = Vec::new();
        let mut remaining = Some(self);

        while let Some(cursor) = remaining.take() {
            let (_, time, _) = cursor.get();
            if time >= since_ts {
                splits.push(cursor);
                break;
            }

            let mut current = cursor.clone();
            if let Some((cursor, skipped)) = cursor.skip_time(time) {
                remaining = Some(cursor);
                current = current.set_limit(skipped).expect("skipped at least 1");
            }
            current.overwrite_ts = Some(since_ts);
            splits.push(current);
        }

        splits
    }

    /// Split the cursor at the given time.
    ///
    /// Returns two cursors, the first yielding all updates at times < `time`, the second yielding
    /// all updates at times >= `time`. Both can be `None` if they would be empty.
    fn split_at_time(self, time: Timestamp) -> (Option<Self>, Option<Self>) {
        let Some(skip_ts) = time.step_back() else {
            return (None, Some(self));
        };

        let before = self.clone();
        match self.skip_time(skip_ts) {
            Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
            None => (Some(before), None),
        }
    }

    /// Drain the cursor into a [`Chain`].
    ///
    /// This reuses the underlying chunks if possible, and writes new ones otherwise.
    fn into_chain(self, chunk_capacity: usize) -> Chain<D> {
        match self.try_unwrap(chunk_capacity) {
            Ok(chain) => chain,
            Err((_, cursor)) => {
                let mut chain = Chain::new(chunk_capacity);
                chain.push_cursor(cursor);
                chain
            }
        }
    }

    /// Attempt to unwrap the cursor into a [`Chain`].
    ///
    /// This operation efficiently reuses chunks by directly inserting them into the output chain
    /// where possible.
    ///
    /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
    /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
    /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
    /// chain by copying chunks rather than reusing them.
    fn try_unwrap(self, chunk_capacity: usize) -> Result<Chain<D>, (&'static str, Self)> {
        if self.limit.is_some() {
            return Err(("cursor with limit", self));
        }
        if self.overwrite_ts.is_some() {
            return Err(("cursor with overwrite_ts", self));
        }
        if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
            return Err(("cursor on shared chunks", self));
        }

        let mut chain = Chain::new(chunk_capacity);
        let mut remaining = Some(self);

        // We might be partway through the first chunk, in which case we can't reuse it but need to
        // allocate a new one to contain only the updates the cursor can still yield.
        while let Some(cursor) = remaining.take() {
            if cursor.chunk_offset == 0 {
                remaining = Some(cursor);
                break;
            }
            let update = cursor.get();
            chain.push(update);
            remaining = cursor.step();
        }

        if let Some(cursor) = remaining {
            for chunk in cursor.chunks {
                let chunk = Rc::into_inner(chunk).expect("checked above");
                chain.push_chunk(chunk);
            }
        }

        Ok(chain)
    }
}

/// A non-empty chunk of updates, backed by a columnation region.
///
/// All updates in a chunk are sorted by (time, data) and consolidated.
///
/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
/// re-use chunk allocations. Unfortunately, the current `ColumnationStack`/`ChunkedStack` API
/// doesn't provide a convenient way to pre-size regions, so chunks are currently only fixed-size
/// in spirit.
struct Chunk<D: Data> {
    /// The contained updates.
    data: ColumnationStack<(D, Timestamp, Diff)>,
    /// Cached value of the current chunk size, for efficient updating of metrics.
    cached_size: Option<SizeMetrics>,
}

impl<D: Data> fmt::Debug for Chunk<D> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Chunk(<{}>)", self.len())
    }
}

impl<D: Data> Chunk<D> {
    /// Create a new chunk containing a single update.
    fn from_update<DT: Borrow<D>>(update: (DT, Timestamp, Diff), chunk_capacity: usize) -> Self {
        let (d, t, r) = update;

        let mut chunk = Self {
            data: ColumnationStack::with_capacity(chunk_capacity),
            cached_size: None,
        };
        chunk.data.copy_destructured(d.borrow(), &t, &r);

        chunk
    }

    /// Return the number of updates in the chunk.
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Return the update at the given index.
    ///
    /// # Panics
    ///
    /// Panics if the given index is not populated.
    fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
        let (d, t, r) = self.data.index(idx);
        (d, *t, *r)
    }

    /// Return the first update in the chunk.
    fn first(&self) -> (&D, Timestamp, Diff) {
        self.index(0)
    }

    /// Return the last update in the chunk.
    fn last(&self) -> (&D, Timestamp, Diff) {
        self.index(self.len() - 1)
    }

    /// Push an update onto the chunk.
    fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
        let (d, t, r) = update;
        self.data.copy_destructured(d.borrow(), &t, &r);

        self.invalidate_cached_size();
    }

    /// Return the index of the first update at a time greater than `time`, or `None` if no such
    /// update exists.
    fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
        if self.last().1 <= time {
            return None;
        }

        let mut lower = 0;
        let mut upper = self.len();
        while lower < upper {
            let idx = (lower + upper) / 2;
            if self.index(idx).1 > time {
                upper = idx;
            } else {
                lower = idx + 1;
            }
        }

        Some(lower)
    }

    /// Return the size of the chunk, for use in metrics.
    fn get_size(&mut self) -> SizeMetrics {
        if self.cached_size.is_none() {
            let mut size = 0;
            let mut capacity = 0;
            self.data.heap_size(|sz, cap| {
                size += sz;
                capacity += cap;
            });
            self.cached_size = Some(SizeMetrics {
                size,
                capacity,
                allocations: 1,
            });
        }

        self.cached_size.unwrap()
    }

    /// Invalidate the cached chunk size.
    ///
    /// This method must be called every time the size of the chunk changes.
    fn invalidate_cached_size(&mut self) {
        self.cached_size = None;
    }
}
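The hand-rolled binary search in `find_time_greater_than` is equivalent to the standard library's `slice::partition_point`. A standalone sketch over a plain sorted slice of times (`find_time_greater_than` here is a hypothetical free function for illustration, not part of this module):

```rust
/// Index of the first element with time > `time` in an ascending slice of
/// times, or `None` if every time is <= `time`. Mirrors the binary search in
/// `Chunk::find_time_greater_than`, expressed via `partition_point`.
fn find_time_greater_than(times: &[u64], time: u64) -> Option<usize> {
    // `partition_point` returns the index of the first element for which the
    // predicate is false, i.e. the first time strictly greater than `time`.
    let idx = times.partition_point(|&t| t <= time);
    (idx < times.len()).then_some(idx)
}
```

Both formulations rely on the chunk invariant that updates are (time, data)-sorted, so all times <= `time` form a prefix of the slice.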

/// A buffer for staging updates before they are inserted into the sorted chains.
#[derive(Debug)]
struct Stage<D> {
    /// The contained updates.
    ///
    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
    data: Vec<(D, Timestamp, Diff)>,
    /// Introspection logging.
    ///
    /// We want to report the number of records in the stage. To do so, we pretend that the stage
    /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
    /// re-created.
    logging: Option<Logging>,
}

impl<D: Data> Stage<D> {
    fn new(logging: Option<Logging>, chunk_capacity: usize) -> Self {
        // For logging, we pretend the stage consists of a single chain.
        if let Some(logging) = &logging {
            logging.chain_created(0);
        }

        Self {
            data: Vec::with_capacity(chunk_capacity),
            logging,
        }
    }

    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Insert a batch of updates, possibly producing a ready [`Chain`].
    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
        if updates.is_empty() {
            return None;
        }

        let prev_length = self.ilen();

        // Determine how many chunks we can fill with the available updates.
        let update_count = self.data.len() + updates.len();
        let chunk_capacity = self.data.capacity();
        let chunk_count = update_count / chunk_capacity;

        let mut new_updates = updates.drain(..);

        // If we have enough shippable updates, collect them, consolidate, and build a chain.
        let maybe_chain = if chunk_count > 0 {
            let ship_count = chunk_count * chunk_capacity;
            let mut buffer = Vec::with_capacity(ship_count);

            buffer.append(&mut self.data);
            while buffer.len() < ship_count {
                let update = new_updates.next().unwrap();
                buffer.push(update);
            }

            consolidate(&mut buffer);

            let mut chain = Chain::new(chunk_capacity);
            chain.extend(buffer);
            Some(chain)
        } else {
            None
        };

        // Stage the remaining updates.
        self.data.extend(new_updates);

        self.log_length_diff(self.ilen() - prev_length);

        maybe_chain
    }

    /// Flush all currently staged updates into a chain.
    fn flush(&mut self) -> Option<Chain<D>> {
        self.log_length_diff(-self.ilen());

        consolidate(&mut self.data);

        if self.data.is_empty() {
            return None;
        }

        let chunk_capacity = self.data.capacity();
        let mut chain = Chain::new(chunk_capacity);
        chain.extend(self.data.drain(..));
        Some(chain)
    }

    /// Advance the times of staged updates by the given `since`.
    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
        let Some(since_ts) = since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            self.log_length_diff(-self.ilen());
            self.data.clear();
            return;
        };

        for (_, time, _) in &mut self.data {
            *time = std::cmp::max(*time, *since_ts);
        }
    }

    /// Return the size of the stage, for use in metrics.
    ///
    /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
    /// under-estimates. That's fine as the stage should always be small.
    fn get_size(&self) -> SizeMetrics {
        SizeMetrics {
            size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
            capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
            allocations: 1,
        }
    }

    /// Return the number of updates in the stage, as an `isize`.
    fn ilen(&self) -> isize {
        self.data.len().try_into().expect("must fit")
    }

    fn log_length_diff(&self, diff: isize) {
        let Some(logging) = &self.logging else { return };
        if diff > 0 {
            let count = usize::try_from(diff).expect("must fit");
            logging.chain_created(count);
            logging.chain_dropped(0);
        } else if diff < 0 {
            let count = usize::try_from(-diff).expect("must fit");
            logging.chain_created(0);
            logging.chain_dropped(count);
        }
    }
}
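The chunk arithmetic in `Stage::insert` ships only whole chunks and keeps the remainder staged. That split can be illustrated with a small standalone helper (`ship_split` is a hypothetical name, not part of this module):

```rust
/// Given `staged` buffered updates and `incoming` new ones, return how many
/// updates would be shipped as full chunks and how many would stay staged.
/// Mirrors the `update_count / chunk_capacity` arithmetic in `Stage::insert`.
/// Assumes `chunk_capacity > 0`.
fn ship_split(staged: usize, incoming: usize, chunk_capacity: usize) -> (usize, usize) {
    let total = staged + incoming;
    // Only whole chunks ship; integer division rounds down.
    let shipped = (total / chunk_capacity) * chunk_capacity;
    (shipped, total - shipped)
}
```

Note that the shipped prefix is consolidated before it is turned into a chain, so the resulting chain may contain fewer than `shipped` updates.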

impl<D> Drop for Stage<D> {
    fn drop(&mut self) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(self.data.len());
        }
    }
}

/// Sort and consolidate the given list of updates.
///
/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
/// except that it sorts updates by (time, data) instead of (data, time).
fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
    if updates.len() <= 1 {
        return;
    }

    let diff = |update: &(_, _, Diff)| update.2;

    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut offset = 0;
    let mut accum = diff(&updates[0]);

    for idx in 1..updates.len() {
        let this = &updates[idx];
        let prev = &updates[idx - 1];
        if this.0 == prev.0 && this.1 == prev.1 {
            accum += diff(&updates[idx]);
        } else {
            if accum != Diff::ZERO {
                updates.swap(offset, idx - 1);
                updates[offset].2 = accum;
                offset += 1;
            }
            accum = diff(&updates[idx]);
        }
    }

    if accum != Diff::ZERO {
        let len = updates.len();
        updates.swap(offset, len - 1);
        updates[offset].2 = accum;
        offset += 1;
    }

    updates.truncate(offset);
}
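The (time, data)-ordered consolidation above can be sketched with plain standard-library types. A minimal standalone version, assuming `u64` timestamps and `i64` diffs in place of `mz_repr::Timestamp` and `mz_repr::Diff` (the real function avoids this sketch's extra allocation by compacting in place):

```rust
/// Sort updates by (time, data), sum the diffs of equal (data, time) pairs,
/// and drop entries whose diffs consolidate away to zero.
fn consolidate_sketch(updates: &mut Vec<(String, u64, i64)>) {
    // Sort by (time, data), matching the order used by `consolidate`.
    updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));

    let mut out: Vec<(String, u64, i64)> = Vec::new();
    for (d, t, r) in updates.drain(..) {
        if let Some(last) = out.last_mut() {
            if last.0 == d && last.1 == t {
                // Equal (data, time): accumulate the diff into the last entry.
                last.2 += r;
                continue;
            }
        }
        out.push((d, t, r));
    }
    // Drop updates whose diffs summed to zero.
    out.retain(|(_, _, r)| *r != 0);
    *updates = out;
}
```

This is how retractions remove updates in `Correction`: a `(d, t, +1)` and a `(d, t, -1)` meet here and cancel to nothing.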

/// A binary heap specialized for merging [`Cursor`]s.
struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);

impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
        let inner = cursors.into_iter().map(MergeCursor).collect();
        Self(inner)
    }
}

impl<D: Data> MergeHeap<D> {
    /// Pop the next cursor (the one yielding the least update) from the heap.
    fn pop(&mut self) -> Option<Cursor<D>> {
        self.0.pop().map(|MergeCursor(c)| c)
    }

    /// Pop the next cursor from the heap, provided the data and time of its current update are
    /// equal to the given values.
    ///
    /// Returns both the cursor and the diff corresponding to `data` and `time`.
    fn pop_equal(&mut self, data: &D, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
        let MergeCursor(cursor) = self.0.peek()?;
        let (d, t, r) = cursor.get();
        if d == data && t == time {
            let cursor = self.pop().expect("checked above");
            Some((cursor, r))
        } else {
            None
        }
    }

    /// Push a cursor onto the heap.
    fn push(&mut self, cursor: Cursor<D>) {
        self.0.push(MergeCursor(cursor));
    }
}

/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
///
/// Implements the cursor ordering required for merging cursors.
struct MergeCursor<D: Data>(Cursor<D>);

impl<D: Data> PartialEq for MergeCursor<D> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl<D: Data> Eq for MergeCursor<D> {}

impl<D: Data> PartialOrd for MergeCursor<D> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<D: Data> Ord for MergeCursor<D> {
    fn cmp(&self, other: &Self) -> Ordering {
        let (d1, t1, _) = self.0.get();
        let (d2, t2, _) = other.0.get();
        (t1, d1).cmp(&(t2, d2)).reverse()
    }
}
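Reversing the `(time, data)` comparison in `MergeCursor::cmp` is what turns `BinaryHeap`, a max-heap, into the min-heap the k-way merge needs: `pop` then yields the cursor at the least (time, data) pair. The same effect can be sketched standalone with `std::cmp::Reverse` on plain keys (`merge_order` is a hypothetical helper for illustration):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Drain keys from a heap in ascending (time, data) order.
/// `Reverse` flips the comparison, the same trick `MergeCursor`'s
/// `Ord` impl applies with `.reverse()`.
fn merge_order(keys: Vec<(u64, &str)>) -> Vec<(u64, &str)> {
    let mut heap: BinaryHeap<_> = keys.into_iter().map(Reverse).collect();
    let mut out = Vec::new();
    // A max-heap over reversed keys pops the smallest original key first.
    while let Some(Reverse(k)) = heap.pop() {
        out.push(k);
    }
    out
}
```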