// mz_compute/sink/correction.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The `Correction` data structure used by `persist_sink::write_batches` to stash updates before
11//! they are written into batches.
12
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17use std::rc::Rc;
18
19use differential_dataflow::consolidation::{consolidate, consolidate_updates};
20use differential_dataflow::logging::{BatchEvent, DropEvent};
21use itertools::Itertools;
22use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
23use mz_dyncfg::ConfigSet;
24use mz_ore::iter::IteratorExt;
25use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
26use mz_repr::{Diff, Timestamp};
27use timely::PartialOrder;
28use timely::progress::Antichain;
29
30use crate::logging::compute::{
31    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
32    ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
33    Logger as ComputeLogger,
34};
35use crate::sink::correction_v2::{CorrectionV2, Data};
36
/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
pub(super) enum Correction<D: Data> {
    /// The original correction buffer implementation, holding all updates in memory.
    V1(CorrectionV1<D>),
    /// The newer implementation with spill-to-disk support.
    V2(CorrectionV2<D>),
}
47
48impl<D: Data> Correction<D> {
49    /// Construct a new `Correction` instance.
50    pub fn new(
51        metrics: SinkMetrics,
52        worker_metrics: SinkWorkerMetrics,
53        logging: Option<Logging>,
54        config: &ConfigSet,
55    ) -> Self {
56        if ENABLE_CORRECTION_V2.get(config) {
57            Self::V2(CorrectionV2::new(metrics, worker_metrics, logging))
58        } else {
59            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
60            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
61        }
62    }
63
64    /// Insert a batch of updates.
65    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
66        match self {
67            Self::V1(c) => c.insert(updates),
68            Self::V2(c) => c.insert(updates),
69        }
70    }
71
72    /// Insert a batch of updates, after negating their diffs.
73    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
74        match self {
75            Self::V1(c) => c.insert_negated(updates),
76            Self::V2(c) => c.insert_negated(updates),
77        }
78    }
79
80    /// Consolidate and return updates before the given `upper`.
81    pub fn updates_before(
82        &mut self,
83        upper: &Antichain<Timestamp>,
84    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
85        match self {
86            Self::V1(c) => Box::new(c.updates_before(upper)),
87            Self::V2(c) => Box::new(c.updates_before(upper)),
88        }
89    }
90
91    /// Advance the since frontier.
92    ///
93    /// # Panics
94    ///
95    /// Panics if the given `since` is less than the current since frontier.
96    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
97        match self {
98            Self::V1(c) => c.advance_since(since),
99            Self::V2(c) => c.advance_since(since),
100        }
101    }
102
103    /// Consolidate all updates at the current `since`.
104    pub fn consolidate_at_since(&mut self) {
105        match self {
106            Self::V1(c) => c.consolidate_at_since(),
107            Self::V2(c) => c.consolidate_at_since(),
108        }
109    }
110}
111
/// A collection holding `persist_sink` updates.
///
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
///  * It stores updates by time, to enable efficient separation between updates that should
///    be written to a batch and updates whose time has not yet arrived.
///  * It eschews an interface for directly removing previously inserted updates. Instead, updates
///    are removed by inserting them again, with negated diffs. Stored updates are continuously
///    consolidated to give them opportunity to cancel each other out.
///  * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct CorrectionV1<D> {
    /// Stashed updates by time.
    ///
    /// Updates at each time are kept in a `ConsolidatingVec`, which consolidates them
    /// opportunistically as it grows.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all update times are advanced.
    since: Antichain<Timestamp>,

    /// Total length and capacity of vectors in `updates`.
    ///
    /// Tracked to maintain metrics; must be kept in sync with every mutation of `updates`.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
    growth_dampener: usize,
}
140
141impl<D> CorrectionV1<D> {
142    /// Construct a new `CorrectionV1` instance.
143    pub fn new(
144        metrics: SinkMetrics,
145        worker_metrics: SinkWorkerMetrics,
146        growth_dampener: usize,
147    ) -> Self {
148        Self {
149            updates: Default::default(),
150            since: Antichain::from_elem(Timestamp::MIN),
151            total_size: Default::default(),
152            metrics,
153            worker_metrics,
154            growth_dampener,
155        }
156    }
157
158    /// Update persist sink metrics to the given new length and capacity.
159    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
160        let old_size = self.total_size;
161        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
162        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
163        self.metrics
164            .report_correction_update_deltas(len_delta, cap_delta);
165        self.worker_metrics
166            .report_correction_update_totals(new_size.length, new_size.capacity);
167
168        self.total_size = new_size;
169    }
170}
171
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates.
    ///
    /// Update times are first advanced to the current since frontier, to aid consolidation.
    /// The `updates` vector is drained in the process.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        // Advance each time to the since frontier.
        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    ///
    /// This is how previously inserted updates are "removed": their negations consolidate
    /// away against the originals.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// The given `updates` must all have been advanced by `self.since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        consolidate_updates(updates);
        // Sort by time so that updates sharing a time form contiguous runs.
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        // Track length/capacity changes so metrics can be updated once at the end.
        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // All updates at the current time, stripped down to (data, diff) pairs.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Remove the old size, then add back the size after extending.
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate and return updates within the given bounds.
    ///
    /// # Panics
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        // Translate the frontiers into `BTreeMap` range bounds. An empty `lower` yields an
        // empty range (nothing is beyond the empty frontier); an empty `upper` is unbounded.
        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate first, so the reported `ExactSizeIterator` length is accurate.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Consolidate and return updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate the updates at the times in the given range.
    ///
    /// Returns the number of updates remaining in the range afterwards.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        // Consolidating each per-time vector may shrink both its length and its capacity;
        // fold the changes into `new_size` while counting the surviving updates.
        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance all contained updates by the given frontier.
    ///
    /// If the given frontier is empty, all remaining updates are discarded.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            // Empty frontier: no time can ever be read again, so drop everything.
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // Pop entries in time order, folding each pre-frontier entry into the entry at
        // `target_ts`, until we reach a time that is already beyond the frontier.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // We have advanced all updates that can advance.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // The vector moves wholesale; total length and capacity are unchanged.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // `data`'s allocation is dropped after the merge and `vec` may grow, so
                    // subtract both old sizes before adding `vec`'s new size.
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        // Build a range that covers exactly the since time. If the time cannot be stepped
        // forward it is the maximum time, so an unbounded end covers it.
        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
