mz_compute/sink/correction_v2.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
11//! operator to stash updates before they are written.
12//!
13//! The `Correction` data structure provides methods to:
14//! * insert new updates
15//! * advance the compaction frontier (called `since`)
16//! * obtain an iterator over consolidated updates before some `upper`
17//! * force consolidation of updates before some `upper`
18//!
19//! The goal is to provide good performance for each of these operations, even in the presence of
20//! future updates. MVs downstream of temporal filters might have to deal with large amounts of
21//! retractions for future times and we want those to be handled efficiently as well.
22//!
23//! Note that `Correction` does not provide a method to directly remove updates. Instead updates
24//! are removed by inserting their retractions so that they consolidate away to nothing.
25//!
26//! ## Storage of Updates
27//!
28//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
29//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
30//!
31//! [`CorrectionV2`] holds onto a list of [`Chain`]s containing [`Chunk`]s of stashed updates. Each
32//! [`Chunk`] is a columnation region containing a fixed maximum number of updates. All updates in
33//! a chunk, and all updates in a chain, are ordered by (time, data) and consolidated.
34//!
35//! ```text
36//! chain[0] | chain[1] | chain[2]
37//! | |
38//! chunk[0] | chunk[0] | chunk[0]
39//! (a, 1, +1) | (a, 1, +1) | (d, 3, +1)
40//! (b, 1, +1) | (b, 2, -1) | (d, 4, -1)
41//! chunk[1] | chunk[1] |
42//! (c, 1, +1) | (c, 2, -2) |
43//! (a, 2, -1) | (c, 4, -1) |
44//! chunk[2] | |
45//! (b, 2, +1) | |
46//! (c, 2, +1) | |
47//! chunk[3] | |
48//! (b, 3, -1) | |
49//! (c, 3, +1) | |
50//! ```
51//!
52//! The "chain invariant" states that each chain has at least [`CHAIN_PROPORTIONALITY`] times as
53//! many chunks as the next one. This means that chain sizes will often be powers of
54//! `CHAIN_PROPORTIONALITY`, but they don't have to be. For example, for a proportionality of 2,
55//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
56//!
57//! Choosing the `CHAIN_PROPORTIONALITY` value allows tuning the trade-off between memory and CPU
58//! resources required to maintain corrections. A higher proportionality forces more frequent chain
59//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
60//!
61//! ## Inserting Updates
62//!
63//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
64//! list until the chain invariant is restored.
65//!
66//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
67//! chunk, copying the update in, and then likely merging with an existing chain to restore the
68//! chain invariant. If updates trickle in in small batches, this can cause a considerable
69//! overhead. The amortize this overhead, new updates aren't immediately inserted into the sorted
70//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
71//! [`Chunk`], they are sorted an inserted into the chains.
72//!
73//! The insert operation has an amortized complexity of O(log N), with N being the current number
74//! of updates stored.
75//!
76//! ## Retrieving Consolidated Updates
77//!
78//! Retrieving consolidated updates before a given `upper` works by first consolidating all updates
79//! at times before the `upper`, merging them all into one chain, then returning an iterator over
80//! that chain.
81//!
82//! Because each chain contains updates ordered by time first, consolidation of all updates before
83//! an `upper` is possible without touching updates at future times. It works by merging the chains
84//! only up to the `upper`, producing a merged chain containing consolidated times before the
85//! `upper` and leaving behind the chain parts containing later times. The complexity of this
86//! operation is O(U log K), with U being the number of updates before `upper` and K the number
87//! of chains.
88//!
89//! Unfortunately, performing consolidation as described above can break the chain invariant and we
90//! might need to restore it by merging chains, including ones containing future updates. This is
91//! something that would be great to fix! In the meantime the hope is that in steady state it
92//! doesn't matter too much because either there are no future retractions and U is approximately
93//! equal to N, or the amount of future retractions is much larger than the amount of current
94//! changes, in which case removing the current changes has a good chance of leaving the chain
95//! invariant intact.
96//!
97//! ## Merging Chains
98//!
99//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
100//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
101//! complexity of a merge of K chains containing N updates is O(N log K).
102//!
103//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
104//! far the times of updates should be advanced. Advancing times in a sorted chain of updates
105//! can make them become unsorted, so we cannot just merge the chains from top to bottom.
106//!
107//! For example, consider these two chains, assuming `since = [2]`:
108//! chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
109//! chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
110//! After time advancement, the chains look like this:
111//! chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
112//! chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
113//! Merging them naively yields [(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)], a chain that's
114//! neither sorted nor consolidated.
115//!
116//! Instead we need to merge sub-chains, one for each distinct time that's before or at the
117//! `since`. Each of these sub-chains retains the (time, data) ordering after the time advancement
118//! to `since`, so merging those yields the expected result.
119//!
120//! For the above example, the chains we would merge are:
121//! chain 1.a: [(c, 2, +1)]
122//! chain 1.b: [(b, 2, -1), (a, 3, -1)]
123//! chain 2.a: [(b, 2, +1)],
124//! chain 2.b: [(a, 2, +1), (c, 2, -1)]
125
126use std::borrow::Borrow;
127use std::cmp::Ordering;
128use std::collections::{BinaryHeap, VecDeque};
129use std::fmt;
130use std::rc::Rc;
131
132use differential_dataflow::containers::{Columnation, TimelyStack};
133use differential_dataflow::trace::implementations::BatchContainer;
134use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
135use mz_repr::{Diff, Timestamp};
136use timely::PartialOrder;
137use timely::container::SizableContainer;
138use timely::progress::Antichain;
139
140use crate::sink::correction::{Logging, SizeMetrics};
141
142/// Determines the size factor of subsequent chains required by the chain invariant.
143const CHAIN_PROPORTIONALITY: usize = 3;
144
145/// Convenient alias for use in data trait bounds.
146pub trait Data: differential_dataflow::Data + Columnation {}
147impl<D: differential_dataflow::Data + Columnation> Data for D {}
148
149/// A data structure used to store corrections in the MV sink implementation.
150///
151/// In contrast to `CorrectionV1`, this implementation stores updates in columnation regions,
152/// allowing their memory to be transparently spilled to disk.
153#[derive(Debug)]
154pub(super) struct CorrectionV2<D: Data> {
155 /// Chains containing sorted updates.
156 chains: Vec<Chain<D>>,
157 /// A staging area for updates, to speed up small inserts.
158 stage: Stage<D>,
159 /// The frontier by which all contained times are advanced.
160 since: Antichain<Timestamp>,
161
162 /// Total count of updates in the correction buffer.
163 ///
164 /// Tracked to compute deltas in `update_metrics`.
165 prev_update_count: usize,
166 /// Total heap size used by the correction buffer.
167 ///
168 /// Tracked to compute deltas in `update_metrics`.
169 prev_size: SizeMetrics,
170 /// Global persist sink metrics.
171 metrics: SinkMetrics,
172 /// Per-worker persist sink metrics.
173 worker_metrics: SinkWorkerMetrics,
174 /// Introspection logging.
175 logging: Option<Logging>,
176}
177
178impl<D: Data> CorrectionV2<D> {
179 /// Construct a new [`CorrectionV2`] instance.
180 pub fn new(
181 metrics: SinkMetrics,
182 worker_metrics: SinkWorkerMetrics,
183 logging: Option<Logging>,
184 ) -> Self {
185 Self {
186 chains: Default::default(),
187 stage: Stage::new(logging.clone()),
188 since: Antichain::from_elem(Timestamp::MIN),
189 prev_update_count: 0,
190 prev_size: Default::default(),
191 metrics,
192 worker_metrics,
193 logging,
194 }
195 }
196
197 /// Insert a batch of updates.
198 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
199 let Some(since_ts) = self.since.as_option() else {
200 // If the since is the empty frontier, discard all updates.
201 updates.clear();
202 return;
203 };
204
205 for (_, time, _) in &mut *updates {
206 *time = std::cmp::max(*time, *since_ts);
207 }
208
209 self.insert_inner(updates);
210 }
211
212 /// Insert a batch of updates, after negating their diffs.
213 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
214 let Some(since_ts) = self.since.as_option() else {
215 // If the since is the empty frontier, discard all updates.
216 updates.clear();
217 return;
218 };
219
220 for (_, time, diff) in &mut *updates {
221 *time = std::cmp::max(*time, *since_ts);
222 *diff = -*diff;
223 }
224
225 self.insert_inner(updates);
226 }
227
228 /// Insert a batch of updates.
229 ///
230 /// All times are expected to be >= the `since`.
231 fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
232 debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));
233
234 if let Some(chain) = self.stage.insert(updates) {
235 self.log_chain_created(&chain);
236 self.chains.push(chain);
237
238 // Restore the chain invariant.
239 let merge_needed = |chains: &[Chain<_>]| match chains {
240 [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
241 _ => false,
242 };
243
244 while merge_needed(&self.chains) {
245 let a = self.chains.pop().unwrap();
246 let b = self.chains.pop().unwrap();
247 self.log_chain_dropped(&a);
248 self.log_chain_dropped(&b);
249
250 let merged = merge_chains([a, b], &self.since);
251 self.log_chain_created(&merged);
252 self.chains.push(merged);
253 }
254 };
255
256 self.update_metrics();
257 }
258
259 /// Return consolidated updates before the given `upper`.
260 pub fn updates_before<'a>(
261 &'a mut self,
262 upper: &Antichain<Timestamp>,
263 ) -> impl Iterator<Item = (D, Timestamp, Diff)> + 'a {
264 let mut result = None;
265
266 if !PartialOrder::less_than(&self.since, upper) {
267 // All contained updates are beyond the upper.
268 return result.into_iter().flatten();
269 }
270
271 self.consolidate_before(upper);
272
273 // There is at most one chain that contains updates before `upper` now.
274 result = self
275 .chains
276 .iter()
277 .find(|c| c.first().is_some_and(|(_, t, _)| !upper.less_equal(&t)))
278 .map(move |c| {
279 let upper = upper.clone();
280 c.iter().take_while(move |(_, t, _)| !upper.less_equal(t))
281 });
282
283 result.into_iter().flatten()
284 }
285
286 /// Consolidate all updates before the given `upper`.
287 ///
288 /// Once this method returns, all remaining updates before `upper` are contained in a single
289 /// chain. Note that this chain might also contain updates beyond `upper` though!
290 fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
291 if self.chains.is_empty() && self.stage.is_empty() {
292 return;
293 }
294
295 let mut chains = std::mem::take(&mut self.chains);
296
297 // To keep things simple, we log the dropping of all chains here and log the creation of
298 // all remaining chains at the end. This causes more event churn than necessary, but the
299 // consolidated result is correct.
300 chains.iter().for_each(|c| self.log_chain_dropped(c));
301
302 chains.extend(self.stage.flush());
303
304 if chains.is_empty() {
305 // We can only get here if the stage contained updates but they all got consolidated
306 // away by `flush`, so we need to update the metrics before we return.
307 self.update_metrics();
308 return;
309 }
310
311 let (merged, remains) = merge_chains_up_to(chains, &self.since, upper);
312
313 self.chains = remains;
314 if !merged.is_empty() {
315 // We put the merged chain at the end, assuming that its contents are likely to
316 // consolidate with retractions that will arrive soon.
317 self.chains.push(merged);
318 }
319
320 // Restore the chain invariant.
321 //
322 // This part isn't great. We've taken great care so far to only look at updates with times
323 // before `upper`, but now we might end up merging all chains anyway in the worst case.
324 // There might be something smarter we could do to avoid merging as much as possible. For
325 // example, we could consider sorting chains by length first, or inspect the contained
326 // times and prefer merging chains that have a chance at consolidating with one another.
327 let mut i = self.chains.len().saturating_sub(1);
328 while i > 0 {
329 let needs_merge = self.chains.get(i).is_some_and(|a| {
330 let b = &self.chains[i - 1];
331 a.len() * CHAIN_PROPORTIONALITY > b.len()
332 });
333 if needs_merge {
334 let a = self.chains.remove(i);
335 let b = std::mem::take(&mut self.chains[i - 1]);
336 let merged = merge_chains([a, b], &self.since);
337 self.chains[i - 1] = merged;
338 } else {
339 // Only advance the index if we didn't merge. A merge can reduce the size of the
340 // chain at `i - 1`, causing an violation of the chain invariant with the next
341 // chain, so we might need to merge the two before proceeding to lower indexes.
342 i -= 1;
343 }
344 }
345
346 self.chains.iter().for_each(|c| self.log_chain_created(c));
347 self.update_metrics();
348 }
349
350 /// Advance the since frontier.
351 ///
352 /// # Panics
353 ///
354 /// Panics if the given `since` is less than the current since frontier.
355 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
356 assert!(PartialOrder::less_equal(&self.since, &since));
357 self.stage.advance_times(&since);
358 self.since = since;
359 }
360
361 /// Consolidate all updates at the current `since`.
362 pub fn consolidate_at_since(&mut self) {
363 let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
364 if let Some(upper_ts) = upper_ts {
365 let upper = Antichain::from_elem(upper_ts);
366 self.consolidate_before(&upper);
367 }
368 }
369
370 fn log_chain_created(&self, chain: &Chain<D>) {
371 if let Some(logging) = &self.logging {
372 logging.chain_created(chain.update_count);
373 }
374 }
375
376 fn log_chain_dropped(&self, chain: &Chain<D>) {
377 if let Some(logging) = &self.logging {
378 logging.chain_dropped(chain.update_count);
379 }
380 }
381
382 /// Update persist sink metrics.
383 fn update_metrics(&mut self) {
384 let mut new_size = self.stage.get_size();
385 let mut new_length = self.stage.data.len();
386 for chain in &mut self.chains {
387 new_size += chain.get_size();
388 new_length += chain.update_count;
389 }
390
391 let old_size = self.prev_size;
392 let old_length = self.prev_update_count;
393 let len_delta = UpdateDelta::new(new_length, old_length);
394 let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
395 self.metrics
396 .report_correction_update_deltas(len_delta, cap_delta);
397 self.worker_metrics
398 .report_correction_update_totals(new_length, new_size.capacity);
399
400 if let Some(logging) = &self.logging {
401 let i = |x: usize| isize::try_from(x).expect("must fit");
402 logging.report_size_diff(i(new_size.size) - i(old_size.size));
403 logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
404 logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
405 }
406
407 self.prev_size = new_size;
408 self.prev_update_count = new_length;
409 }
410}
411
412impl<D: Data> Drop for CorrectionV2<D> {
413 fn drop(&mut self) {
414 self.chains.iter().for_each(|c| self.log_chain_dropped(c));
415 }
416}
417
418/// A chain of [`Chunk`]s containing updates.
419///
420/// All updates in a chain are sorted by (time, data) and consolidated.
421///
422/// Note that, in contrast to [`Chunk`]s, chains can be empty. Though we generally try to avoid
423/// keeping around empty chains.
424#[derive(Debug)]
425struct Chain<D: Data> {
426 /// The contained chunks.
427 chunks: Vec<Chunk<D>>,
428 /// The number of updates contained in all chunks, for efficient updating of metrics.
429 update_count: usize,
430 /// Cached value of the current chain size, for efficient updating of metrics.
431 cached_size: Option<SizeMetrics>,
432}
433
434impl<D: Data> Default for Chain<D> {
435 fn default() -> Self {
436 Self {
437 chunks: Default::default(),
438 update_count: 0,
439 cached_size: None,
440 }
441 }
442}
443
444impl<D: Data> Chain<D> {
445 /// Return whether the chain is empty.
446 fn is_empty(&self) -> bool {
447 self.chunks.is_empty()
448 }
449
450 /// Return the length of the chain, in chunks.
451 fn len(&self) -> usize {
452 self.chunks.len()
453 }
454
455 /// Push an update onto the chain.
456 ///
457 /// The update must sort after all updates already in the chain, in (time, data)-order, to
458 /// ensure the chain remains sorted.
459 fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
460 let (d, t, r) = update;
461 let update = (d.borrow(), t, r);
462
463 debug_assert!(self.can_accept(update));
464
465 match self.chunks.last_mut() {
466 Some(c) if !c.at_capacity() => c.push(update),
467 Some(_) | None => {
468 let chunk = Chunk::from_update(update);
469 self.push_chunk(chunk);
470 }
471 }
472
473 self.update_count += 1;
474 self.invalidate_cached_size();
475 }
476
477 /// Push a chunk onto the chain.
478 ///
479 /// All updates in the chunk must sort after all updates already in the chain, in
480 /// (time, data)-order, to ensure the chain remains sorted.
481 fn push_chunk(&mut self, chunk: Chunk<D>) {
482 debug_assert!(self.can_accept(chunk.first()));
483
484 self.update_count += chunk.len();
485 self.chunks.push(chunk);
486 self.invalidate_cached_size();
487 }
488
489 /// Push the updates produced by a cursor onto the chain.
490 ///
491 /// All updates produced by the cursor must sort after all updates already in the chain, in
492 /// (time, data)-order, to ensure the chain remains sorted.
493 fn push_cursor(&mut self, cursor: Cursor<D>) {
494 let mut rest = Some(cursor);
495 while let Some(cursor) = rest.take() {
496 let update = cursor.get();
497 self.push(update);
498 rest = cursor.step();
499 }
500 }
501
502 /// Return whether the chain can accept the given update.
503 ///
504 /// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
505 fn can_accept(&self, update: (&D, Timestamp, Diff)) -> bool {
506 self.last().is_none_or(|(dc, tc, _)| {
507 let (d, t, _) = update;
508 (tc, dc) < (t, d)
509 })
510 }
511
512 /// Return the first update in the chain, if any.
513 fn first(&self) -> Option<(&D, Timestamp, Diff)> {
514 self.chunks.first().map(|c| c.first())
515 }
516
517 /// Return the last update in the chain, if any.
518 fn last(&self) -> Option<(&D, Timestamp, Diff)> {
519 self.chunks.last().map(|c| c.last())
520 }
521
522 /// Convert the chain into a cursor over the contained updates.
523 fn into_cursor(self) -> Option<Cursor<D>> {
524 let chunks = self.chunks.into_iter().map(Rc::new).collect();
525 Cursor::new(chunks)
526 }
527
528 /// Return an iterator over the contained updates.
529 fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
530 self.chunks
531 .iter()
532 .flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
533 }
534
535 /// Return the size of the chain, for use in metrics.
536 fn get_size(&mut self) -> SizeMetrics {
537 // This operation can be expensive as it requires inspecting the individual chunks and
538 // their backing regions. We thus cache the result to hopefully avoid the cost most of the
539 // time.
540 if self.cached_size.is_none() {
541 let mut metrics = SizeMetrics::default();
542 for chunk in &mut self.chunks {
543 metrics += chunk.get_size();
544 }
545 self.cached_size = Some(metrics);
546 }
547
548 self.cached_size.unwrap()
549 }
550
551 /// Invalidate the cached chain size.
552 ///
553 /// This method must be called every time the size of the chain changed.
554 fn invalidate_cached_size(&mut self) {
555 self.cached_size = None;
556 }
557}
558
559impl<D: Data> Extend<(D, Timestamp, Diff)> for Chain<D> {
560 fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
561 for update in iter {
562 self.push(update);
563 }
564 }
565}
566
567/// A cursor over updates in a chain.
568///
569/// A cursor provides two guarantees:
570/// * Produced updates are ordered and consolidated.
571/// * A cursor always yields at least one update.
572///
573/// The second guarantee is enforced through the type system: Every method that steps a cursor
574/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
575/// over the last update.
576///
577/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
578/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
579/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
580#[derive(Clone, Debug)]
581struct Cursor<D: Data> {
582 /// The chunks from which updates can still be produced.
583 chunks: VecDeque<Rc<Chunk<D>>>,
584 /// The current offset into `chunks.front()`.
585 chunk_offset: usize,
586 /// An optional limit for the number of updates the cursor will produce.
587 limit: Option<usize>,
588 /// An optional overwrite for the timestamp of produced updates.
589 overwrite_ts: Option<Timestamp>,
590}
591
592impl<D: Data> Cursor<D> {
593 /// Construct a cursor over a list of chunks.
594 ///
595 /// Returns `None` if `chunks` is empty.
596 fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
597 if chunks.is_empty() {
598 return None;
599 }
600
601 Some(Self {
602 chunks,
603 chunk_offset: 0,
604 limit: None,
605 overwrite_ts: None,
606 })
607 }
608
609 /// Set a limit for the number of updates this cursor will produce.
610 ///
611 /// # Panics
612 ///
613 /// Panics if there is already a limit lower than the new one.
614 fn set_limit(mut self, limit: usize) -> Option<Self> {
615 assert!(self.limit.is_none_or(|l| l >= limit));
616
617 if limit == 0 {
618 return None;
619 }
620
621 // Release chunks made unreachable by the limit.
622 let mut count = 0;
623 let mut idx = 0;
624 let mut offset = self.chunk_offset;
625 while idx < self.chunks.len() && count < limit {
626 let chunk = &self.chunks[idx];
627 count += chunk.len() - offset;
628 idx += 1;
629 offset = 0;
630 }
631 self.chunks.truncate(idx);
632
633 if count > limit {
634 self.limit = Some(limit);
635 }
636
637 Some(self)
638 }
639
640 /// Get a reference to the current update.
641 fn get(&self) -> (&D, Timestamp, Diff) {
642 let chunk = self.get_chunk();
643 let (d, t, r) = chunk.index(self.chunk_offset);
644 let t = self.overwrite_ts.unwrap_or(t);
645 (d, t, r)
646 }
647
648 /// Get a reference to the current chunk.
649 fn get_chunk(&self) -> &Chunk<D> {
650 &self.chunks[0]
651 }
652
653 /// Step to the next update.
654 ///
655 /// Returns the stepped cursor, or `None` if the step was over the last update.
656 fn step(mut self) -> Option<Self> {
657 if self.chunk_offset == self.get_chunk().len() - 1 {
658 return self.skip_chunk().map(|(c, _)| c);
659 }
660
661 self.chunk_offset += 1;
662
663 if let Some(limit) = &mut self.limit {
664 *limit -= 1;
665 if *limit == 0 {
666 return None;
667 }
668 }
669
670 Some(self)
671 }
672
673 /// Skip the remainder of the current chunk.
674 ///
675 /// Returns the forwarded cursor and the number of updates skipped, or `None` if no chunks are
676 /// left after the skip.
677 fn skip_chunk(mut self) -> Option<(Self, usize)> {
678 let chunk = self.chunks.pop_front().expect("cursor invariant");
679
680 if self.chunks.is_empty() {
681 return None;
682 }
683
684 let skipped = chunk.len() - self.chunk_offset;
685 self.chunk_offset = 0;
686
687 if let Some(limit) = &mut self.limit {
688 if skipped >= *limit {
689 return None;
690 }
691 *limit -= skipped;
692 }
693
694 Some((self, skipped))
695 }
696
697 /// Skip all updates with times <= the given time.
698 ///
699 /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
700 /// left after the skip.
701 fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
702 if self.overwrite_ts.is_some_and(|ts| ts <= time) {
703 return None;
704 } else if self.get().1 > time {
705 return Some((self, 0));
706 }
707
708 let mut skipped = 0;
709
710 let new_offset = loop {
711 let chunk = self.get_chunk();
712 if let Some(index) = chunk.find_time_greater_than(time) {
713 break index;
714 }
715
716 let (cursor, count) = self.skip_chunk()?;
717 self = cursor;
718 skipped += count;
719 };
720
721 skipped += new_offset - self.chunk_offset;
722 self.chunk_offset = new_offset;
723
724 Some((self, skipped))
725 }
726
727 /// Advance all updates in this cursor by the given `since_ts`.
728 ///
729 /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
730 /// been advanced by `since_ts`.
731 fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
732 // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
733 // only need to advance the `overwrite_ts` by the `since_ts`.
734 if let Some(ts) = self.overwrite_ts {
735 if ts < since_ts {
736 self.overwrite_ts = Some(since_ts);
737 }
738 return vec![self];
739 }
740
741 // Otherwise we need to split the cursor so that each new cursor only yields runs of
742 // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
743 // this by splitting the cursor at each time <= `since_ts`.
744 let mut splits = Vec::new();
745 let mut remaining = Some(self);
746
747 while let Some(cursor) = remaining.take() {
748 let (_, time, _) = cursor.get();
749 if time >= since_ts {
750 splits.push(cursor);
751 break;
752 }
753
754 let mut current = cursor.clone();
755 if let Some((cursor, skipped)) = cursor.skip_time(time) {
756 remaining = Some(cursor);
757 current = current.set_limit(skipped).expect("skipped at least 1");
758 }
759 current.overwrite_ts = Some(since_ts);
760 splits.push(current);
761 }
762
763 splits
764 }
765
766 /// Split the cursor at the given time.
767 ///
768 /// Returns two cursors, the first yielding all updates at times < `time`, the second yielding
769 /// all updates at times >= `time`. Both can be `None` if they would be empty.
770 fn split_at_time(self, time: Timestamp) -> (Option<Self>, Option<Self>) {
771 let Some(skip_ts) = time.step_back() else {
772 return (None, Some(self));
773 };
774
775 let before = self.clone();
776 match self.skip_time(skip_ts) {
777 Some((beyond, skipped)) => (before.set_limit(skipped), Some(beyond)),
778 None => (Some(before), None),
779 }
780 }
781
782 /// Attempt to unwrap the cursor into a [`Chain`].
783 ///
784 /// This operation efficiently reuses chunks by directly inserting them into the output chain
785 /// where possible.
786 ///
787 /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
788 /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
789 /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
790 /// chain by copying chunks rather than reusing them.
791 fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
792 if self.limit.is_some() {
793 return Err(("cursor with limit", self));
794 }
795 if self.overwrite_ts.is_some() {
796 return Err(("cursor with overwrite_ts", self));
797 }
798 if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
799 return Err(("cursor on shared chunks", self));
800 }
801
802 let mut chain = Chain::default();
803 let mut remaining = Some(self);
804
805 // We might be partway through the first chunk, in which case we can't reuse it but need to
806 // allocate a new one to contain only the updates the cursor can still yield.
807 while let Some(cursor) = remaining.take() {
808 if cursor.chunk_offset == 0 {
809 remaining = Some(cursor);
810 break;
811 }
812 let update = cursor.get();
813 chain.push(update);
814 remaining = cursor.step();
815 }
816
817 if let Some(cursor) = remaining {
818 for chunk in cursor.chunks {
819 let chunk = Rc::into_inner(chunk).expect("checked above");
820 chain.push_chunk(chunk);
821 }
822 }
823
824 Ok(chain)
825 }
826}
827
828impl<D: Data> From<Cursor<D>> for Chain<D> {
829 fn from(cursor: Cursor<D>) -> Self {
830 match cursor.try_unwrap() {
831 Ok(chain) => chain,
832 Err((_, cursor)) => {
833 let mut chain = Chain::default();
834 chain.push_cursor(cursor);
835 chain
836 }
837 }
838 }
839}
840
841/// A non-empty chunk of updates, backed by a columnation region.
842///
843/// All updates in a chunk are sorted by (time, data) and consolidated.
844///
845/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
846/// re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API doesn't
847/// provide a convenient way to pre-size regions, so chunks are currently only fixed-size in
848/// spirit.
849struct Chunk<D: Data> {
850 /// The contained updates.
851 data: TimelyStack<(D, Timestamp, Diff)>,
852 /// Cached value of the current chunk size, for efficient updating of metrics.
853 cached_size: Option<SizeMetrics>,
854}
855
856impl<D: Data> Default for Chunk<D> {
857 fn default() -> Self {
858 let mut data = TimelyStack::default();
859 data.ensure_capacity(&mut None);
860
861 Self {
862 data,
863 cached_size: None,
864 }
865 }
866}
867
868impl<D: Data> fmt::Debug for Chunk<D> {
869 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
870 write!(f, "Chunk(<{}>)", self.len())
871 }
872}
873
874impl<D: Data> Chunk<D> {
875 /// Create a new chunk containing a single update.
876 fn from_update<DT: Borrow<D>>(update: (DT, Timestamp, Diff)) -> Self {
877 let (d, t, r) = update;
878
879 let mut chunk = Self::default();
880 chunk.data.copy_destructured(d.borrow(), &t, &r);
881
882 chunk
883 }
884
885 /// Return the number of updates in the chunk.
886 fn len(&self) -> usize {
887 self.data.len()
888 }
889
890 /// Return the (local) capacity of the chunk.
891 fn capacity(&self) -> usize {
892 self.data.capacity()
893 }
894
895 /// Return whether the chunk is at capacity.
896 fn at_capacity(&self) -> bool {
897 self.data.at_capacity()
898 }
899
900 /// Return the update at the given index.
901 ///
902 /// # Panics
903 ///
904 /// Panics if the given index is not populated.
905 fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
906 let (d, t, r) = self.data.index(idx);
907 (d, *t, *r)
908 }
909
910 /// Return the first update in the chunk.
911 fn first(&self) -> (&D, Timestamp, Diff) {
912 self.index(0)
913 }
914
915 /// Return the last update in the chunk.
916 fn last(&self) -> (&D, Timestamp, Diff) {
917 self.index(self.len() - 1)
918 }
919
920 /// Push an update onto the chunk.
921 fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
922 let (d, t, r) = update;
923 self.data.copy_destructured(d.borrow(), &t, &r);
924
925 self.invalidate_cached_size();
926 }
927
928 /// Return the index of the first update at a time greater than `time`, or `None` if no such
929 /// update exists.
930 fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
931 if self.last().1 <= time {
932 return None;
933 }
934
935 let mut lower = 0;
936 let mut upper = self.len();
937 while lower < upper {
938 let idx = (lower + upper) / 2;
939 if self.index(idx).1 > time {
940 upper = idx;
941 } else {
942 lower = idx + 1;
943 }
944 }
945
946 Some(lower)
947 }
948
949 /// Return the size of the chunk, for use in metrics.
950 fn get_size(&mut self) -> SizeMetrics {
951 if self.cached_size.is_none() {
952 let mut size = 0;
953 let mut capacity = 0;
954 self.data.heap_size(|sz, cap| {
955 size += sz;
956 capacity += cap;
957 });
958 self.cached_size = Some(SizeMetrics {
959 size,
960 capacity,
961 allocations: 1,
962 });
963 }
964
965 self.cached_size.unwrap()
966 }
967
968 /// Invalidate the cached chunk size.
969 ///
970 /// This method must be called every time the size of the chunk changed.
971 fn invalidate_cached_size(&mut self) {
972 self.cached_size = None;
973 }
974}
975
976/// A buffer for staging updates before they are inserted into the sorted chains.
977#[derive(Debug)]
978struct Stage<D> {
979 /// The contained updates.
980 ///
981 /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
982 data: Vec<(D, Timestamp, Diff)>,
983 /// Introspection logging.
984 ///
985 /// We want to report the number of records in the stage. To do so, we pretend that the stage
986 /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
987 /// re-created.
988 logging: Option<Logging>,
989}
990
991impl<D: Data> Stage<D> {
992 fn new(logging: Option<Logging>) -> Self {
993 // Make sure that the `Stage` has the same capacity as a `Chunk`.
994 let chunk = Chunk::<D>::default();
995 let data = Vec::with_capacity(chunk.capacity());
996
997 // For logging, we pretend the stage consists of a single chain.
998 if let Some(logging) = &logging {
999 logging.chain_created(0);
1000 }
1001
1002 Self { data, logging }
1003 }
1004
1005 fn is_empty(&self) -> bool {
1006 self.data.is_empty()
1007 }
1008
1009 /// Insert a batch of updates, possibly producing a ready [`Chain`].
1010 fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
1011 if updates.is_empty() {
1012 return None;
1013 }
1014
1015 let prev_length = self.ilen();
1016
1017 // Determine how many chunks we can fill with the available updates.
1018 let update_count = self.data.len() + updates.len();
1019 let chunk_size = self.data.capacity();
1020 let chunk_count = update_count / chunk_size;
1021
1022 let mut new_updates = updates.drain(..);
1023
1024 // If we have enough shipable updates, collect them, consolidate, and build a chain.
1025 let maybe_chain = if chunk_count > 0 {
1026 let ship_count = chunk_count * chunk_size;
1027 let mut buffer = Vec::with_capacity(ship_count);
1028
1029 buffer.append(&mut self.data);
1030 while buffer.len() < ship_count {
1031 let update = new_updates.next().unwrap();
1032 buffer.push(update);
1033 }
1034
1035 consolidate(&mut buffer);
1036
1037 let mut chain = Chain::default();
1038 chain.extend(buffer);
1039 Some(chain)
1040 } else {
1041 None
1042 };
1043
1044 // Stage the remaining updates.
1045 self.data.extend(new_updates);
1046
1047 self.log_length_diff(self.ilen() - prev_length);
1048
1049 maybe_chain
1050 }
1051
1052 /// Flush all currently staged updates into a chain.
1053 fn flush(&mut self) -> Option<Chain<D>> {
1054 self.log_length_diff(-self.ilen());
1055
1056 consolidate(&mut self.data);
1057
1058 if self.data.is_empty() {
1059 return None;
1060 }
1061
1062 let mut chain = Chain::default();
1063 chain.extend(self.data.drain(..));
1064 Some(chain)
1065 }
1066
1067 /// Advance the times of staged updates by the given `since`.
1068 fn advance_times(&mut self, since: &Antichain<Timestamp>) {
1069 let Some(since_ts) = since.as_option() else {
1070 // If the since is the empty frontier, discard all updates.
1071 self.log_length_diff(-self.ilen());
1072 self.data.clear();
1073 return;
1074 };
1075
1076 for (_, time, _) in &mut self.data {
1077 *time = std::cmp::max(*time, *since_ts);
1078 }
1079 }
1080
1081 /// Return the size of the stage, for use in metrics.
1082 ///
1083 /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
1084 /// under-estimates. That's fine as the stage should always be small.
1085 fn get_size(&self) -> SizeMetrics {
1086 SizeMetrics {
1087 size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1088 capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1089 allocations: 1,
1090 }
1091 }
1092
1093 /// Return the number of updates in the stage, as an `isize`.
1094 fn ilen(&self) -> isize {
1095 self.data.len().try_into().expect("must fit")
1096 }
1097
1098 fn log_length_diff(&self, diff: isize) {
1099 let Some(logging) = &self.logging else { return };
1100 if diff > 0 {
1101 let count = usize::try_from(diff).expect("must fit");
1102 logging.chain_created(count);
1103 logging.chain_dropped(0);
1104 } else if diff < 0 {
1105 let count = usize::try_from(-diff).expect("must fit");
1106 logging.chain_created(0);
1107 logging.chain_dropped(count);
1108 }
1109 }
1110}
1111
1112impl<D> Drop for Stage<D> {
1113 fn drop(&mut self) {
1114 if let Some(logging) = &self.logging {
1115 logging.chain_dropped(self.data.len());
1116 }
1117 }
1118}
1119
1120/// Sort and consolidate the given list of updates.
1121///
1122/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
1123/// except that it sorts updates by (time, data) instead of (data, time).
1124fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
1125 if updates.len() <= 1 {
1126 return;
1127 }
1128
1129 let diff = |update: &(_, _, Diff)| update.2;
1130
1131 updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
1132
1133 let mut offset = 0;
1134 let mut accum = diff(&updates[0]);
1135
1136 for idx in 1..updates.len() {
1137 let this = &updates[idx];
1138 let prev = &updates[idx - 1];
1139 if this.0 == prev.0 && this.1 == prev.1 {
1140 accum += diff(&updates[idx]);
1141 } else {
1142 if accum != Diff::ZERO {
1143 updates.swap(offset, idx - 1);
1144 updates[offset].2 = accum;
1145 offset += 1;
1146 }
1147 accum = diff(&updates[idx]);
1148 }
1149 }
1150
1151 if accum != Diff::ZERO {
1152 let len = updates.len();
1153 updates.swap(offset, len - 1);
1154 updates[offset].2 = accum;
1155 offset += 1;
1156 }
1157
1158 updates.truncate(offset);
1159}
1160
1161/// Merge the given chains, advancing times by the given `since` in the process.
1162fn merge_chains<D: Data>(
1163 chains: impl IntoIterator<Item = Chain<D>>,
1164 since: &Antichain<Timestamp>,
1165) -> Chain<D> {
1166 let Some(&since_ts) = since.as_option() else {
1167 return Chain::default();
1168 };
1169
1170 let mut to_merge = Vec::new();
1171 for chain in chains {
1172 if let Some(cursor) = chain.into_cursor() {
1173 let mut runs = cursor.advance_by(since_ts);
1174 to_merge.append(&mut runs);
1175 }
1176 }
1177
1178 merge_cursors(to_merge)
1179}
1180
1181/// Merge the given chains, advancing times by the given `since` in the process, but only up to the
1182/// given `upper`.
1183///
1184/// Returns the merged chain and a list of non-empty remainders of the input chains.
1185fn merge_chains_up_to<D: Data>(
1186 chains: Vec<Chain<D>>,
1187 since: &Antichain<Timestamp>,
1188 upper: &Antichain<Timestamp>,
1189) -> (Chain<D>, Vec<Chain<D>>) {
1190 let Some(&since_ts) = since.as_option() else {
1191 return (Chain::default(), Vec::new());
1192 };
1193 let Some(&upper_ts) = upper.as_option() else {
1194 let merged = merge_chains(chains, since);
1195 return (merged, Vec::new());
1196 };
1197
1198 if since_ts >= upper_ts {
1199 // After advancing by `since` there will be no updates before `upper`.
1200 return (Chain::default(), chains);
1201 }
1202
1203 let mut to_merge = Vec::new();
1204 let mut to_keep = Vec::new();
1205 for chain in chains {
1206 if let Some(cursor) = chain.into_cursor() {
1207 let mut runs = cursor.advance_by(since_ts);
1208 if let Some(last) = runs.pop() {
1209 let (before, beyond) = last.split_at_time(upper_ts);
1210 before.map(|c| runs.push(c));
1211 beyond.map(|c| to_keep.push(c));
1212 }
1213 to_merge.append(&mut runs);
1214 }
1215 }
1216
1217 let merged = merge_cursors(to_merge);
1218 let remains = to_keep
1219 .into_iter()
1220 .map(|c| c.try_unwrap().expect("unwrapable"))
1221 .collect();
1222
1223 (merged, remains)
1224}
1225
1226/// Merge the given cursors into one chain.
1227fn merge_cursors<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
1228 match cursors.len() {
1229 0 => Chain::default(),
1230 1 => {
1231 let [cur] = cursors.try_into().unwrap();
1232 Chain::from(cur)
1233 }
1234 2 => {
1235 let [a, b] = cursors.try_into().unwrap();
1236 merge_2(a, b)
1237 }
1238 _ => merge_many(cursors),
1239 }
1240}
1241
1242/// Merge the given two cursors using a 2-way merge.
1243///
1244/// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
1245fn merge_2<D: Data>(cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
1246 let mut rest1 = Some(cursor1);
1247 let mut rest2 = Some(cursor2);
1248 let mut merged = Chain::default();
1249
1250 loop {
1251 match (rest1, rest2) {
1252 (Some(c1), Some(c2)) => {
1253 let (d1, t1, r1) = c1.get();
1254 let (d2, t2, r2) = c2.get();
1255
1256 match (t1, d1).cmp(&(t2, d2)) {
1257 Ordering::Less => {
1258 merged.push((d1, t1, r1));
1259 rest1 = c1.step();
1260 rest2 = Some(c2);
1261 }
1262 Ordering::Greater => {
1263 merged.push((d2, t2, r2));
1264 rest1 = Some(c1);
1265 rest2 = c2.step();
1266 }
1267 Ordering::Equal => {
1268 let r = r1 + r2;
1269 if r != Diff::ZERO {
1270 merged.push((d1, t1, r));
1271 }
1272 rest1 = c1.step();
1273 rest2 = c2.step();
1274 }
1275 }
1276 }
1277 (Some(c), None) | (None, Some(c)) => {
1278 merged.push_cursor(c);
1279 break;
1280 }
1281 (None, None) => break,
1282 }
1283 }
1284
1285 merged
1286}
1287
1288/// Merge the given cursors using a k-way merge with a binary heap.
1289fn merge_many<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
1290 let mut heap = MergeHeap::from_iter(cursors);
1291 let mut merged = Chain::default();
1292 while let Some(cursor1) = heap.pop() {
1293 let (data, time, mut diff) = cursor1.get();
1294
1295 while let Some((cursor2, r)) = heap.pop_equal(data, time) {
1296 diff += r;
1297 if let Some(cursor2) = cursor2.step() {
1298 heap.push(cursor2);
1299 }
1300 }
1301
1302 if diff != Diff::ZERO {
1303 merged.push((data, time, diff));
1304 }
1305 if let Some(cursor1) = cursor1.step() {
1306 heap.push(cursor1);
1307 }
1308 }
1309
1310 merged
1311}
1312
1313/// A binary heap specialized for merging [`Cursor`]s.
1314struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);
1315
1316impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
1317 fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
1318 let inner = cursors.into_iter().map(MergeCursor).collect();
1319 Self(inner)
1320 }
1321}
1322
1323impl<D: Data> MergeHeap<D> {
1324 /// Pop the next cursor (the one yielding the least update) from the heap.
1325 fn pop(&mut self) -> Option<Cursor<D>> {
1326 self.0.pop().map(|MergeCursor(c)| c)
1327 }
1328
1329 /// Pop the next cursor from the heap, provided the data and time of its current update are
1330 /// equal to the given values.
1331 ///
1332 /// Returns both the cursor and the diff corresponding to `data` and `time`.
1333 fn pop_equal(&mut self, data: &D, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
1334 let MergeCursor(cursor) = self.0.peek()?;
1335 let (d, t, r) = cursor.get();
1336 if d == data && t == time {
1337 let cursor = self.pop().expect("checked above");
1338 Some((cursor, r))
1339 } else {
1340 None
1341 }
1342 }
1343
1344 /// Push a cursor onto the heap.
1345 fn push(&mut self, cursor: Cursor<D>) {
1346 self.0.push(MergeCursor(cursor));
1347 }
1348}
1349
1350/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
1351///
1352/// Implements the cursor ordering required for merging cursors.
1353struct MergeCursor<D: Data>(Cursor<D>);
1354
1355impl<D: Data> PartialEq for MergeCursor<D> {
1356 fn eq(&self, other: &Self) -> bool {
1357 self.cmp(other).is_eq()
1358 }
1359}
1360
1361impl<D: Data> Eq for MergeCursor<D> {}
1362
1363impl<D: Data> PartialOrd for MergeCursor<D> {
1364 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1365 Some(self.cmp(other))
1366 }
1367}
1368
1369impl<D: Data> Ord for MergeCursor<D> {
1370 fn cmp(&self, other: &Self) -> Ordering {
1371 let (d1, t1, _) = self.0.get();
1372 let (d2, t2, _) = other.0.get();
1373 (t1, d1).cmp(&(t2, d2)).reverse()
1374 }
1375}