mz_compute/sink/correction_v2.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An implementation of the `Correction` data structure used by the MV sink's `write_batches`
//! operator to stash updates before they are written.
//!
//! The `Correction` data structure provides methods to:
//! * insert new updates
//! * advance the compaction frontier (called `since`)
//! * obtain an iterator over consolidated updates before some `upper`
//! * force consolidation of updates before some `upper`
//!
//! The goal is to provide good performance for each of these operations, even in the presence of
//! future updates. MVs downstream of temporal filters might have to deal with large numbers of
//! retractions for future times, and we want those to be handled efficiently as well.
//!
//! Note that `Correction` does not provide a method to directly remove updates. Instead, updates
//! are removed by inserting their retractions so that they consolidate away to nothing.
//!
//! ## Storage of Updates
//!
//! Stored updates are of the form `(data, time, diff)`, where `time` and `diff` are fixed to
//! [`mz_repr::Timestamp`] and [`mz_repr::Diff`], respectively.
//!
//! [`CorrectionV2`] holds onto a list of `Chain`s containing `Chunk`s of stashed updates. Each
//! `Chunk` is a columnar region holding a byte-bounded run of updates. All updates in a chunk,
//! and all updates in a chain, are ordered by (time, data) and consolidated.
//!
//! Chains live in three places:
//!
//! * A [`BucketChain`] partitions times at or beyond the `boundary` (the largest read `upper`
//!   seen so far) into buckets of exponentially growing time ranges, each holding a list of
//!   chains. Reads only touch the buckets below their `upper`, so the bulk of the buffered
//!   updates (in particular far-future retractions produced by temporal filters) is left
//!   alone.
//! * `pending_low` holds chains at times below the `boundary`, mostly insertions arriving
//!   through the persist feedback.
//! * `emitted` is a single chain holding the updates returned by the last read. Updates must
//!   stay in the buffer until their feedback retractions arrive, and keeping them separate from
//!   the bucket chain means reads never have to re-merge future updates.
//!
//! ```text
//! chain[0]   | chain[1]   | chain[2]
//!            |            |
//! chunk[0]   | chunk[0]   | chunk[0]
//! (a, 1, +1) | (a, 1, +1) | (d, 3, +1)
//! (b, 1, +1) | (b, 2, -1) | (d, 4, -1)
//! chunk[1]   | chunk[1]   |
//! (c, 1, +1) | (c, 2, -2) |
//! (a, 2, -1) | (c, 4, -1) |
//! chunk[2]   |            |
//! (b, 2, +1) |            |
//! (c, 2, +1) |            |
//! chunk[3]   |            |
//! (b, 3, -1) |            |
//! (c, 3, +1) |            |
//! ```
//!
//! The "chain invariant" states that each chain in a bucket has at least `chain_proportionality`
//! times as many updates as the next one. This means that chain sizes will often be powers of
//! `chain_proportionality`, but they don't have to be. For example, for a proportionality of 2,
//! the chain sizes `[11, 5, 2, 1]` would satisfy the chain invariant.
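//!
//! To illustrate, the invariant can be checked on a bucket's chain sizes, listed from the oldest
//! (largest) to the newest (smallest) chain. This is a minimal sketch on bare `usize` counts
//! rather than real `Chain`s. `satisfies_chain_invariant` is a hypothetical helper and not part
//! of this module.
//!
//! ```rust
//! // Hypothetical helper: returns whether every chain holds at least `proportionality` times as
//! // many updates as the chain after it.
//! fn satisfies_chain_invariant(sizes: &[usize], proportionality: f64) -> bool {
//!     sizes
//!         .windows(2)
//!         .all(|pair| (pair[0] as f64) >= proportionality * (pair[1] as f64))
//! }
//! ```
//!
//! With a proportionality of 2, `[11, 5, 2, 1]` passes. `[11, 6, 2, 1]` fails because 11 is
//! less than 2 * 6.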
//!
//! Note that the invariant is maintained on update counts, not chunk counts. Chunks are
//! byte-bounded (see `ChunkBuilder`), so chunk count is not proportional to update count and
//! would be a poor proxy: any chain that fits within a single chunk's byte bound is one chunk,
//! regardless of how many updates it holds, which would let the geometric invariant collapse and
//! break the O(log N) amortization of inserts.
//!
//! Choosing the `chain_proportionality` value allows tuning the trade-off between memory and CPU
//! resources required to maintain corrections. A higher proportionality forces more frequent chain
//! merges, and therefore consolidation, reducing memory usage but increasing CPU usage.
//!
//! ## Inserting Updates
//!
//! A batch of updates is routed by time: updates below the `boundary` become a `pending_low`
//! chain, and the rest are appended as new chains to their respective buckets. Appending to a
//! bucket merges chains until the chain invariant is restored.
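//!
//! The restoration loop can be sketched on bare update counts. This is a simplified model and
//! not the code below. `push_chain_sizes` is a hypothetical name. A merge is modeled as the sum
//! of both counts, which is an upper bound because real merges can consolidate updates away. As
//! in `ChainBucket::push_chain`, the two newest chains are merged while the newest one holds more
//! than `1 / chain_proportionality` of its predecessor's updates.
//!
//! ```rust
//! // Hypothetical model: `chains` holds the update counts of a bucket's chains, oldest first.
//! fn push_chain_sizes(chains: &mut Vec<usize>, new_len: usize, proportionality: f64) {
//!     if new_len == 0 {
//!         return; // empty chains are never stored
//!     }
//!     chains.push(new_len);
//!     // Merge the two newest chains until the invariant holds again.
//!     while chains.len() >= 2 {
//!         let last = chains[chains.len() - 1];
//!         let prev = chains[chains.len() - 2];
//!         if (last as f64) * proportionality <= prev as f64 {
//!             break;
//!         }
//!         chains.truncate(chains.len() - 2);
//!         chains.push(prev + last); // upper bound on the merged chain's size
//!     }
//! }
//! ```
//!
//! With a proportionality of 2, pushing seven single-update chains leaves sizes `[4, 2, 1]`. An
//! eighth push then cascades into a single chain of size 8, much like incrementing a binary
//! counter.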
//!
//! Inserting an update into the correction buffer can be expensive: it involves allocating a new
//! chunk, copying the update in, and then likely merging with an existing chain to restore the
//! chain invariant. If updates arrive in small batches, this can cause considerable overhead. To
//! amortize it, new updates aren't immediately inserted into the sorted chains but are instead
//! stored in a `Stage` buffer. Once enough updates have been staged to fill a `Chunk`, they are
//! sorted, consolidated, and routed.
//!
//! The insert operation has an amortized complexity of O(log N), with N being the current number
//! of updates stored.
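//!
//! The staging idea can be sketched with plain vectors. This is an illustration under simplifying
//! assumptions and not the actual `Stage`:
//! * the `StageSketch` type is hypothetical;
//! * updates use `&'static str` data, `u64` times and `i64` diffs;
//! * the flushed batch is returned as a sorted, consolidated `Vec` instead of being written into
//!   chunks.
//!
//! ```rust
//! type Update = (&'static str, u64, i64);
//!
//! // Hypothetical staging buffer: small inserts accumulate unsorted until `capacity` is reached.
//! struct StageSketch {
//!     data: Vec<Update>,
//!     capacity: usize,
//! }
//!
//! impl StageSketch {
//!     fn new(capacity: usize) -> Self {
//!         Self { data: Vec::new(), capacity }
//!     }
//!
//!     // Stage `updates`; once full, return the staged updates sorted and consolidated.
//!     fn insert(&mut self, updates: &mut Vec<Update>) -> Option<Vec<Update>> {
//!         self.data.append(updates);
//!         if self.data.len() < self.capacity {
//!             return None;
//!         }
//!         let mut ready = std::mem::take(&mut self.data);
//!         // Sort by (time, data), the order chains are kept in.
//!         ready.sort_by_key(|&(d, t, _)| (t, d));
//!         // Sum the diffs of equal (data, time) pairs and drop those that cancel out.
//!         let mut out: Vec<Update> = Vec::with_capacity(ready.len());
//!         for (d, t, r) in ready {
//!             match out.last_mut() {
//!                 Some(last) if last.0 == d && last.1 == t => last.2 += r,
//!                 _ => out.push((d, t, r)),
//!             }
//!         }
//!         out.retain(|&(_, _, r)| r != 0);
//!         Some(out)
//!     }
//! }
//! ```
//!
//! Each call costs only an append until the buffer fills. Sorting and chain maintenance are
//! therefore paid once per full batch rather than once per small insert.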
//!
//! ## Retrieving Consolidated Updates
//!
//! Retrieving consolidated updates before a given `upper` works by peeling all buckets below the
//! `upper` off the bucket chain, splitting their chains, the pending low chains, and the previous
//! `emitted` chain at the `upper`, merging the parts below the `upper` into the new `emitted`
//! chain, and returning an iterator over that chain.
//!
//! Because each chain contains updates ordered by time first, splitting a chain at the `upper`
//! reuses whole chunks and copies at most one chunk straddling the split point. Apart from those
//! straddling chunks, updates at times at or beyond the `upper` are never touched, no matter how
//! many the buffer holds. The complexity of a read is O(U log K), with U being the number of
//! updates before `upper` and K the number of chains containing them.
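//!
//! A simplified model of splitting a chain at the `upper` follows. It is a sketch under stated
//! assumptions: chunks are plain `Vec`s of `(&'static str, u64, i64)` updates rather than
//! columnar regions. The free function `split_at_time` is hypothetical and only mirrors the name
//! of the method used by this module.
//!
//! ```rust
//! type Update = (&'static str, u64, i64);
//! type Chunk = Vec<Update>;
//!
//! // Splits a chain, whose chunks are sorted by (time, data), into the updates before `upper`
//! // and those at or beyond it. Whole chunks are moved; only a straddling chunk is copied.
//! fn split_at_time(chunks: Vec<Chunk>, upper: u64) -> (Vec<Chunk>, Vec<Chunk>) {
//!     let mut lower = Vec::new();
//!     let mut remainder = Vec::new();
//!     for chunk in chunks {
//!         let (first, last) = match (chunk.first(), chunk.last()) {
//!             (Some(f), Some(l)) => (f.1, l.1),
//!             _ => continue, // chunks are never empty in practice; skip defensively
//!         };
//!         if last < upper {
//!             lower.push(chunk); // entirely before `upper`: reused as-is
//!         } else if first >= upper {
//!             remainder.push(chunk); // entirely at or beyond `upper`: reused as-is
//!         } else {
//!             // Straddles `upper`: copy the two halves into new chunks.
//!             let idx = chunk.partition_point(|u| u.1 < upper);
//!             lower.push(chunk[..idx].to_vec());
//!             remainder.push(chunk[idx..].to_vec());
//!         }
//!     }
//!     (lower, remainder)
//! }
//! ```
//!
//! Consider `chain[0]` from the diagram above, split at an `upper` of 2:
//! * `chunk[0]` moves to the lower part;
//! * `chunk[2]` and `chunk[3]` move to the remainder;
//! * only `chunk[1]` is copied.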
//!
//! ## Merging Chains
//!
//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
//! complexity of a merge of K chains containing N updates is O(N log K).
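//!
//! The k-way merge can be sketched with the standard library's `BinaryHeap` over the heads of the
//! input chains. This is an illustrative variant and not the `MergeHeap`-based code below:
//! * chains are plain `Vec`s of `(&'static str, u64, i64)` updates;
//! * the name `merge_chains` is hypothetical;
//! * equal updates are consolidated against the last output update instead of being popped
//!   together.
//!
//! ```rust
//! use std::cmp::Reverse;
//! use std::collections::BinaryHeap;
//!
//! type Update = (&'static str, u64, i64);
//!
//! // Merges chains that are each sorted by (time, data) and consolidated into one sorted,
//! // consolidated chain, in O(N log K) for N updates across K chains.
//! fn merge_chains(chains: &[Vec<Update>]) -> Vec<Update> {
//!     // Heap entries are ((time, data), chain index, position); `Reverse` makes it a min-heap.
//!     let mut heap = BinaryHeap::new();
//!     for (i, chain) in chains.iter().enumerate() {
//!         if let Some(&(d, t, _)) = chain.first() {
//!             heap.push(Reverse(((t, d), i, 0)));
//!         }
//!     }
//!     let mut merged: Vec<Update> = Vec::new();
//!     while let Some(Reverse(((t, d), i, pos))) = heap.pop() {
//!         let diff = chains[i][pos].2;
//!         match merged.last_mut() {
//!             Some(last) if last.0 == d && last.1 == t => last.2 += diff,
//!             _ => merged.push((d, t, diff)),
//!         }
//!         // Advance this chain's cursor.
//!         if let Some(&(d, t, _)) = chains[i].get(pos + 1) {
//!             heap.push(Reverse(((t, d), i, pos + 1)));
//!         }
//!     }
//!     merged.retain(|&(_, _, r)| r != 0);
//!     merged
//! }
//! ```
//!
//! Applied to the three chains in the diagram above, the `(b, 2)` updates cancel out. The two
//! `(a, 1, +1)` updates combine into `(a, 1, +2)`.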
//!
//! There is a twist, though: merging also has to respect the `since` frontier, which determines
//! how far the times of updates should be advanced. Advancing times in a sorted chain of updates
//! can leave it unsorted, so we cannot just merge the chains from top to bottom.
//!
//! For example, consider these two chains, assuming `since = [2]`:
//!
//! ```text
//! chain 1: [(c, 1, +1), (b, 2, -1), (a, 3, -1)]
//! chain 2: [(b, 1, +1), (a, 2, +1), (c, 2, -1)]
//! ```
//!
//! After time advancement, the chains look like this:
//!
//! ```text
//! chain 1: [(c, 2, +1), (b, 2, -1), (a, 3, -1)]
//! chain 2: [(b, 2, +1), (a, 2, +1), (c, 2, -1)]
//! ```
//!
//! Merging them naively yields `[(b, 2, +1), (a, 2, +1), (b, 2, -1), (a, 3, -1)]`, a chain that's
//! neither sorted nor consolidated.
//!
//! Times below the `since` can only exist in chains read by `consolidate_before`, and only if
//! the `since` advanced past buffered times since the previous read. The steady state has few
//! distinct stale times, because the previously emitted chain was written just before the `since`
//! advanced past it. In that case we merge sub-chains: one for each distinct time before the
//! `since`, plus one for the remaining updates at or beyond it. Each of these sub-chains retains
//! the (time, data) ordering after the time advancement to `since`, so merging those yields the
//! expected result.
//!
//! For the above example, the chains we would merge are:
//!
//! ```text
//! chain 1.a: [(c, 2, +1)]
//! chain 1.b: [(b, 2, -1), (a, 3, -1)]
//! chain 2.a: [(b, 2, +1)]
//! chain 2.b: [(a, 2, +1), (c, 2, -1)]
//! ```
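//!
//! The sub-chain approach can be sketched under simplifying assumptions:
//! * chains are plain `Vec`s of `(&'static str, u64, i64)` updates;
//! * the names `advance_runs`, `merge2`, and `merge_with_since` are hypothetical;
//! * the runs are combined by folding a 2-way merge rather than by a heap-based k-way merge.
//!
//! ```rust
//! type Update = (&'static str, u64, i64);
//!
//! // Splits a (time, data)-sorted chain into runs that stay sorted after advancing times to
//! // `since`: one run per distinct time before `since`, plus one run for all later updates.
//! fn advance_runs(chain: &[Update], since: u64) -> Vec<Vec<Update>> {
//!     let mut runs: Vec<Vec<Update>> = Vec::new();
//!     let mut prev_time: Option<u64> = None;
//!     for &(d, t, r) in chain {
//!         let same_run = matches!(prev_time, Some(p) if p == t || p >= since);
//!         if !same_run {
//!             runs.push(Vec::new());
//!         }
//!         runs.last_mut().unwrap().push((d, t.max(since), r));
//!         prev_time = Some(t);
//!     }
//!     runs
//! }
//!
//! // 2-way merge of two (time, data)-sorted, consolidated runs.
//! fn merge2(a: Vec<Update>, b: Vec<Update>) -> Vec<Update> {
//!     let mut out = Vec::with_capacity(a.len() + b.len());
//!     let (mut i, mut j) = (0, 0);
//!     while i < a.len() && j < b.len() {
//!         let (ka, kb) = ((a[i].1, a[i].0), (b[j].1, b[j].0));
//!         if ka < kb {
//!             out.push(a[i]);
//!             i += 1;
//!         } else if ka > kb {
//!             out.push(b[j]);
//!             j += 1;
//!         } else {
//!             let r = a[i].2 + b[j].2;
//!             if r != 0 {
//!                 out.push((a[i].0, a[i].1, r));
//!             }
//!             i += 1;
//!             j += 1;
//!         }
//!     }
//!     out.extend_from_slice(&a[i..]);
//!     out.extend_from_slice(&b[j..]);
//!     out
//! }
//!
//! // Splits every chain into advanced runs and merges all runs into one chain.
//! fn merge_with_since(chains: &[Vec<Update>], since: u64) -> Vec<Update> {
//!     chains
//!         .iter()
//!         .flat_map(|c| advance_runs(c, since))
//!         .fold(Vec::new(), merge2)
//! }
//! ```
//!
//! Apply this to the two example chains with a `since` of 2. `advance_runs` produces exactly the
//! sub-chains listed above, and `merge_with_since` returns `[(a, 2, +1), (a, 3, -1)]`.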
//!
//! Many distinct stale times can occur, for example after a `since` jump across many buffered
//! timestamps when a sink restarts with an old as-of. The number of sub-chains then grows with
//! the number of distinct times, so we instead materialize the affected updates, advance their
//! times, and sort and consolidate them in one O(U log U) pass.
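//!
//! The fallback for many distinct stale times can be sketched in the same simplified model:
//! plain `Vec`s of `(&'static str, u64, i64)` updates, with `advance_and_consolidate` as a
//! hypothetical name. Like the code below, a caller would still have to split the result at the
//! `upper`, since advancement can lift updates to or beyond it.
//!
//! ```rust
//! type Update = (&'static str, u64, i64);
//!
//! // Advances all times to at least `since`, then sorts by (time, data) and consolidates, in
//! // O(U log U) for U updates regardless of how many distinct times were advanced.
//! fn advance_and_consolidate(chains: &[Vec<Update>], since: u64) -> Vec<Update> {
//!     let mut updates: Vec<Update> = chains
//!         .iter()
//!         .flatten()
//!         .map(|&(d, t, r)| (d, t.max(since), r))
//!         .collect();
//!     updates.sort_by_key(|&(d, t, _)| (t, d));
//!     let mut out: Vec<Update> = Vec::with_capacity(updates.len());
//!     for (d, t, r) in updates {
//!         match out.last_mut() {
//!             Some(last) if last.0 == d && last.1 == t => last.2 += r,
//!             _ => out.push((d, t, r)),
//!         }
//!     }
//!     out.retain(|&(_, _, r)| r != 0);
//!     out
//! }
//! ```
//!
//! Take a single chain `[(x, 1, +1), (x, 2, +1), (x, 3, +1), (x, 5, +1)]` with a `since` of 4.
//! The three stale updates collapse into `(x, 4, +3)`, followed by the untouched `(x, 5, +1)`.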

use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;
use std::sync::{Mutex, OnceLock};

use columnar::{Columnar, Index, Len, Ref};
use mz_ore::cast::CastLossy;
use mz_ore::soft_assert_or_log;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::column_pager::{self, PagedColumn};
use mz_timely_util::columnar::Column;
use mz_timely_util::temporal::{Bucket, BucketChain};
use timely::PartialOrder;
use timely::dataflow::channels::ContainerBytes;
use timely::progress::Antichain;

use crate::sink::correction::{ChannelLogging, SizeMetrics};

/// Convenient alias for use in data trait bounds.
///
/// `D` is constrained to be `Columnar`, so that updates can be stored in a single columnar
/// region per chunk, and the variable-length payload (e.g. `Row` bytes) lives in the same
/// allocation as the rest of the chunk. The `Ref`-level `Eq + Ord` bounds let the merge/heap
/// code compare updates directly through the columnar borrow, avoiding `into_owned` clones
/// on the hot path.
pub trait Data:
    differential_dataflow::Data
    + Columnar<Container: Send + Sync + Clone + for<'a> columnar::Borrow<Ref<'a>: Eq + Ord>>
    + Send
    + Sync
{
}
impl<D> Data for D where
    D: differential_dataflow::Data
        + Columnar<Container: Send + Sync + Clone + for<'a> columnar::Borrow<Ref<'a>: Eq + Ord>>
        + Send
        + Sync
{
}

/// A data structure used to store corrections in the MV sink implementation.
///
/// In contrast to `CorrectionV1`, this implementation stores updates in columnar regions,
/// allowing their memory to be transparently spilled to disk.
#[derive(Debug)]
pub struct CorrectionV2<D: Data> {
    /// Bucketed storage for updates at times at or beyond `boundary`.
    ///
    /// Buckets cover exponentially growing time ranges, so reads only touch the buckets below
    /// their `upper`, and far-future updates (e.g. retractions produced by temporal filters) are
    /// rarely touched.
    chain: BucketChain<ChainBucket<D>>,
    /// Chains at times below `boundary` that were not yet emitted.
    ///
    /// Filled by inserts at times below the boundary (mostly persist feedback) and by the
    /// remainders of `emitted` when a read uses a smaller `upper` than the previous one. Merged
    /// into `emitted` by the next read.
    pending_low: Vec<Chain<D>>,
    /// Updates that were emitted by `updates_before` but not yet cancelled by persist feedback.
    ///
    /// Sorted and consolidated, with all times advanced to the `since`.
    emitted: Chain<D>,
    /// A staging area for updates, to speed up small inserts.
    stage: Stage<D>,
    /// The lower bound of times stored in `chain`. Only ever advances.
    ///
    /// Times below the boundary have been peeled off the bucket chain and can only be stored in
    /// `pending_low` or `emitted`.
    boundary: Antichain<Timestamp>,
    /// The frontier by which all contained times are advanced.
    since: Antichain<Timestamp>,

    /// Total count of updates in the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_update_count: usize,
    /// Total heap size used by the correction buffer.
    ///
    /// Tracked to compute deltas in `update_metrics`.
    prev_size: SizeMetrics,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Introspection logging.
    logging: Option<ChannelLogging>,
}

/// Fuel for restoring the bucket chain invariant after peeling.
///
/// Bounds the restoration work per buffer operation. The bucket chain remains functional when
/// restoration is incomplete (peeling and finding work on ill-formed chains, at the cost of more
/// in-line splitting), so leftover restoration is simply picked up by the next operation.
///
/// `restore` spends fuel as it re-splits buckets (`ChainBucket::split` charges one unit per chunk
/// of each chain it splits), and a single `peel` leaves at most `BucketTimestamp::DOMAIN` (64)
/// buckets to re-split, so this budget completes restoration in one call for any realistic
/// buffer. It is deliberately generous: the "incomplete restoration is picked up next op" path is
/// a correctness safety net for pathological bucket or chunk counts, not a hot path we expect to
/// exercise. Lower it if restoration ever needs to interleave with other work.
const RESTORE_FUEL: i64 = 1_000_000;

impl<D: Data> CorrectionV2<D> {
    /// Construct a new [`CorrectionV2`] instance.
    pub fn new(
        metrics: SinkMetrics,
        worker_metrics: SinkWorkerMetrics,
        logging: Option<ChannelLogging>,
        chain_proportionality: f64,
        chunk_size: usize,
    ) -> Self {
        let update_size = std::mem::size_of::<(D, Timestamp, Diff)>();
        let chunk_capacity = std::cmp::max(chunk_size / update_size, 1);

        Self {
            chain: BucketChain::new(ChainBucket::new(chain_proportionality, logging.clone())),
            pending_low: Vec::new(),
            emitted: Chain::new(),
            stage: Stage::new(logging.clone(), chunk_capacity),
            boundary: Antichain::from_elem(Timestamp::MIN),
            since: Antichain::from_elem(Timestamp::MIN),
            prev_update_count: 0,
            prev_size: Default::default(),
            metrics,
            worker_metrics,
            logging,
        }
    }

    /// Insert a batch of updates.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }

        self.insert_inner(updates);
    }

    /// Insert a batch of updates into the stage, flushing it when full.
    ///
    /// All times are expected to be >= the `since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

        if let Some(mut ready) = self.stage.insert(updates) {
            self.route(&mut ready);
        }

        self.update_metrics();
    }

    /// Route a batch of sorted, consolidated updates to `pending_low` or their chain buckets.
    fn route(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        // Updates at times below the boundary become a pending low chain.
        let idx = updates.partition_point(|(_, t, _)| !self.boundary.less_equal(t));
        if idx > 0 {
            let mut builder = ChainBuilder::default();
            builder.extend(updates.drain(..idx));
            let chain = builder.finish();
            if !chain.is_empty() {
                self.log_chain_created(&chain);
                self.pending_low.push(chain);
            }
        }

        // Updates at times at or beyond the boundary go into their chain buckets. Walk ranges of
        // times that fall into the same bucket, to push batches of updates at once.
        let mut drain = updates.drain(..).peekable();
        while let Some(update) = drain.next() {
            let time = update.1;
            let range = self
                .chain
                .range_of(&time)
                .expect("bucket chain covers all times at or beyond the boundary");
            let mut builder = ChainBuilder::default();
            builder.extend(std::iter::once(update));
            while let Some(update) = drain.next_if(|(_, t, _)| range.contains(t)) {
                builder.extend(std::iter::once(update));
            }
            let bucket = self
                .chain
                .find_mut(&range.start)
                .expect("bucket chain covers all times at or beyond the boundary");
            bucket.push_chain(builder.finish());
        }
    }

    /// Return consolidated updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + Send + 'a {
        // All contained times are advanced to at least the `since`, so a read at an `upper` that
        // is not beyond the `since` is always empty. Short-circuit to avoid the eager peel, merge,
        // and `boundary` advancement that `consolidate_before` would otherwise perform. Normal
        // reads and `consolidate_at_since` always pass an `upper` beyond the `since`.
        if !PartialOrder::less_than(&self.since, upper) {
            return None.into_iter().flatten();
        }

        self.consolidate_before(upper);

        // After `consolidate_before`, `emitted` holds exactly the updates before `upper`: every
        // path that populates it splits at `upper` (pushing the remainder to `pending_low`), and
        // the guard above guarantees `upper > since`, so advancing stale times to the `since`
        // cannot lift them to or beyond `upper`. We can therefore yield all of `emitted`. Guard
        // the invariant: a violation would write updates beyond the batch upper to persist.
        soft_assert_or_log!(
            self.emitted
                .last()
                .is_none_or(|(_, t, _)| !upper.less_equal(&t)),
            "emitted contains times at or beyond the upper",
        );
        Some(self.emitted.iter()).into_iter().flatten()
    }

    /// Consolidate all updates before the given `upper` into the `emitted` chain.
    ///
    /// Once this method returns, `emitted` contains all updates at times before `upper`,
    /// consolidated. It can also contain updates at times at or beyond `upper` if `upper` is not
    /// beyond the `since`.
    fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
        if let Some(mut ready) = self.stage.flush() {
            self.route(&mut ready);
        }

        let Some(&since_ts) = self.since.as_option() else {
            // If the since is the empty frontier, discard all updates.
            let peeled = self.chain.peel(Antichain::new().borrow());
            for bucket in peeled {
                for chain in bucket.into_chains() {
                    self.log_chain_dropped(&chain);
                }
            }
            for chain in std::mem::take(&mut self.pending_low) {
                self.log_chain_dropped(&chain);
            }
            let emitted = std::mem::replace(&mut self.emitted, Chain::new());
            if !emitted.is_empty() {
                self.log_chain_dropped(&emitted);
            }
            self.update_metrics();
            return;
        };

        // Peel the buckets below the upper off the bucket chain. Bucket splits during the peel
        // only touch chunks around the upper; chunks wholly on either side are reused.
        let peeled = self.chain.peel(upper.borrow());
        if PartialOrder::less_than(&self.boundary, upper) {
            self.boundary = upper.clone();
        }

        // Collect candidate chains: peeled bucket contents, pending low chains, and the previous
        // emitted chain. All contain only times below the boundary.
        let emitted = std::mem::replace(&mut self.emitted, Chain::new());
        let mut candidates: Vec<Chain<D>> = Vec::new();
        for bucket in peeled {
            candidates.extend(bucket.into_chains());
        }
        candidates.append(&mut self.pending_low);
        if !emitted.is_empty() {
            candidates.push(emitted);
        }

        if candidates.is_empty() {
            self.restore_chain();
            self.update_metrics();
            return;
        }

        candidates.iter().for_each(|c| self.log_chain_dropped(c));

        // Split the candidates at the upper. Parts at or beyond the upper (possible when `upper`
        // regresses below a previous one) stay pending.
        let mut lowers = Vec::new();
        for chain in candidates {
            match upper.as_option() {
                Some(&upper_ts) => {
                    let (lower, remainder) = chain.split_at_time(upper_ts);
                    if !lower.is_empty() {
                        lowers.push(lower);
                    }
                    if !remainder.is_empty() {
                        self.log_chain_created(&remainder);
                        self.pending_low.push(remainder);
                    }
                }
                // The empty upper is greater than all times.
                None => lowers.push(chain),
            }
        }

        // Merge the lower parts into the new emitted chain, advancing times below the since.
        // Advancing times in a (time, data)-sorted chain can break its sort order, so chains
        // containing stale times cannot be merged as they are. Stale times are expected in steady
        // state: the previous emitted chain was written before the since advanced past it.
        //
        // Count the distinct stale times, up to a small cap. For few distinct stale times -- the
        // steady state -- split cursors into runs that remain sorted under advancement and merge
        // those. For many distinct stale times -- e.g. a since jump across many buffered
        // timestamps when a sink restarts with an old as-of -- the number of runs and the cost of
        // cloning cursor state per run grow with the number of distinct times, so materialize,
        // advance, and consolidate in one O(U log U) pass instead.
        const MAX_STALE_RUNS: usize = 32;
        let mut stale_times = 0;
        for chain in &lowers {
            stale_times += chain.distinct_times_before(since_ts, MAX_STALE_RUNS - stale_times);
            if stale_times >= MAX_STALE_RUNS {
                break;
            }
        }

        let merged = if stale_times == 0 {
            let cursors: Vec<_> = lowers.into_iter().filter_map(Chain::into_cursor).collect();
            merge_cursors(cursors)
        } else if stale_times < MAX_STALE_RUNS {
            let mut runs = Vec::new();
            for chain in lowers {
                if let Some(cursor) = chain.into_cursor() {
                    runs.append(&mut cursor.advance_by(since_ts));
                }
            }
            merge_cursors(runs)
        } else {
            let mut updates: Vec<_> = lowers.iter().flat_map(|c| c.iter()).collect();
            for (_, time, _) in &mut updates {
                *time = std::cmp::max(*time, since_ts);
            }
            consolidate(&mut updates);
            let mut builder = ChainBuilder::default();
            builder.extend(updates);
            let chain = builder.finish();

            // Advancement can move updates to or beyond the upper; such updates stay pending.
            match upper.as_option() {
                Some(&upper_ts) => {
                    let (lower, remainder) = chain.split_at_time(upper_ts);
                    if !remainder.is_empty() {
                        self.log_chain_created(&remainder);
                        self.pending_low.push(remainder);
                    }
                    lower
                }
                None => chain,
            }
        };

        if !merged.is_empty() {
            self.log_chain_created(&merged);
        }
        self.emitted = merged;

        self.restore_chain();
        self.update_metrics();
    }

    /// Perform a bounded amount of work towards restoring the bucket chain invariant.
    ///
    /// Restoration is allowed to remain incomplete: the bucket chain supports peeling and finding
    /// on ill-formed chains, so any leftover work is picked up by subsequent operations. The fuel
    /// bound keeps individual buffer operations from stalling the operator that owns the buffer.
    fn restore_chain(&mut self) {
        let mut fuel = RESTORE_FUEL;
        self.chain.restore(&mut fuel);
    }

    /// Advance the since frontier.
    ///
    /// Time advancement of updates in the bucket chain is lazy: it happens when the updates are
    /// consolidated by a read.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));
        self.stage.advance_times(&since);
        self.since = since;
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let upper_ts = self.since.as_option().and_then(|t| t.try_step_forward());
        if let Some(upper_ts) = upper_ts {
            let upper = Antichain::from_elem(upper_ts);
            self.consolidate_before(&upper);
        }
    }

    fn log_chain_created(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_created(chain.update_count);
        }
    }

    fn log_chain_dropped(&self, chain: &Chain<D>) {
        if let Some(logging) = &self.logging {
            logging.chain_dropped(chain.update_count);
        }
    }

    /// Update persist sink metrics.
    fn update_metrics(&mut self) {
        let mut new_size = self.stage.get_size();
        let mut new_length = self.stage.data.len();
        for chain in &self.pending_low {
            new_size += chain.get_size();
            new_length += chain.update_count;
        }
        new_size += self.emitted.get_size();
        new_length += self.emitted.update_count;
        for bucket in self.chain.buckets() {
            for chain in &bucket.chains {
                new_size += chain.get_size();
                new_length += chain.update_count;
            }
        }

        self.update_metrics_inner(new_size, new_length);
    }

    /// Update persist sink metrics to the given new size and length.
    fn update_metrics_inner(&mut self, new_size: SizeMetrics, new_length: usize) {
        let old_size = self.prev_size;
        let old_length = self.prev_update_count;
        let len_delta = UpdateDelta::new(new_length, old_length);
        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
        self.metrics
            .report_correction_update_deltas(len_delta, cap_delta);
        self.worker_metrics
            .report_correction_update_totals(new_length, new_size.capacity);

        if let Some(logging) = &self.logging {
            let i = |x: usize| isize::try_from(x).expect("must fit");
            logging.report_size_diff(i(new_size.size) - i(old_size.size));
            logging.report_capacity_diff(i(new_size.capacity) - i(old_size.capacity));
            logging.report_allocations_diff(i(new_size.allocations) - i(old_size.allocations));
        }

        self.prev_size = new_size;
        self.prev_update_count = new_length;
    }
}

/// Merge the given cursors into one chain.
fn merge_cursors<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
    match cursors.len() {
        0 => Chain::new(),
        1 => {
            let [cur] = cursors.try_into().unwrap();
            cur.into_chain()
        }
        2 => {
            let [a, b] = cursors.try_into().unwrap();
            merge_2(a, b)
        }
        _ => merge_many(cursors),
    }
}

/// Merge the given two cursors using a 2-way merge.
///
/// This function is a specialization of `merge_many` that avoids the overhead of a binary heap.
fn merge_2<D: Data>(cursor1: Cursor<D>, cursor2: Cursor<D>) -> Chain<D> {
    let mut rest1 = Some(cursor1);
    let mut rest2 = Some(cursor2);
    let mut merged = ChainBuilder::default();

    loop {
        match (rest1, rest2) {
            (Some(c1), Some(c2)) => {
                let (d1, t1, r1) = c1.get();
                let (d2, t2, r2) = c2.get();

                match (t1, d1).cmp(&(t2, d2)) {
                    Ordering::Less => {
                        merged.push_ref((d1, t1, r1));
                        rest1 = c1.step();
                        rest2 = Some(c2);
                    }
                    Ordering::Greater => {
                        merged.push_ref((d2, t2, r2));
                        rest1 = Some(c1);
                        rest2 = c2.step();
                    }
                    Ordering::Equal => {
                        let r = r1 + r2;
                        if r != Diff::ZERO {
                            merged.push_ref((d1, t1, r));
                        }
                        rest1 = c1.step();
                        rest2 = c2.step();
                    }
                }
            }
            (Some(c), None) | (None, Some(c)) => {
                merged.push_cursor(c);
                break;
            }
            (None, None) => break,
        }
    }

    merged.finish()
}

/// Merge the given cursors using a k-way merge with a binary heap.
fn merge_many<D: Data>(cursors: Vec<Cursor<D>>) -> Chain<D> {
    let mut heap = MergeHeap::from_iter(cursors);
    let mut merged = ChainBuilder::default();
    while let Some(cursor1) = heap.pop() {
        let (data, time, mut diff) = cursor1.get();

        while let Some((cursor2, r)) = heap.pop_equal(data, time) {
            diff += r;
            if let Some(cursor2) = cursor2.step() {
                heap.push(cursor2);
            }
        }

        if diff != Diff::ZERO {
            merged.push_ref((data, time, diff));
        }
        if let Some(cursor1) = cursor1.step() {
            heap.push(cursor1);
        }
    }

    merged.finish()
}

impl<D: Data> Drop for CorrectionV2<D> {
    fn drop(&mut self) {
        for bucket in self.chain.buckets() {
            bucket.chains.iter().for_each(|c| self.log_chain_dropped(c));
        }
        self.pending_low
            .iter()
            .for_each(|c| self.log_chain_dropped(c));
        if !self.emitted.is_empty() {
            self.log_chain_dropped(&self.emitted);
        }
        self.update_metrics_inner(Default::default(), 0);
    }
}

/// A bucket of `Chain`s, for use in a [`BucketChain`].
///
/// All chains are individually sorted by (time, data) and consolidated, but updates can appear in
/// multiple chains, so consumers must merge the chains to obtain consolidated updates.
struct ChainBucket<D: Data> {
    /// The contained chains.
    ///
    /// Maintained with the chain invariant on pushes; splits can leave it violated until the next
    /// push restores it.
    chains: Vec<Chain<D>>,
    /// The size factor of subsequent chains required by the chain invariant.
    chain_proportionality: f64,
    /// Introspection logging.
    logging: Option<ChannelLogging>,
}

impl<D: Data> fmt::Debug for ChainBucket<D> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ChainBucket")
            .field("chains", &self.chains)
            .finish_non_exhaustive()
    }
}

impl<D: Data> ChainBucket<D> {
    /// Construct a new, empty `ChainBucket`.
    fn new(chain_proportionality: f64, logging: Option<ChannelLogging>) -> Self {
        Self {
            chains: Vec::new(),
            chain_proportionality,
            logging,
        }
    }

    /// Push a chain onto the bucket, restoring the chain invariant.
    fn push_chain(&mut self, chain: Chain<D>) {
        if chain.is_empty() {
            return;
        }
        if let Some(logging) = &self.logging {
            logging.chain_created(chain.update_count);
        }
        self.chains.push(chain);

        // Restore the chain invariant.
        let prop = self.chain_proportionality;
        let merge_needed = |chains: &[Chain<_>]| match chains {
            [.., prev, last] => {
                let last_len = f64::cast_lossy(last.update_count);
                let prev_len = f64::cast_lossy(prev.update_count);
                last_len * prop > prev_len
            }
            _ => false,
        };

        while merge_needed(&self.chains) {
            let a = self.chains.pop().unwrap();
            let b = self.chains.pop().unwrap();
            if let Some(logging) = &self.logging {
                logging.chain_dropped(a.update_count);
                logging.chain_dropped(b.update_count);
            }

            let cursors = [a, b].into_iter().filter_map(Chain::into_cursor).collect();
            let merged = merge_cursors(cursors);
            if !merged.is_empty() {
                if let Some(logging) = &self.logging {
                    logging.chain_created(merged.update_count);
                }
                self.chains.push(merged);
            }
        }
    }

    /// Convert the bucket into its contained chains.
    fn into_chains(self) -> Vec<Chain<D>> {
        self.chains
    }
}

impl<D: Data> Bucket for ChainBucket<D> {
    type Timestamp = Timestamp;

    fn split(self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
        let mut lower = Self::new(self.chain_proportionality, self.logging.clone());
        let mut upper = Self::new(self.chain_proportionality, self.logging.clone());

        for chain in self.chains {
            // Whole chunks are reused; at most one chunk straddling the timestamp is copied per
            // chain. Account fuel at chunk granularity.
            *fuel = fuel.saturating_sub(i64::try_from(chain.chunks.len()).expect("must fit"));

            if let Some(logging) = &self.logging {
                logging.chain_dropped(chain.update_count);
            }
            let (lo, hi) = chain.split_at_time(*timestamp);
            for (part, target) in [(lo, &mut lower), (hi, &mut upper)] {
                if !part.is_empty() {
                    if let Some(logging) = &self.logging {
                        logging.chain_created(part.update_count);
                    }
                    target.chains.push(part);
                }
            }
        }

        (lower, upper)
    }
}
824
825/// A chain of [`Chunk`]s containing updates.
826///
827/// All updates in a chain are sorted by (time, data) and consolidated.
828///
829/// Note that, in contrast to [`Chunk`]s, chains can be empty, though we generally try to avoid
830/// keeping empty chains around.
831#[derive(Debug)]
832struct Chain<D: Data> {
833 /// The contained chunks.
834 chunks: Vec<Chunk<D>>,
835 /// The number of updates contained in all chunks.
836 update_count: usize,
837}
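
// Illustrative sketch (not part of the implementation): the append check that keeps a chain
// (time, data)-ordered, as done by `Chain::can_accept_chunk`. Chunks are modeled as sorted,
// non-empty `Vec<(time, data)>`s, and `first_time`/`last_time` stand in for the cached boundary
// times on `Chunk`. The data is only consulted when the boundary times tie. All names here are
// hypothetical.
mod sketch_can_accept {
    pub struct Chunk {
        pub updates: Vec<(u64, i64)>, // (time, data), sorted and non-empty
        pub first_time: u64,
        pub last_time: u64,
    }

    impl Chunk {
        pub fn new(updates: Vec<(u64, i64)>) -> Self {
            assert!(!updates.is_empty(), "chunks are non-empty");
            let first_time = updates[0].0;
            let last_time = updates[updates.len() - 1].0;
            Self { updates, first_time, last_time }
        }
    }

    /// Return whether `next` can follow `last` while keeping the chain (time, data)-ordered.
    pub fn can_accept(last: Option<&Chunk>, next: &Chunk) -> bool {
        match last {
            None => true,
            Some(last) if last.last_time < next.first_time => true,
            Some(last) if last.last_time > next.first_time => false,
            // Times tie: compare boundary data strictly, since a chain is also consolidated.
            Some(last) => last.updates[last.updates.len() - 1].1 < next.updates[0].1,
        }
    }
}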
838
839impl<D: Data> Chain<D> {
840 /// Construct an empty chain.
841 fn new() -> Self {
842 Self {
843 chunks: Default::default(),
844 update_count: 0,
845 }
846 }
847
848 /// Return whether the chain is empty.
849 fn is_empty(&self) -> bool {
850 self.chunks.is_empty()
851 }
852
853 /// Push a chunk onto the chain.
854 ///
855 /// All updates in the chunk must sort after all updates already in the chain, in
856 /// (time, data)-order, to ensure the chain remains sorted.
857 fn push_chunk(&mut self, chunk: Chunk<D>) {
858 debug_assert!(self.can_accept_chunk(&chunk));
859
860 self.update_count += chunk.len();
861 self.chunks.push(chunk);
862 }
863
864 /// Return whether the chain can accept the given chunk at its end while preserving
865 /// (time, data)-order.
866 ///
867 /// Uses the cached boundary times and only materializes the boundary chunks when the times
868 /// tie (a single timestamp straddling the chunk boundary), so the common
869 /// strictly-increasing-time case checks the invariant without paging chunks in.
870 fn can_accept_chunk(&self, chunk: &Chunk<D>) -> bool {
871 match self.chunks.last() {
872 None => true,
873 Some(last) => match last.last_time().cmp(&chunk.first_time()) {
874 Ordering::Less => true,
875 Ordering::Greater => false,
876 Ordering::Equal => {
877 let (dc, _, _) = last.last();
878 let (d, _, _) = chunk.first();
879 dc < d
880 }
881 },
882 }
883 }
884
885 /// Return the last update in the chain, if any.
886 fn last(&self) -> Option<Ref<'_, (D, Timestamp, Diff)>> {
887 self.chunks.last().map(|c| c.last())
888 }
889
890 /// Convert the chain into a cursor over the contained updates.
891 fn into_cursor(self) -> Option<Cursor<D>> {
892 let chunks = self.chunks.into_iter().map(Rc::new).collect();
893 Cursor::new(chunks)
894 }
895
896 /// Return an iterator over the contained updates.
897 fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
898 self.chunks.iter().flat_map(|c| {
899 (0..c.len()).map(move |i| {
900 let (d, t, r) = c.index(i);
901 (D::into_owned(d), t, r)
902 })
903 })
904 }
905
906 /// Count the distinct times among updates before `time`, stopping once `cap` have been found.
907 ///
908 /// Each distinct time costs one binary search within a chunk, plus an O(1) cached-time check
909 /// per chunk skipped, so the scan is O(cap * log n + k) for chunks of n updates and k skipped chunks.
910 fn distinct_times_before(&self, time: Timestamp, cap: usize) -> usize {
911 let mut count = 0;
912 let mut chunk_idx = 0;
913 let mut offset = 0;
914 while count < cap && chunk_idx < self.chunks.len() {
915 let chunk = &self.chunks[chunk_idx];
916 let current = chunk.index(offset).1;
917 if current >= time {
918 break;
919 }
920 count += 1;
921 // Skip to the first update at a time greater than `current`.
922 match chunk.find_time_greater_than(current) {
923 Some(idx) => offset = idx,
924 None => {
925 // All later updates at `current` are in subsequent chunks.
926 chunk_idx += 1;
927 offset = 0;
928 while chunk_idx < self.chunks.len() {
929 match self.chunks[chunk_idx].find_time_greater_than(current) {
930 Some(idx) => {
931 offset = idx;
932 break;
933 }
934 None => chunk_idx += 1,
935 }
936 }
937 }
938 }
939 }
940 count
941 }
942
943 /// Split the chain at the given time.
944 ///
945 /// Returns two chains, the first containing all updates at times < `time`, the second
946 /// containing all updates at times >= `time`. Chunks fully on either side of `time` are
947 /// reused; only a chunk straddling `time` is copied.
948 fn split_at_time(mut self, time: Timestamp) -> (Self, Self) {
949 let mut lower = Self::new();
950 let mut upper = Self::new();
951
952 let Some(skip_ts) = time.step_back() else {
953 // Nothing sorts before `time`.
954 return (lower, self);
955 };
956
957 for chunk in self.chunks.drain(..) {
958 // Route whole chunks by cached boundary times so a chunk that lands entirely on one
959 // side is moved without paging it in; only a straddling chunk is materialized.
960 if chunk.last_time() < time {
961 lower.push_chunk(chunk);
962 } else if chunk.first_time() >= time {
963 upper.push_chunk(chunk);
964 } else {
965 // The chunk straddles `time`; copy its two halves.
966 let idx = chunk
967 .find_time_greater_than(skip_ts)
968 .expect("straddles time");
969 let mut builder = ChainBuilder::default();
970 for i in 0..idx {
971 builder.push_ref(chunk.index(i));
972 }
973 for part in builder.finish().chunks {
974 lower.push_chunk(part);
975 }
976 let mut builder = ChainBuilder::default();
977 for i in idx..chunk.len() {
978 builder.push_ref(chunk.index(i));
979 }
980 for part in builder.finish().chunks {
981 upper.push_chunk(part);
982 }
983 }
984 }
985
986 (lower, upper)
987 }
988
989 /// Return the size of the chain, for use in metrics.
990 fn get_size(&self) -> SizeMetrics {
991 let mut metrics = SizeMetrics::default();
992 for chunk in &self.chunks {
993 metrics += chunk.get_size();
994 }
995 metrics
996 }
997}
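
// Illustrative sketch (not part of the implementation): the chunk-routing strategy of
// `Chain::split_at_time`, modeled on a chain stored as `Vec<Vec<u64>>` of sorted, non-empty
// chunks of times (data and diffs omitted). Whole chunks on one side of `time` are moved as-is.
// Only a straddling chunk is copied, split at the index found by `slice::partition_point`,
// which stands in for `Chunk::find_time_greater_than`. The names are hypothetical.
mod sketch_split_at_time {
    /// Split `chunks` into (times < `time`, times >= `time`), reusing whole chunks where possible.
    pub fn split(chunks: Vec<Vec<u64>>, time: u64) -> (Vec<Vec<u64>>, Vec<Vec<u64>>) {
        let (mut lower, mut upper) = (Vec::new(), Vec::new());
        for chunk in chunks {
            let (first, last) = (chunk[0], chunk[chunk.len() - 1]);
            if last < time {
                lower.push(chunk); // Entirely before `time`: move the chunk.
            } else if first >= time {
                upper.push(chunk); // Entirely at or after `time`: move the chunk.
            } else {
                // Straddles `time`, so both halves are non-empty; copy them into new chunks.
                let idx = chunk.partition_point(|&t| t < time);
                lower.push(chunk[..idx].to_vec());
                upper.push(chunk[idx..].to_vec());
            }
        }
        (lower, upper)
    }
}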
998
999/// A builder that constructs a [`Chain`] from a stream of updates.
1000///
1001/// Wraps a [`ChunkBuilder`] and drains its minted chunks into a [`Chain`]. Pushed updates must
1002/// arrive sorted by (time, data) and already consolidated, as the chain invariant requires.
1003struct ChainBuilder<D: Data> {
1004 builder: ChunkBuilder<D>,
1005 chain: Chain<D>,
1006}
1007
1008impl<D: Data> Default for ChainBuilder<D> {
1009 fn default() -> Self {
1010 Self {
1011 builder: Default::default(),
1012 chain: Chain::new(),
1013 }
1014 }
1015}
1016
1017impl<D: Data> ChainBuilder<D> {
1018 /// Push a reference-form update into the builder.
1019 fn push_ref(&mut self, update: Ref<'_, (D, Timestamp, Diff)>) {
1020 self.builder.push(update);
1021 self.drain();
1022 }
1023
1024 /// Push an owned-form update into the builder.
1025 fn push_owned(&mut self, update: &(D, Timestamp, Diff)) {
1026 self.builder.push(update);
1027 self.drain();
1028 }
1029
1030 /// Push the updates produced by a cursor into the builder.
1031 fn push_cursor(&mut self, cursor: Cursor<D>) {
1032 let mut rest = Some(cursor);
1033 while let Some(cursor) = rest.take() {
1034 let update = cursor.get();
1035 self.push_ref(update);
1036 rest = cursor.step();
1037 }
1038 }
1039
1040 /// Move any minted chunks from the builder into the chain.
1041 fn drain(&mut self) {
1042 while let Some(chunk) = self.builder.pop() {
1043 self.chain.push_chunk(chunk);
1044 }
1045 }
1046
1047 /// Finish building, returning the assembled [`Chain`].
1048 fn finish(self) -> Chain<D> {
1049 let Self { builder, mut chain } = self;
1050 for chunk in builder.finish() {
1051 if chunk.len() > 0 {
1052 chain.push_chunk(chunk);
1053 }
1054 }
1055 chain
1056 }
1057}
1058
1059impl<D: Data> Extend<(D, Timestamp, Diff)> for ChainBuilder<D> {
1060 fn extend<I: IntoIterator<Item = (D, Timestamp, Diff)>>(&mut self, iter: I) {
1061 for update in iter {
1062 self.push_owned(&update);
1063 }
1064 }
1065}
1066
1067/// A cursor over updates in a chain.
1068///
1069/// A cursor provides two guarantees:
1070/// * Produced updates are ordered and consolidated.
1071/// * A cursor always yields at least one update.
1072///
1073/// The second guarantee is enforced through the type system: Every method that steps a cursor
1074/// forward consumes `self` and returns an `Option<Cursor>` that's `None` if the operation stepped
1075/// over the last update.
1076///
1077/// A cursor holds on to `Rc<Chunk>`s, allowing multiple cursors to produce updates from the same
1078/// chunks concurrently. As soon as a cursor is done producing updates from a [`Chunk`] it drops
1079/// its reference. Once the last cursor is done with a [`Chunk`] its memory can be reclaimed.
1080#[derive(Clone, Debug)]
1081struct Cursor<D: Data> {
1082 /// The chunks from which updates can still be produced.
1083 chunks: VecDeque<Rc<Chunk<D>>>,
1084 /// The current offset into `chunks.front()`.
1085 chunk_offset: usize,
1086 /// An optional limit for the number of updates the cursor will produce.
1087 limit: Option<usize>,
1088 /// An optional overwrite for the timestamp of produced updates.
1089 overwrite_ts: Option<Timestamp>,
1090}
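
// Illustrative sketch (not part of the implementation): the ownership-based "always yields at
// least one update" guarantee described above, reduced to a cursor over a single non-empty
// `Vec` with an optional limit. `step` consumes the cursor and returns `None` rather than an
// empty cursor, so any `Cursor` value has a current item. The names are hypothetical.
mod sketch_cursor {
    pub struct Cursor<T> {
        items: Vec<T>,
        offset: usize,
        limit: Option<usize>,
    }

    impl<T> Cursor<T> {
        /// Returns `None` for empty input, so an empty cursor cannot be constructed.
        pub fn new(items: Vec<T>) -> Option<Self> {
            if items.is_empty() {
                None
            } else {
                Some(Self { items, offset: 0, limit: None })
            }
        }

        /// Produce at most `limit` more items, counting the current one.
        pub fn set_limit(mut self, limit: usize) -> Option<Self> {
            if limit == 0 {
                return None;
            }
            self.limit = Some(limit);
            Some(self)
        }

        pub fn get(&self) -> &T {
            &self.items[self.offset]
        }

        /// Step past the current item, returning `None` if it was the last one to produce.
        pub fn step(mut self) -> Option<Self> {
            self.offset += 1;
            if self.offset == self.items.len() {
                return None;
            }
            if let Some(limit) = &mut self.limit {
                *limit -= 1;
                if *limit == 0 {
                    return None;
                }
            }
            Some(self)
        }
    }
}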
1091
1092impl<D: Data> Cursor<D> {
1093 /// Construct a cursor over a list of chunks.
1094 ///
1095 /// Returns `None` if `chunks` is empty.
1096 fn new(chunks: VecDeque<Rc<Chunk<D>>>) -> Option<Self> {
1097 if chunks.is_empty() {
1098 return None;
1099 }
1100
1101 Some(Self {
1102 chunks,
1103 chunk_offset: 0,
1104 limit: None,
1105 overwrite_ts: None,
1106 })
1107 }
1108
1109 /// Set a limit for the number of updates this cursor will produce.
1110 ///
1111 /// # Panics
1112 ///
1113 /// Panics if there is already a limit lower than the new one.
1114 fn set_limit(mut self, limit: usize) -> Option<Self> {
1115 assert!(self.limit.is_none_or(|l| l >= limit));
1116
1117 if limit == 0 {
1118 return None;
1119 }
1120
1121 // Release chunks made unreachable by the limit.
1122 let mut count = 0;
1123 let mut idx = 0;
1124 let mut offset = self.chunk_offset;
1125 while idx < self.chunks.len() && count < limit {
1126 let chunk = &self.chunks[idx];
1127 count += chunk.len() - offset;
1128 idx += 1;
1129 offset = 0;
1130 }
1131 self.chunks.truncate(idx);
1132
1133 if count > limit {
1134 self.limit = Some(limit);
1135 }
1136
1137 Some(self)
1138 }
1139
1140 /// Get a reference to the current update.
1141 fn get(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1142 let chunk = self.get_chunk();
1143 let (d, t, r) = chunk.index(self.chunk_offset);
1144 let t = self.overwrite_ts.unwrap_or(t);
1145 (d, t, r)
1146 }
1147
1148 /// Get a reference to the current chunk.
1149 fn get_chunk(&self) -> &Chunk<D> {
1150 &self.chunks[0]
1151 }
1152
1153 /// Step to the next update.
1154 ///
1155 /// Returns the stepped cursor, or `None` if the step was over the last update.
1156 fn step(mut self) -> Option<Self> {
1157 if self.chunk_offset == self.get_chunk().len() - 1 {
1158 return self.skip_chunk().map(|(c, _)| c);
1159 }
1160
1161 self.chunk_offset += 1;
1162
1163 if let Some(limit) = &mut self.limit {
1164 *limit -= 1;
1165 if *limit == 0 {
1166 return None;
1167 }
1168 }
1169
1170 Some(self)
1171 }
1172
1173 /// Skip the remainder of the current chunk.
1174 ///
1175 /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
1176 /// left after the skip, either because no chunks remain or because the limit is exhausted.
1177 fn skip_chunk(mut self) -> Option<(Self, usize)> {
1178 let chunk = self.chunks.pop_front().expect("cursor invariant");
1179
1180 if self.chunks.is_empty() {
1181 return None;
1182 }
1183
1184 let skipped = chunk.len() - self.chunk_offset;
1185 self.chunk_offset = 0;
1186
1187 if let Some(limit) = &mut self.limit {
1188 if skipped >= *limit {
1189 return None;
1190 }
1191 *limit -= skipped;
1192 }
1193
1194 Some((self, skipped))
1195 }
1196
1197 /// Skip all updates with times <= the given time.
1198 ///
1199 /// Returns the forwarded cursor and the number of updates skipped, or `None` if no updates are
1200 /// left after the skip.
1201 fn skip_time(mut self, time: Timestamp) -> Option<(Self, usize)> {
1202 if self.overwrite_ts.is_some_and(|ts| ts <= time) {
1203 return None;
1204 } else if self.get().1 > time {
1205 return Some((self, 0));
1206 }
1207
1208 let mut skipped = 0;
1209
1210 let new_offset = loop {
1211 let chunk = self.get_chunk();
1212 if let Some(index) = chunk.find_time_greater_than(time) {
1213 break index;
1214 }
1215
1216 let (cursor, count) = self.skip_chunk()?;
1217 self = cursor;
1218 skipped += count;
1219 };
1220
1221 skipped += new_offset - self.chunk_offset;
1222 self.chunk_offset = new_offset;
1223
1224 Some((self, skipped))
1225 }
1226
1227 /// Advance all updates in this cursor by the given `since_ts`.
1228 ///
1229 /// Returns a list of cursors, each of which yields ordered and consolidated updates that have
1230 /// been advanced by `since_ts`.
1231 fn advance_by(mut self, since_ts: Timestamp) -> Vec<Self> {
1232 // If the cursor has an `overwrite_ts`, all its updates are at the same time already. We
1233 // only need to advance the `overwrite_ts` by the `since_ts`.
1234 if let Some(ts) = self.overwrite_ts {
1235 if ts < since_ts {
1236 self.overwrite_ts = Some(since_ts);
1237 }
1238 return vec![self];
1239 }
1240
1241 // Otherwise we need to split the cursor so that each new cursor only yields runs of
1242 // updates that are correctly (time, data)-ordered when advanced by `since_ts`. We achieve
1243 // this by splitting off one cursor per distinct time before `since_ts`.
1244 let mut splits = Vec::new();
1245 let mut remaining = Some(self);
1246
1247 while let Some(cursor) = remaining.take() {
1248 let (_, time, _) = cursor.get();
1249 if time >= since_ts {
1250 splits.push(cursor);
1251 break;
1252 }
1253
1254 let mut current = cursor.clone();
1255 if let Some((cursor, skipped)) = cursor.skip_time(time) {
1256 remaining = Some(cursor);
1257 current = current.set_limit(skipped).expect("skipped at least 1");
1258 }
1259 current.overwrite_ts = Some(since_ts);
1260 splits.push(current);
1261 }
1262
1263 splits
1264 }
1265
1266 /// Drain the cursor into a [`Chain`].
1267 ///
1268 /// This reuses the underlying chunks if possible, and writes new ones otherwise.
1269 fn into_chain(self) -> Chain<D> {
1270 match self.try_unwrap() {
1271 Ok(chain) => chain,
1272 Err((_, cursor)) => {
1273 let mut builder = ChainBuilder::default();
1274 builder.push_cursor(cursor);
1275 builder.finish()
1276 }
1277 }
1278 }
1279
1280 /// Attempt to unwrap the cursor into a [`Chain`].
1281 ///
1282 /// This operation efficiently reuses chunks by directly inserting them into the output chain
1283 /// where possible.
1284 ///
1285 /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
1286 /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
1287 /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
1288 /// chain by copying chunks rather than reusing them.
1289 fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
1290 if self.limit.is_some() {
1291 return Err(("cursor with limit", self));
1292 }
1293 if self.overwrite_ts.is_some() {
1294 return Err(("cursor with overwrite_ts", self));
1295 }
1296 if self.chunks.iter().any(|c| Rc::strong_count(c) != 1) {
1297 return Err(("cursor on shared chunks", self));
1298 }
1299
1300 let mut builder = ChainBuilder::default();
1301 let mut remaining = Some(self);
1302
1303 // We might be partway through the first chunk, in which case we can't reuse it but need to
1304 // allocate a new one to contain only the updates the cursor can still yield.
1305 while let Some(cursor) = remaining.take() {
1306 if cursor.chunk_offset == 0 {
1307 remaining = Some(cursor);
1308 break;
1309 }
1310 let update = cursor.get();
1311 builder.push_ref(update);
1312 remaining = cursor.step();
1313 }
1314
1315 let mut chain = builder.finish();
1316 if let Some(cursor) = remaining {
1317 for chunk in cursor.chunks {
1318 let chunk = Rc::into_inner(chunk).expect("checked above");
1319 chain.push_chunk(chunk);
1320 }
1321 }
1322
1323 Ok(chain)
1324 }
1325}
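
// Illustrative sketch (not part of the implementation): why `Cursor::advance_by` splits its
// input. Modeled on a sorted, consolidated slice of `(time, data)` pairs with diffs omitted.
// Advancing every time below `since` to `since` can break (time, data) order across different
// original times. However, each run of a single original time stays sorted, and so does the
// suffix at times >= `since`. The function name is hypothetical.
mod sketch_advance_by {
    /// Split `updates` into runs that are each (time, data)-sorted after advancing by `since`.
    pub fn advance_by(updates: &[(u64, char)], since: u64) -> Vec<Vec<(u64, char)>> {
        let mut runs: Vec<Vec<(u64, char)>> = Vec::new();
        let mut rest = updates;
        while let Some(&(time, _)) = rest.first() {
            if time >= since {
                // Times at or beyond `since` are unchanged and already sorted.
                runs.push(rest.to_vec());
                break;
            }
            // Take every update at `time` and rewrite its time to `since`, which is the role
            // `overwrite_ts` plays on a real cursor.
            let end = rest.partition_point(|&(t, _)| t <= time);
            runs.push(rest[..end].iter().map(|&(_, d)| (since, d)).collect());
            rest = &rest[end..];
        }
        runs
    }
}
// The runs can overlap (below, `(5, 'a')` appears in two runs), so a subsequent merge is still
// needed to consolidate them.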
1326
1327/// A non-empty chunk of updates, backed by a columnar region.
1328///
1329/// All updates in a chunk are sorted by (time, data) and consolidated.
1330///
1331/// Chunks are immutable once created. They are produced by [`ChunkBuilder`], which mints a
1332/// new chunk whenever its in-progress columnar container reaches a fixed serialized byte
1333/// boundary (~2 MiB, matching the ship granularity used elsewhere in the codebase), so each
1334/// chunk corresponds to a single, predictably sized allocation.
1335struct Chunk<D: Data> {
1336 /// The paged-out form, taken on first materialization.
1337 ///
1338 /// A `Mutex` (not `RefCell`) keeps the chunk `Sync`: cursors hold chunks behind a shared
1339 /// `Rc`, and the iterator returned by [`CorrectionV2::updates_before`] borrows them across
1340 /// the persist writer's `await`, so `&Chunk` must be `Send`. The lock is taken only to
1341 /// materialize or to report metrics, and is uncontended (the sink runs single-threaded per worker).
1342 paged: Mutex<Option<PagedColumn<(D, Timestamp, Diff)>>>,
1343 /// The materialized form, populated lazily by [`Chunk::column`] on first access.
1344 ///
1345 /// An `OnceLock` (not `OnceCell`) for the same `Sync` reason. Once set the slot is never
1346 /// cleared, so its address is stable and [`Chunk::index`] can hand out `Ref<'_>` borrows tied
1347 /// to `&self`. The allocation is freed when the chunk drops, so resident memory is bounded by
1348 /// the chunks under an active merge front plus any resting chunks that have already been read.
1349 resident: OnceLock<Column<(D, Timestamp, Diff)>>,
1350 /// Number of updates, cached so `len` and chain bookkeeping never page the chunk in.
1351 len: usize,
1352 /// Time of the first update, cached so boundary checks (`split_at_time`, `can_accept_chunk`) route
1353 /// a resting chunk without materializing it.
1354 first_time: Timestamp,
1355 /// Time of the last update, cached likewise.
1356 last_time: Timestamp,
1357}
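
// Illustrative sketch (not part of the implementation): the lazy page-in pattern used by
// `Chunk`. A little-endian `Vec<u8>` stands in for the paged-out form, and decoding it stands
// in for the column pager. The paged form is taken exactly once under the `Mutex`. After that,
// the `OnceLock` hands out borrows tied to `&self`, and the type stays `Sync`. All names are
// hypothetical.
mod sketch_lazy_chunk {
    use std::sync::{Mutex, OnceLock};

    pub struct LazyChunk {
        paged: Mutex<Option<Vec<u8>>>,
        resident: OnceLock<Vec<u32>>,
        len: usize, // Cached, so `len` never pages the chunk in.
    }

    impl LazyChunk {
        pub fn new(values: &[u32]) -> Self {
            // Hypothetical "page out": serialize to little-endian bytes.
            let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
            Self { paged: Mutex::new(Some(bytes)), resident: OnceLock::new(), len: values.len() }
        }

        pub fn len(&self) -> usize {
            self.len
        }

        pub fn is_resident(&self) -> bool {
            self.resident.get().is_some()
        }

        /// Materialize on first access; later calls return the same allocation.
        pub fn column(&self) -> &[u32] {
            self.resident.get_or_init(|| {
                let bytes = self.paged.lock().expect("poisoned").take().expect("taken once");
                bytes
                    .chunks_exact(4)
                    .map(|b| u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
                    .collect()
            })
        }
    }
}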
1358
1359impl<D: Data> fmt::Debug for Chunk<D> {
1360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1361 write!(f, "Chunk(<{}>)", self.len())
1362 }
1363}
1364
1365impl<D: Data> Chunk<D> {
1366 /// Page the given non-empty column out into a chunk.
1367 ///
1368 /// Records the metadata to cache (length, boundary times) while the column is still resident, then
1369 /// hands it to the global column pager. The policy decides whether it actually spills; either
1370 /// way the chunk is born paged and materializes lazily on first read.
1371 ///
1372 /// # Panics
1373 ///
1374 /// Panics if the column is empty. Chunks are non-empty by construction; [`ChunkBuilder`] only
1375 /// ever builds a chunk from a populated column.
1376 fn from_column(mut data: Column<(D, Timestamp, Diff)>) -> Self {
1377 let (len, first_time, last_time) = {
1378 let borrowed = data.borrow();
1379 let len = borrowed.len();
1380 assert!(len > 0, "chunks are non-empty");
1381 (len, borrowed.get(0).1, borrowed.get(len - 1).1)
1382 };
1383
1384 let paged = column_pager::global_pager().page(&mut data);
1385 Self {
1386 paged: Mutex::new(Some(paged)),
1387 resident: OnceLock::new(),
1388 len,
1389 first_time,
1390 last_time,
1391 }
1392 }
1393
1394 /// Materialize the chunk's column, paging it in on first access.
1395 ///
1396 /// The returned reference is valid for as long as `&self`: the `OnceLock` slot is never
1397 /// cleared once populated, so its contents have a stable address.
1398 fn column(&self) -> &Column<(D, Timestamp, Diff)> {
1399 self.resident.get_or_init(|| {
1400 let paged = self
1401 .paged
1402 .lock()
1403 .expect("pager mutex poisoned")
1404 .take()
1405 .expect("paged form present until materialized");
1406 column_pager::global_pager().take(paged)
1407 })
1408 }
1409
1410 /// Return the number of updates in the chunk.
1411 fn len(&self) -> usize {
1412 self.len
1413 }
1414
1415 /// Return the update at the given index, paging the chunk in if necessary.
1416 ///
1417 /// # Panics
1418 ///
1419 /// Panics if the given index is not populated.
1420 fn index(&self, idx: usize) -> Ref<'_, (D, Timestamp, Diff)> {
1421 self.column().borrow().get(idx)
1422 }
1423
1424 /// Return the first update in the chunk, paging the chunk in if necessary.
1425 fn first(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1426 self.index(0)
1427 }
1428
1429 /// Return the last update in the chunk, paging the chunk in if necessary.
1430 fn last(&self) -> Ref<'_, (D, Timestamp, Diff)> {
1431 self.index(self.len - 1)
1432 }
1433
1434 /// Return the time of the first update, without materializing the chunk.
1435 fn first_time(&self) -> Timestamp {
1436 self.first_time
1437 }
1438
1439 /// Return the time of the last update, without materializing the chunk.
1440 fn last_time(&self) -> Timestamp {
1441 self.last_time
1442 }
1443
1444 /// Return the index of the first update at a time greater than `time`, or `None` if no such
1445 /// update exists.
1446 ///
1447 /// The early-out uses the cached last time, so a chunk whose updates are all at or before
1448 /// `time` is skipped without paging it in.
1449 fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
1450 if self.last_time <= time {
1451 return None;
1452 }
1453
1454 let mut lower = 0;
1455 let mut upper = self.len;
1456 while lower < upper {
1457 let idx = (lower + upper) / 2;
1458 if self.index(idx).1 > time {
1459 upper = idx;
1460 } else {
1461 lower = idx + 1;
1462 }
1463 }
1464
1465 Some(lower)
1466 }
1467
1468 /// Return the size of the chunk, for use in metrics.
1469 ///
1470 /// Reports resident bytes only: a chunk still spilled (on swap or in a pager file) is not part
1471 /// of RSS and contributes nothing, matching the accounting in
1472 /// [`mz_timely_util::columnar::merge_batcher`].
1473 fn get_size(&self) -> SizeMetrics {
1474 let resident = |col: &Column<(D, Timestamp, Diff)>| {
1475 let bytes = col.length_in_bytes();
1476 SizeMetrics {
1477 size: bytes,
1478 capacity: bytes,
1479 allocations: 1,
1480 }
1481 };
1482
1483 if let Some(col) = self.resident.get() {
1484 return resident(col);
1485 }
1486 // Not yet materialized: a policy that kept the column resident still occupies RSS, so
1487 // account for it; a genuinely spilled column does not.
1488 match &*self.paged.lock().expect("pager mutex poisoned") {
1489 Some(PagedColumn::Resident(col, _)) => resident(col),
1490 _ => SizeMetrics::default(),
1491 }
1492 }
1493}
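
// Illustrative sketch (not part of the implementation): `Chunk::find_time_greater_than` over a
// plain sorted slice of times, with `last_time` standing in for the cached boundary time. The
// early-out reports "no such update" without touching the data. Otherwise, a lower-bound binary
// search finds the first index whose time exceeds `time`. The module name is hypothetical.
mod sketch_find_time {
    pub fn find_time_greater_than(times: &[u64], last_time: u64, time: u64) -> Option<usize> {
        if last_time <= time {
            return None;
        }
        let (mut lower, mut upper) = (0, times.len());
        while lower < upper {
            let idx = lower + (upper - lower) / 2;
            if times[idx] > time {
                upper = idx;
            } else {
                lower = idx + 1;
            }
        }
        Some(lower)
    }
}
// Apart from the early-out, this is equivalent to `times.partition_point(|&t| t <= time)`.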
1494
1495/// Builder that produces a stream of [`Chunk`]s of roughly fixed byte size.
1496///
1497/// Wraps [`mz_timely_util::columnar::builder::ColumnBuilder`], which mints a new
1498/// [`Column::Align`] chunk whenever its in-progress columnar container reaches a fixed
1499/// serialized byte boundary (~2 MiB, matching the ship granularity used elsewhere in the
1500/// codebase). Each minted chunk is therefore a single, predictably sized, aligned allocation.
1501struct ChunkBuilder<D: Data> {
1502 inner: mz_timely_util::columnar::builder::ColumnBuilder<(D, Timestamp, Diff)>,
1503}
1504
1505impl<D: Data> Default for ChunkBuilder<D> {
1506 fn default() -> Self {
1507 Self {
1508 inner: Default::default(),
1509 }
1510 }
1511}
1512
1513impl<D: Data> ChunkBuilder<D> {
1514 /// Push an update into the builder.
1515 ///
1516 /// Accepts whatever the inner [`ColumnBuilder`]'s [`PushInto`] impl accepts — both the
1517 /// `Ref<'_, (D, T, R)>` refs produced by cursors and `&(D, T, R)` references to owned
1518 /// tuples drained from the staging buffer.
1519 ///
1520 /// [`ColumnBuilder`]: mz_timely_util::columnar::builder::ColumnBuilder
1521 /// [`PushInto`]: timely::container::PushInto
1522 #[inline]
1523 fn push<T>(&mut self, item: T)
1524 where
1525 mz_timely_util::columnar::builder::ColumnBuilder<(D, Timestamp, Diff)>:
1526 timely::container::PushInto<T>,
1527 {
1528 timely::container::PushInto::push_into(&mut self.inner, item);
1529 }
1530
1531 /// Pop a finished chunk, if one is available.
1532 fn pop(&mut self) -> Option<Chunk<D>> {
1533 use timely::container::ContainerBuilder;
1534 // `ColumnBuilder::extract` stashes the popped chunk in its `finished` slot so the
1535 // caller can read it through `&mut`; move it out with `mem::take` so we own it
1536 // (leaves `Column::Typed(Default::default())` behind, which the next `extract`
1537 // overwrites).
1538 self.inner
1539 .extract()
1540 .map(|c| Chunk::from_column(std::mem::take(c)))
1541 }
1542
1543 /// Finalize the builder: flush any in-progress updates as a typed chunk and drain all pending chunks.
1544 fn finish(mut self) -> impl Iterator<Item = Chunk<D>> {
1545 use timely::container::ContainerBuilder;
1546 // `ColumnBuilder::finish` flushes the in-progress container into the pending queue
1547 // (as `Column::Typed`) and returns the first pending entry. Subsequent calls drain
1548 // the rest until `None`. Translate that into an owning iterator.
1549 //
1550 // `finish` can hand back an empty column (e.g. when the last shipped chunk landed exactly
1551 // on the boundary). Skip those: `Chunk::from_column` requires a non-empty column, and an
1552 // empty chunk would needlessly engage the pager.
1553 std::iter::from_fn(move || {
1554 loop {
1555 let col = std::mem::take(self.inner.finish()?);
1556 if col.borrow().len() > 0 {
1557 return Some(Chunk::from_column(col));
1558 }
1559 }
1560 })
1561 }
1562}
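
// Illustrative sketch (not part of the implementation): the mint-at-a-byte-boundary behavior
// that `ChunkBuilder` gets from `ColumnBuilder`. It is modeled with fixed-width `u64` values
// and a hypothetical `BOUNDARY_BYTES` threshold that is far smaller than the real ~2 MiB. Full
// chunks are queued as they are minted, and `finish` flushes the remainder without ever
// yielding an empty chunk. All names are hypothetical.
mod sketch_chunk_builder {
    use std::collections::VecDeque;

    pub const BOUNDARY_BYTES: usize = 32; // Hypothetical: four `u64`s per chunk.

    #[derive(Default)]
    pub struct Builder {
        current: Vec<u64>,
        minted: VecDeque<Vec<u64>>,
    }

    impl Builder {
        pub fn push(&mut self, value: u64) {
            self.current.push(value);
            if self.current.len() * std::mem::size_of::<u64>() >= BOUNDARY_BYTES {
                self.minted.push_back(std::mem::take(&mut self.current));
            }
        }

        /// Pop a minted chunk, if one is available.
        pub fn pop(&mut self) -> Option<Vec<u64>> {
            self.minted.pop_front()
        }

        /// Flush the in-progress chunk and drain everything, skipping empty chunks.
        pub fn finish(mut self) -> impl Iterator<Item = Vec<u64>> {
            let rest = std::mem::take(&mut self.current);
            self.minted.push_back(rest);
            self.minted.into_iter().filter(|c| !c.is_empty())
        }
    }
}
// Draining `pop` after every `push` mirrors what `ChainBuilder::drain` does with minted chunks.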
1563
1564/// A buffer for staging updates before they are inserted into the sorted chains.
1565#[derive(Debug)]
1566struct Stage<D> {
1567 /// The contained updates.
1568 ///
1569 /// This vector has a fixed capacity: the `chunk_capacity` passed to [`Stage::new`].
1570 data: Vec<(D, Timestamp, Diff)>,
1571 /// Introspection logging.
1572 ///
1573 /// We want to report the number of records in the stage. To do so, we pretend that the stage
1574 /// is a chain, and every time the number of updates inside changes, the chain gets dropped and
1575 /// re-created.
1576 logging: Option<ChannelLogging>,
1577}
1578
1579impl<D: Data> Stage<D> {
1580 fn new(logging: Option<ChannelLogging>, chunk_capacity: usize) -> Self {
1581 // For logging, we pretend the stage consists of a single chain.
1582 if let Some(logging) = &logging {
1583 logging.chain_created(0);
1584 }
1585
1586 Self {
1587 data: Vec::with_capacity(chunk_capacity),
1588 logging,
1589 }
1590 }
1591
1592 /// Insert a batch of updates, possibly producing a batch of sorted, consolidated updates
1593 /// ready to be stored.
1594 fn insert(
1595 &mut self,
1596 updates: &mut Vec<(D, Timestamp, Diff)>,
1597 ) -> Option<Vec<(D, Timestamp, Diff)>> {
1598 if updates.is_empty() {
1599 return None;
1600 }
1601
1602 let prev_length = self.ilen();
1603
1604 // Determine how many chunks we can fill with the available updates.
1605 let update_count = self.data.len() + updates.len();
1606 let chunk_capacity = self.data.capacity();
1607 let chunk_count = update_count / chunk_capacity;
1608
1609 let mut new_updates = updates.drain(..);
1610
1611 // If we have enough shippable updates, collect them and consolidate.
1612 let maybe_ready = if chunk_count > 0 {
1613 let ship_count = chunk_count * chunk_capacity;
1614 let mut buffer = Vec::with_capacity(ship_count);
1615
1616 buffer.append(&mut self.data);
1617 while buffer.len() < ship_count {
1618 let update = new_updates.next().unwrap();
1619 buffer.push(update);
1620 }
1621
1622 consolidate(&mut buffer);
1623
1624 Some(buffer)
1625 } else {
1626 None
1627 };
1628
1629 // Stage the remaining updates.
1630 Extend::extend(&mut self.data, new_updates);
1631
1632 self.log_length_diff(self.ilen() - prev_length);
1633
1634 maybe_ready
1635 }
1636
1637 /// Flush all currently staged updates, returning them sorted and consolidated.
1638 fn flush(&mut self) -> Option<Vec<(D, Timestamp, Diff)>> {
1639 self.log_length_diff(-self.ilen());
1640
1641 consolidate(&mut self.data);
1642
1643 if self.data.is_empty() {
1644 return None;
1645 }
1646
1647 let capacity = self.data.capacity();
1648 let data = std::mem::replace(&mut self.data, Vec::with_capacity(capacity));
1649 Some(data)
1650 }
1651
1652 /// Advance the times of staged updates by the given `since`.
1653 fn advance_times(&mut self, since: &Antichain<Timestamp>) {
1654 let Some(since_ts) = since.as_option() else {
1655 // If the since is the empty frontier, discard all updates.
1656 self.log_length_diff(-self.ilen());
1657 self.data.clear();
1658 return;
1659 };
1660
1661 for (_, time, _) in &mut self.data {
1662 *time = std::cmp::max(*time, *since_ts);
1663 }
1664 }
1665
1666 /// Return the size of the stage, for use in metrics.
1667 ///
1668 /// Note: We don't follow pointers here, so the returned `size` and `capacity` values are
1669 /// under-estimates. That's fine as the stage should always be small.
1670 fn get_size(&self) -> SizeMetrics {
1671 SizeMetrics {
1672 size: self.data.len() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1673 capacity: self.data.capacity() * std::mem::size_of::<(D, Timestamp, Diff)>(),
1674 allocations: 1,
1675 }
1676 }
1677
1678 /// Return the number of updates in the stage, as an `isize`.
1679 fn ilen(&self) -> isize {
1680 self.data.len().try_into().expect("must fit")
1681 }
1682
1683 fn log_length_diff(&self, diff: isize) {
1684 let Some(logging) = &self.logging else { return };
1685 if diff > 0 {
1686 let count = usize::try_from(diff).expect("must fit");
1687 logging.chain_created(count);
1688 logging.chain_dropped(0);
1689 } else if diff < 0 {
1690 let count = usize::try_from(-diff).expect("must fit");
1691 logging.chain_created(0);
1692 logging.chain_dropped(count);
1693 }
1694 }
1695}
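
// Illustrative sketch (not part of the implementation): the shipping rule of `Stage::insert`,
// modeled with `(data, time, diff)` tuples of plain integers. Only whole multiples of
// `capacity` are shipped. A simple accumulate-and-retain pass stands in for `consolidate`. The
// remainder, always fewer than `capacity` updates, stays staged. This assumes `staged` starts
// with fewer than `capacity` updates, as it does in `Stage`. The names are hypothetical.
mod sketch_stage {
    pub type Update = (i64, u64, i64); // (data, time, diff)

    pub fn insert(
        staged: &mut Vec<Update>,
        updates: &mut Vec<Update>,
        capacity: usize,
    ) -> Option<Vec<Update>> {
        let ship_count = (staged.len() + updates.len()) / capacity * capacity;
        let mut new_updates = updates.drain(..);
        let ready = if ship_count > 0 {
            let mut buffer = Vec::with_capacity(ship_count);
            buffer.append(staged);
            let missing = ship_count - buffer.len();
            buffer.extend(new_updates.by_ref().take(missing));
            // Sort by (time, data), sum the diffs of equal keys, and drop zero sums.
            buffer.sort_unstable_by_key(|&(d, t, _)| (t, d));
            let mut out: Vec<Update> = Vec::new();
            for (d, t, r) in buffer {
                if let Some(last) = out.last_mut() {
                    if (last.0, last.1) == (d, t) {
                        last.2 += r;
                        continue;
                    }
                }
                out.push((d, t, r));
            }
            out.retain(|u| u.2 != 0);
            Some(out)
        } else {
            None
        };
        staged.extend(new_updates);
        ready
    }
}
// After consolidation, a shipped batch can be smaller than `ship_count`.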
1696
1697impl<D> Drop for Stage<D> {
1698 fn drop(&mut self) {
1699 if let Some(logging) = &self.logging {
1700 logging.chain_dropped(self.data.len());
1701 }
1702 }
1703}
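
// Illustrative sketch (not part of the implementation): the "pretend the stage is one chain"
// accounting used by `Stage`'s logging. `Counters` is a hypothetical stand-in for
// `ChannelLogging`. It assumes that each `chain_created(n)` adds one batch and `n` records, and
// each `chain_dropped(n)` removes them again. Under that assumption, a live stage always counts
// as exactly one batch holding its current number of records.
mod sketch_stage_logging {
    #[derive(Default)]
    pub struct Counters {
        pub batches: i64,
        pub records: i64,
    }

    impl Counters {
        pub fn chain_created(&mut self, n: usize) {
            self.batches += 1;
            self.records += n as i64;
        }

        pub fn chain_dropped(&mut self, n: usize) {
            self.batches -= 1;
            self.records -= n as i64;
        }

        /// Mirrors `Stage::log_length_diff`.
        pub fn log_length_diff(&mut self, diff: isize) {
            if diff > 0 {
                self.chain_created(diff.unsigned_abs());
                self.chain_dropped(0);
            } else if diff < 0 {
                self.chain_created(0);
                self.chain_dropped(diff.unsigned_abs());
            }
        }
    }
}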
1704
1705/// Sort and consolidate the given list of updates.
1706///
1707/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
1708/// except that it sorts updates by (time, data) instead of (data, time).
1709fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
1710 if updates.len() <= 1 {
1711 return;
1712 }
1713
1714 let diff = |update: &(_, _, Diff)| update.2;
1715
1716 updates.sort_unstable_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
1717
1718 let mut offset = 0;
1719 let mut accum = diff(&updates[0]);
1720
1721 for idx in 1..updates.len() {
1722 let this = &updates[idx];
1723 let prev = &updates[idx - 1];
1724 if this.0 == prev.0 && this.1 == prev.1 {
1725 accum += diff(&updates[idx]);
1726 } else {
1727 if accum != Diff::ZERO {
1728 updates.swap(offset, idx - 1);
1729 updates[offset].2 = accum;
1730 offset += 1;
1731 }
1732 accum = diff(&updates[idx]);
1733 }
1734 }
1735
1736 if accum != Diff::ZERO {
1737 let len = updates.len();
1738 updates.swap(offset, len - 1);
1739 updates[offset].2 = accum;
1740 offset += 1;
1741 }
1742
1743 updates.truncate(offset);
1744}
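
// Illustrative sketch (not part of the implementation): a `BTreeMap`-based reference version of
// `consolidate`, which could serve as a test oracle. Keying the map by `(time, data)` yields the
// same (time, data) order that `consolidate` sorts by. Diffs are plain `i64` here, an assumption
// standing in for `Diff`, and keys whose diffs sum to zero are dropped. The names are
// hypothetical.
mod sketch_consolidate_oracle {
    use std::collections::BTreeMap;

    pub fn consolidate_reference<D: Ord + Clone>(updates: &[(D, u64, i64)]) -> Vec<(D, u64, i64)> {
        let mut sums: BTreeMap<(u64, D), i64> = BTreeMap::new();
        for (data, time, diff) in updates {
            *sums.entry((*time, data.clone())).or_insert(0) += diff;
        }
        sums.into_iter()
            .filter(|&(_, diff)| diff != 0)
            .map(|((time, data), diff)| (data, time, diff))
            .collect()
    }
}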
1745
1746/// Compare two columnar refs that have unrelated input lifetimes.
1747///
1748/// `<D::Container as Borrow>::Ref<'a>` is an associated-type projection through a trait, so
1749/// the compiler treats it as invariant in `'a` and won't auto-shorten the inputs by variance.
1750/// We instead explicitly reborrow both to a fresh, local lifetime `'x` via
1751/// [`Columnar::reborrow`] before letting the inner `==` pick up the `for<'a> Ref<'a>: Eq`
1752/// bound on [`Data`].
1753#[inline]
1754fn refs_eq<D: Data>(a: Ref<'_, D>, b: Ref<'_, D>) -> bool {
1755 #[inline]
1756 fn eq<'x, D: Data>(a: Ref<'x, D>, b: Ref<'x, D>) -> bool {
1757 a == b
1758 }
1759 eq::<D>(D::reborrow(a), D::reborrow(b))
1760}

/// A binary heap specialized for merging [`Cursor`]s.
struct MergeHeap<D: Data>(BinaryHeap<MergeCursor<D>>);

impl<D: Data> FromIterator<Cursor<D>> for MergeHeap<D> {
    fn from_iter<I: IntoIterator<Item = Cursor<D>>>(cursors: I) -> Self {
        let inner = cursors.into_iter().map(MergeCursor).collect();
        Self(inner)
    }
}

impl<D: Data> MergeHeap<D> {
    /// Pop the next cursor (the one yielding the least update) from the heap.
    fn pop(&mut self) -> Option<Cursor<D>> {
        self.0.pop().map(|MergeCursor(c)| c)
    }

    /// Pop the next cursor from the heap, provided the data and time of its current update are
    /// equal to the given values.
    ///
    /// Returns both the cursor and the diff corresponding to `data` and `time`.
    fn pop_equal(&mut self, data: Ref<'_, D>, time: Timestamp) -> Option<(Cursor<D>, Diff)> {
        let r = {
            let MergeCursor(cursor) = self.0.peek()?;
            let (d, t, r) = cursor.get();
            if t != time || !refs_eq::<D>(d, data) {
                return None;
            }
            r
        };
        let cursor = self.pop().expect("checked above");
        Some((cursor, r))
    }

    /// Push a cursor onto the heap.
    fn push(&mut self, cursor: Cursor<D>) {
        self.0.push(MergeCursor(cursor));
    }
}

/// A wrapper for [`Cursor`]s on a [`MergeHeap`].
///
/// Orders cursors by the `(time, data)` of their current update, reversed, so that the
/// underlying max-heap [`BinaryHeap`] pops the cursor with the least update first.
struct MergeCursor<D: Data>(Cursor<D>);

impl<D: Data> PartialEq for MergeCursor<D> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl<D: Data> Eq for MergeCursor<D> {}

impl<D: Data> PartialOrd for MergeCursor<D> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<D: Data> Ord for MergeCursor<D> {
    fn cmp(&self, other: &Self) -> Ordering {
        let (d1, t1, _) = self.0.get();
        let (d2, t2, _) = other.0.get();
        (t1, d1).cmp(&(t2, d2)).reverse()
    }
}
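/*
Illustrative sketch (not used by this module): a std-only k-way merge in the style that
`MergeHeap` and `MergeCursor` support. It assumes each input run is a `Vec` already sorted
by `(time, data)` and consolidated, uses `u64`/`i64` in place of `Timestamp`/`Diff`, and
swaps the hand-written reversed `Ord` for `std::cmp::Reverse`. Equal `(time, data)` updates
come off the heap consecutively and are summed, which is the situation `pop_equal` handles.
`merge_sorted` is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges runs, each sorted by `(time, data)`, into one consolidated run.
/// Hypothetical helper; `u64` times and `i64` diffs stand in for `Timestamp` and `Diff`.
pub fn merge_sorted(runs: Vec<Vec<(String, u64, i64)>>) -> Vec<(String, u64, i64)> {
    // Heap entries are `(time, data, diff, run index, position in run)`. Tuples compare
    // lexicographically, so `(time, data)` decides the order; `Reverse` makes the
    // max-heap `BinaryHeap` pop the least entry first.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some((d, t, r)) = run.first() {
            heap.push(Reverse((*t, d.clone(), *r, run_idx, 0_usize)));
        }
    }

    let mut out = Vec::new();
    // The update being accumulated; flushed once a different `(time, data)` appears.
    let mut pending: Option<(String, u64, i64)> = None;
    while let Some(Reverse((t, d, r, run_idx, pos))) = heap.pop() {
        // Advance the run we just popped from by pushing its next update, if any.
        if let Some((next_d, next_t, next_r)) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((*next_t, next_d.clone(), *next_r, run_idx, pos + 1)));
        }
        let same_key = matches!(&pending, Some(p) if p.1 == t && p.0 == d);
        if same_key {
            // Same `(time, data)` as the pending update: accumulate the diff.
            if let Some(p) = pending.as_mut() {
                p.2 += r;
            }
        } else {
            // New `(time, data)`: emit the pending update unless its diffs cancelled.
            if let Some(p) = pending.take() {
                if p.2 != 0 {
                    out.push(p);
                }
            }
            pending = Some((d, t, r));
        }
    }
    if let Some(p) = pending {
        if p.2 != 0 {
            out.push(p);
        }
    }
    out
}
```
*/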

#[cfg(test)]
mod tests {
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::metrics::Metrics;
    use mz_repr::{Diff, Timestamp};

    use super::*;
    use crate::sink::correction::CorrectionV1;

    #[mz_ore::test]
    fn chain_builder_update_count_matches_items() {
        let mut builder = ChainBuilder::<i64>::default();
        for i in 0..10_u64 {
            let d = i64::try_from(i).expect("fits");
            builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
        }
        let chain = builder.finish();
        assert_eq!(chain.update_count, chain.iter().count());
    }

    /// Push enough updates to cross at least one `mint()` boundary, forcing the
    /// `Align` encode -> `from_bytes` decode roundtrip (the spilling path this data
    /// structure exists to support), and assert that `iter()` roundtrips values, order,
    /// and diffs across the spill boundary.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow under Miri: crossing the ~2 MiB mint boundary takes 200k pushes
    fn chain_builder_roundtrips_across_mint_boundary() {
        // `mint()` fires once the serialized chunk nears the ~2 MiB (`SHIP_WORDS`) boundary.
        // With three 8-byte columns per update, that is on the order of 87k updates per chunk,
        // so pushing 200k comfortably forces multiple mints.
        let count = 200_000_u64;

        let mut builder = ChainBuilder::<i64>::default();
        for i in 0..count {
            let d = i64::try_from(i).expect("fits");
            builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
        }
        let chain = builder.finish();

        // Crossing the mint boundary must have produced more than one chunk; otherwise the spill
        // path (each minted chunk is paged out and read back through the pager) wouldn't be
        // exercised. The chunk payload lives behind the pager (see [`Chunk`]), so we assert on
        // the chunk count rather than inspecting the column variant directly.
        assert!(
            chain.chunks.len() > 1,
            "expected multiple minted chunks, got {} chunk(s): {:?}",
            chain.chunks.len(),
            chain.chunks,
        );

        // `iter()` must roundtrip every update, in order, with correct diffs.
        assert_eq!(chain.update_count, usize::try_from(count).expect("fits"));
        let mut expected = 0_u64;
        for (d, t, r) in chain.iter() {
            assert_eq!(d, i64::try_from(expected).expect("fits"));
            assert_eq!(t, Timestamp::new(expected));
            assert_eq!(r, Diff::ONE);
            expected += 1;
        }
        assert_eq!(expected, count);
    }
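/*
Illustrative arithmetic (not part of the tests): the chunk-count estimate in the comment
above, worked out under two loud assumptions. The mint threshold is taken to be exactly
2 MiB and each update to cost exactly 24 serialized bytes (three 8-byte columns); the real
`SHIP_WORDS` value and encoding overheads may shift the numbers. `updates_per_chunk` and
`chunks_for` are hypothetical helpers, and `chunks_for` assumes the final partial chunk is
emitted by `finish()`.

```rust
/// Assumed mint threshold: 2 MiB of serialized bytes (the real `SHIP_WORDS` may differ).
const ASSUMED_CHUNK_BYTES: u64 = 2 * 1024 * 1024;
/// Assumed serialized size per update: three 8-byte columns (data, time, diff).
const ASSUMED_BYTES_PER_UPDATE: u64 = 3 * 8;

/// Number of updates that fit in one full chunk under the assumptions above.
fn updates_per_chunk() -> u64 {
    ASSUMED_CHUNK_BYTES / ASSUMED_BYTES_PER_UPDATE
}

/// Number of chunks needed for `count` updates: full chunks plus one partial chunk.
fn chunks_for(count: u64) -> u64 {
    count.div_ceil(updates_per_chunk())
}
```

Under these assumptions a chunk holds 87,381 updates, so 200,000 updates span three chunks.
*/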

    fn sink_metrics() -> SinkMetrics {
        let registry = MetricsRegistry::new();
        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &registry);
        metrics.sink.clone()
    }

    /// Run the same stepwise-drain workload through `CorrectionV1` and `CorrectionV2` and assert
    /// that they emit the same updates at every step.
    ///
    /// Models the `write_batches` operator catching up through many distinct timestamps: the
    /// desired input runs ahead, batches are written one timestamp at a time, and written updates
    /// come back negated through the persist feedback.
    #[mz_ore::test]
    // Columnation regions are not Stacked Borrows compliant: later pushes invalidate the
    // provenance of previously stored items under Miri.
    #[cfg_attr(miri, ignore)]
    fn equivalence_with_v1() {
        let sink_metrics = sink_metrics();

        let mut v1 =
            CorrectionV1::<String>::new(sink_metrics.clone(), sink_metrics.for_worker(0), 1);
        let mut v2 = CorrectionV2::<String>::new(
            sink_metrics.clone(),
            sink_metrics.for_worker(0),
            None,
            3.0,
            8 * 1024,
        );

        let num_ts = 50;
        let keys = 4;

        // Upsert-style input: every timestamp updates each key, retracting the previous value.
        let batch = |t: u64| -> Vec<(String, Timestamp, Diff)> {
            (0..keys)
                .flat_map(|k| {
                    let addition = (format!("{k}-{t}"), Timestamp::from(t), Diff::ONE);
                    let retraction = t
                        .checked_sub(1)
                        .map(|p| (format!("{k}-{p}"), Timestamp::from(t), -Diff::ONE));
                    std::iter::once(addition).chain(retraction)
                })
                .collect()
        };

        // Pre-fill both with all batches, like a catch-up where the input runs ahead.
        for t in 0..num_ts {
            v1.insert(&mut batch(t));
            v2.insert(&mut batch(t));
        }

        // Drain stepwise, with persist feedback, comparing emissions.
        for t in 0..num_ts {
            let upper = Antichain::from_elem(Timestamp::from(t + 1));

            let mut out1: Vec<_> = v1.updates_before(&upper).collect();
            let mut out2: Vec<_> = v2.updates_before(&upper).collect();
            out1.sort();
            out2.sort();
            assert_eq!(out1, out2, "diverged at t={t}");

            v1.insert_negated(&mut out1.clone());
            v2.insert_negated(&mut out2);
            v1.advance_since(upper.clone());
            v2.advance_since(upper);
        }

        // Compare the final state at the since.
        let upper = Antichain::from_elem(Timestamp::from(num_ts + 1));
        v1.consolidate_at_since();
        v2.consolidate_at_since();
        let mut out1: Vec<_> = v1.updates_before(&upper).collect();
        let mut out2: Vec<_> = v2.updates_before(&upper).collect();
        out1.sort();
        out2.sort();
        assert_eq!(out1, out2);
    }
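/*
Illustrative sketch (not part of the tests): the net effect of the upsert-style input that
the `batch` closure above generates. Summed over all timestamps, every value except the
latest one per key cancels, so the stream describes a collection with one record per key.
This std-only model assumes the same `{k}-{t}` encoding, uses `u64`/`i64` in place of
`Timestamp`/`Diff`, and ignores times when summing; `upsert_batch` and `net_effect` are
hypothetical names.

```rust
use std::collections::BTreeMap;

/// Upsert-style batch for time `t`: add `{k}-{t}` and retract `{k}-{t-1}` for each key.
fn upsert_batch(t: u64, keys: u64) -> Vec<(String, u64, i64)> {
    (0..keys)
        .flat_map(|k| {
            let addition = (format!("{k}-{t}"), t, 1);
            let retraction = t.checked_sub(1).map(|p| (format!("{k}-{p}"), t, -1));
            std::iter::once(addition).chain(retraction)
        })
        .collect()
}

/// Sums diffs per data value across batches `0..num_ts`, ignoring times, and keeps the
/// values whose diffs do not cancel.
fn net_effect(num_ts: u64, keys: u64) -> BTreeMap<String, i64> {
    let mut acc: BTreeMap<String, i64> = BTreeMap::new();
    for t in 0..num_ts {
        for (data, _time, diff) in upsert_batch(t, keys) {
            *acc.entry(data).or_insert(0) += diff;
        }
    }
    acc.retain(|_, diff| *diff != 0);
    acc
}
```
*/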

    /// Advancing the since past many distinct buffered timestamps must collapse their updates
    /// onto the since: each `a-*` insertion cancels its retraction, leaving one `b-*` update per
    /// timestamp, all at time `num_ts`.
    #[mz_ore::test]
    // Columnation regions are not Stacked Borrows compliant: later pushes invalidate the
    // provenance of previously stored items under Miri.
    #[cfg_attr(miri, ignore)]
    fn since_jump() {
        let sink_metrics = sink_metrics();
        let mut v2 = CorrectionV2::<String>::new(
            sink_metrics.clone(),
            sink_metrics.for_worker(0),
            None,
            3.0,
            8 * 1024,
        );

        let num_ts = 100;
        for t in 0..num_ts {
            v2.insert(&mut vec![
                (format!("a-{t}"), Timestamp::from(t), Diff::ONE),
                (format!("a-{t}"), Timestamp::from(t), -Diff::ONE),
                (format!("b-{t}"), Timestamp::from(t), Diff::ONE),
            ]);
        }

        v2.advance_since(Antichain::from_elem(Timestamp::from(num_ts)));
        v2.consolidate_at_since();

        let upper = Antichain::from_elem(Timestamp::from(num_ts + 1));
        let out: Vec<_> = v2.updates_before(&upper).collect();
        assert_eq!(out.len(), usize::try_from(num_ts).unwrap());
        assert!(
            out.iter()
                .all(|(_, t, r)| *t == Timestamp::from(num_ts) && *r == Diff::ONE)
        );
    }

    /// Reads must not observe updates at or beyond their `upper`, even when the `upper` is not
    /// beyond the `since`.
    #[mz_ore::test]
    // Columnation regions are not Stacked Borrows compliant: later pushes invalidate the
    // provenance of previously stored items under Miri.
    #[cfg_attr(miri, ignore)]
    fn upper_not_beyond_since() {
        let sink_metrics = sink_metrics();
        let mut v2 = CorrectionV2::<String>::new(
            sink_metrics.clone(),
            sink_metrics.for_worker(0),
            None,
            3.0,
            8 * 1024,
        );

        v2.insert(&mut vec![(
            "a".to_owned(),
            Timestamp::from(5_u64),
            Diff::ONE,
        )]);
        v2.advance_since(Antichain::from_elem(Timestamp::from(10_u64)));

        // The update logically lives at time 10 now, so a read before 7 must be empty.
        let upper = Antichain::from_elem(Timestamp::from(7_u64));
        assert_eq!(v2.updates_before(&upper).count(), 0);

        // A read before 11 must emit it, advanced to the since.
        let upper = Antichain::from_elem(Timestamp::from(11_u64));
        let out: Vec<_> = v2.updates_before(&upper).collect();
        assert_eq!(
            out,
            vec![("a".to_owned(), Timestamp::from(10_u64), Diff::ONE)]
        );
    }
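/*
Illustrative sketch (not part of the tests): the read semantics exercised by `since_jump`
and `upper_not_beyond_since`, stated as a small reference model. Each update's time is
advanced to at least the `since`, updates are consolidated, and only times strictly before
the `upper` are returned. This std-only model assumes totally ordered `u64` times (so each
frontier is a single element) and `i64` diffs; `updates_before_model` is a hypothetical name,
not part of `CorrectionV2`.

```rust
use std::collections::BTreeMap;

/// Reference model of a read: advance times to `since`, consolidate, filter by `upper`.
fn updates_before_model(
    updates: &[(String, u64, i64)],
    since: u64,
    upper: u64,
) -> Vec<(String, u64, i64)> {
    // Key by (time, data) so the output is ordered like the stored chains.
    let mut acc: BTreeMap<(u64, String), i64> = BTreeMap::new();
    for (data, time, diff) in updates {
        let time = (*time).max(since);
        *acc.entry((time, data.clone())).or_insert(0) += *diff;
    }
    acc.into_iter()
        .filter(|((time, _), diff)| *time < upper && *diff != 0)
        .map(|((time, data), diff)| (data, time, diff))
        .collect()
}
```
*/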

    /// A [`PagingPolicy`] that always spills to the swap backend, uncompressed.
    ///
    /// The default global pager keeps every chunk resident; installing this drives the actual
    /// spill path so the tests exercise [`Chunk::column`]'s page-in through [`mz_ore::pager`].
    ///
    /// [`PagingPolicy`]: column_pager::PagingPolicy
    struct ForceSwap;

    impl column_pager::PagingPolicy for ForceSwap {
        fn decide(&self, _hint: column_pager::PageHint) -> column_pager::PageDecision {
            column_pager::PageDecision::Page {
                backend: mz_ore::pager::Backend::Swap,
                codec: None,
            }
        }
        fn record(&self, _event: column_pager::PageEvent) {}
    }

    /// Install a global pager that spills every chunk to swap for the duration of `f`, then
    /// restore the default (disabled) pager. The global pager is process-wide; concurrent tests
    /// only ever observe a correct round-trip regardless of backend, so racing on it is benign.
    /// If `f` panics, the swap pager stays installed, which is benign for the same reason.
    fn with_swap_pager<R>(f: impl FnOnce() -> R) -> R {
        use std::sync::Arc;
        column_pager::set_global_pager(column_pager::ColumnPager::new(Arc::new(ForceSwap)));
        let result = f();
        column_pager::set_global_pager(column_pager::ColumnPager::disabled());
        result
    }
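/*
Illustrative sketch (not part of the tests): `with_swap_pager` restores the default pager
only when `f` returns normally. A common alternative is a guard whose `Drop` impl restores
the previous state, so the restore also runs while a panic unwinds. This std-only sketch of
that pattern uses a hypothetical process-wide `AtomicBool` in place of the global pager; it
does not use the `column_pager` API, and `FORCE_SWAP`, `RestoreOnDrop`, and
`with_force_swap` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical process-wide setting standing in for the global pager.
static FORCE_SWAP: AtomicBool = AtomicBool::new(false);

/// Restores the default setting when dropped, including during a panic unwind.
struct RestoreOnDrop;

impl Drop for RestoreOnDrop {
    fn drop(&mut self) {
        FORCE_SWAP.store(false, Ordering::SeqCst);
    }
}

/// Runs `f` with the setting enabled and restores it afterwards, even if `f` panics.
fn with_force_swap<R>(f: impl FnOnce() -> R) -> R {
    FORCE_SWAP.store(true, Ordering::SeqCst);
    // The guard lives until the end of this function, so it drops after `f` returns or unwinds.
    let _guard = RestoreOnDrop;
    f()
}
```

The trade-off is one extra type; the benefit is that a failing test cannot leave the
process-wide setting changed for tests that run after it.
*/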

    /// Build a chain crossing the mint boundary while every chunk is spilled to swap, then assert
    /// that `iter()` (the read path behind `updates_before`) pages each chunk back in and
    /// roundtrips values, order, and diffs.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under Miri
    fn iter_roundtrips_through_swap_backend() {
        let count = 200_000_u64;
        with_swap_pager(|| {
            let mut builder = ChainBuilder::<i64>::default();
            for i in 0..count {
                let d = i64::try_from(i).expect("fits");
                builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
            }
            let chain = builder.finish();
            assert!(chain.chunks.len() > 1, "expected multiple minted chunks");
            assert_eq!(chain.update_count, usize::try_from(count).expect("fits"));

            let mut expected = 0_u64;
            for (d, t, r) in chain.iter() {
                assert_eq!(d, i64::try_from(expected).expect("fits"));
                assert_eq!(t, Timestamp::new(expected));
                assert_eq!(r, Diff::ONE);
                expected += 1;
            }
            assert_eq!(expected, count);
        });
    }

    /// Drive a [`Cursor`] over a spilled, multi-chunk chain to completion (the access pattern
    /// merges use). Each step pages the front chunk back in via [`Chunk::column`]; assert that
    /// the cursor yields every update in order.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under Miri
    fn cursor_steps_through_swap_backend() {
        let count = 200_000_u64;
        with_swap_pager(|| {
            let mut builder = ChainBuilder::<i64>::default();
            for i in 0..count {
                let d = i64::try_from(i).expect("fits");
                builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
            }
            let chain = builder.finish();
            assert!(chain.chunks.len() > 1, "expected multiple minted chunks");

            let mut rest = chain.into_cursor();
            let mut expected = 0_u64;
            while let Some(cursor) = rest.take() {
                let (d, t, r) = cursor.get();
                assert_eq!(i64::into_owned(d), i64::try_from(expected).expect("fits"));
                assert_eq!(t, Timestamp::new(expected));
                assert_eq!(r, Diff::ONE);
                expected += 1;
                rest = cursor.step();
            }
            assert_eq!(expected, count);
        });
    }
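/*
Illustrative sketch (not part of the tests): the `while let Some(cursor) = rest.take()` loop
relies on a consuming cursor, where `step` takes the cursor by value and returns `None` once
the chain is exhausted. This std-only sketch shows that shape over a list of chunks,
assuming chunks are plain `Vec`s rather than paged columnar data; `ChunkCursor` is a
hypothetical type, not the real `Cursor`.

```rust
use std::collections::VecDeque;

/// Hypothetical cursor over a non-empty sequence of non-empty chunks.
struct ChunkCursor<T> {
    chunks: VecDeque<Vec<T>>,
    /// Offset of the current item within the front chunk.
    offset: usize,
}

impl<T> ChunkCursor<T> {
    /// Creates a cursor, or `None` if there is nothing to yield. Empty chunks are skipped.
    fn new(chunks: Vec<Vec<T>>) -> Option<Self> {
        let chunks: VecDeque<Vec<T>> = chunks.into_iter().filter(|c| !c.is_empty()).collect();
        if chunks.is_empty() {
            None
        } else {
            Some(Self { chunks, offset: 0 })
        }
    }

    /// The current item; always valid because a live cursor is never empty.
    fn get(&self) -> &T {
        &self.chunks[0][self.offset]
    }

    /// Advances past the current item, consuming the cursor; `None` once exhausted.
    fn step(mut self) -> Option<Self> {
        self.offset += 1;
        if self.offset == self.chunks[0].len() {
            // Front chunk exhausted: drop it and move to the start of the next one.
            self.chunks.pop_front();
            self.offset = 0;
        }
        if self.chunks.is_empty() {
            None
        } else {
            Some(self)
        }
    }
}
```

Taking `self` by value means an exhausted cursor cannot be used by mistake: the type system
forces callers to handle the `None` case, exactly as the loop above does.
*/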
}