365
366impl<D> Drop for CorrectionV1<D> {
367    fn drop(&mut self) {
368        self.update_metrics(Default::default());
369    }
370}
371
/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// Total number of stored updates.
    pub length: usize,
    /// Total allocated capacity, in updates.
    pub capacity: usize,
}
378
379impl AddAssign<Self> for LengthAndCapacity {
380    fn add_assign(&mut self, size: Self) {
381        self.length += size.length;
382        self.capacity += size.capacity;
383    }
384}
385
386impl AddAssign<(usize, usize)> for LengthAndCapacity {
387    fn add_assign(&mut self, (len, cap): (usize, usize)) {
388        self.length += len;
389        self.capacity += cap;
390    }
391}
392
393impl SubAssign<(usize, usize)> for LengthAndCapacity {
394    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
395        self.length -= len;
396        self.capacity -= cap;
397    }
398}
399
/// A vector that consolidates its contents.
///
/// The vector is filled with updates until it reaches capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovers too
/// little space, at which point the capacity is grown by a factor of
/// `1 + 1/(growth_dampener + 1)` — i.e., doubled when `growth_dampener` is 0 (see `push`).
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The stored updates, as (data, diff) pairs.
    data: Vec<(D, Diff)>,
    /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
    /// might start smaller than this.
    min_capacity: usize,
    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
    ///
    /// If consolidation didn't free enough space, at least a linear amount, increase the capacity
    /// Setting this to 0 results in doubling whenever the list is at least half full.
    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
    growth_dampener: usize,
}
418
419impl<D: Ord> ConsolidatingVec<D> {
420    /// Creates a new instance from the necessary configuration arguments.
421    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
422        ConsolidatingVec {
423            data: Vec::new(),
424            min_capacity,
425            growth_dampener,
426        }
427    }
428
429    /// Return the length of the vector.
430    pub fn len(&self) -> usize {
431        self.data.len()
432    }
433
434    /// Return the capacity of the vector.
435    pub fn capacity(&self) -> usize {
436        self.data.capacity()
437    }
438
439    /// Pushes `item` into the vector.
440    ///
441    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
442    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
443    ///
444    /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
445    /// but amortizes to O(log n).
446    pub fn push(&mut self, item: (D, Diff)) {
447        let capacity = self.data.capacity();
448        if self.data.len() == capacity {
449            // The vector is full. First, consolidate to try to recover some space.
450            self.consolidate();
451
452            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
453            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
454            let length = self.data.len();
455            let dampener = self.growth_dampener;
456            if capacity < length + length / (dampener + 1) {
457                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
458                // determining the target capacity, and then reserving an amount that achieves this
459                // while working around the existing length.
460                let new_cap = capacity + capacity / (dampener + 1);
461                self.data.reserve_exact(new_cap - length);
462            }
463        }
464
465        self.data.push(item);
466    }
467
468    /// Consolidate the contents.
469    pub fn consolidate(&mut self) {
470        consolidate(&mut self.data);
471
472        // We may have the opportunity to reclaim allocated memory.
473        // Given that `push` will at most double the capacity when the vector is more than half full, and
474        // we want to avoid entering into a resizing cycle, we choose to only shrink if the
475        // vector's length is less than one fourth of its capacity.
476        if self.data.len() < self.data.capacity() / 4 {
477            self.data.shrink_to(self.min_capacity);
478        }
479    }
480
481    /// Return an iterator over the borrowed items.
482    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
483        self.data.iter()
484    }
485
486    /// Returns mutable access to the underlying items.
487    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
488        self.data.iter_mut()
489    }
490}
491
492impl<D> IntoIterator for ConsolidatingVec<D> {
493    type Item = (D, Diff);
494    type IntoIter = std::vec::IntoIter<(D, Diff)>;
495
496    fn into_iter(self) -> Self::IntoIter {
497        self.data.into_iter()
498    }
499}
500
501impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
502    fn from_iter<I>(iter: I) -> Self
503    where
504        I: IntoIterator<Item = (D, Diff)>,
505    {
506        Self {
507            data: Vec::from_iter(iter),
508            min_capacity: 0,
509            growth_dampener: 0,
510        }
511    }
512}
513
514impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
515    fn extend<I>(&mut self, iter: I)
516    where
517        I: IntoIterator<Item = (D, Diff)>,
518    {
519        for item in iter {
520            self.push(item);
521        }
522    }
523}
524
/// Helper type for convenient tracking of various size metrics together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    /// Total size, in bytes.
    pub size: usize,
    /// Total allocated capacity, in bytes.
    pub capacity: usize,
    /// Number of allocations.
    pub allocations: usize,
}
532
533impl AddAssign<Self> for SizeMetrics {
534    fn add_assign(&mut self, other: Self) {
535        self.size += other.size;
536        self.capacity += other.capacity;
537        self.allocations += other.allocations;
538    }
539}
540
/// Helper for correction buffer logging.
///
/// Clones are shallow: they share the same `LoggingInner` through the `Rc`.
///
// TODO: Correction buffer logging currently reuses the arrangement batch and size logging. This
// isn't strictly correct as a correction buffer is not an arrangement. Consider refactoring this
// to be about "operator sizes" instead.
#[derive(Clone, Debug)]
pub(super) struct Logging(Rc<RefCell<LoggingInner>>);
548
549impl Logging {
550    pub fn new(
551        compute_logger: ComputeLogger,
552        differential_logger: differential_dataflow::logging::Logger,
553        operator_id: usize,
554        address: Vec<usize>,
555    ) -> Self {
556        let inner = LoggingInner::new(compute_logger, differential_logger, operator_id, address);
557        Self(Rc::new(RefCell::new(inner)))
558    }
559
560    /// A new chain was created.
561    pub fn chain_created(&self, updates: usize) {
562        self.0.borrow_mut().chain_created(updates);
563    }
564
565    /// A chain was dropped.
566    pub fn chain_dropped(&self, updates: usize) {
567        self.0.borrow_mut().chain_dropped(updates);
568    }
569
570    /// Report a heap size diff.
571    pub fn report_size_diff(&self, diff: isize) {
572        self.0.borrow_mut().report_size_diff(diff);
573    }
574
575    /// Report a heap capacity diff.
576    pub fn report_capacity_diff(&self, diff: isize) {
577        self.0.borrow_mut().report_capacity_diff(diff);
578    }
579
580    /// Report a heap allocations diff.
581    pub fn report_allocations_diff(&self, diff: isize) {
582        self.0.borrow_mut().report_allocations_diff(diff);
583    }
584}
585
/// Inner state for correction buffer logging.
///
/// This is separate from `Logging` and shared via an `Rc`, to ensure we only emit
/// `ArrangementHeapSizeOperator{Drop}` events once, even though we pass the logger to both the Ok
/// and the Err correction buffer.
struct LoggingInner {
    /// Logger for compute events (heap size/capacity/allocations).
    compute_logger: ComputeLogger,
    /// Logger for differential events (batch creation/drop).
    differential_logger: differential_dataflow::logging::Logger,
    /// ID of the operator the logged events are attributed to.
    operator_id: usize,
}
596
impl fmt::Debug for LoggingInner {
    // Manual impl that renders only the operator id; the logger handles are omitted
    // (hence `finish_non_exhaustive`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LoggingInner")
            .field("operator_id", &self.operator_id)
            .finish_non_exhaustive()
    }
}
604
605impl LoggingInner {
606    fn new(
607        compute_logger: ComputeLogger,
608        differential_logger: differential_dataflow::logging::Logger,
609        operator_id: usize,
610        address: Vec<usize>,
611    ) -> Self {
612        compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
613            ArrangementHeapSizeOperator {
614                operator_id,
615                address,
616            },
617        ));
618
619        Self {
620            compute_logger,
621            differential_logger,
622            operator_id,
623        }
624    }
625
626    fn chain_created(&self, updates: usize) {
627        self.differential_logger.log(BatchEvent {
628            operator: self.operator_id,
629            length: updates,
630        });
631    }
632
633    fn chain_dropped(&self, updates: usize) {
634        self.differential_logger.log(DropEvent {
635            operator: self.operator_id,
636            length: updates,
637        });
638    }
639
640    fn report_size_diff(&self, diff: isize) {
641        if diff != 0 {
642            self.compute_logger
643                .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
644                    operator_id: self.operator_id,
645                    delta_size: diff,
646                }));
647        }
648    }
649
650    fn report_capacity_diff(&self, diff: isize) {
651        if diff != 0 {
652            self.compute_logger
653                .log(&ComputeEvent::ArrangementHeapCapacity(
654                    ArrangementHeapCapacity {
655                        operator_id: self.operator_id,
656                        delta_capacity: diff,
657                    },
658                ));
659        }
660    }
661
662    fn report_allocations_diff(&self, diff: isize) {
663        if diff != 0 {
664            self.compute_logger
665                .log(&ComputeEvent::ArrangementHeapAllocations(
666                    ArrangementHeapAllocations {
667                        operator_id: self.operator_id,
668                        delta_allocations: diff,
669                    },
670                ));
671        }
672    }
673}
674
675impl Drop for LoggingInner {
676    fn drop(&mut self) {
677        self.compute_logger
678            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
679                ArrangementHeapSizeOperatorDrop {
680                    operator_id: self.operator_id,
681                },
682            ));
683    }
684}