// mz_compute/sink/correction.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The `Correction` data structure used by `persist_sink::write_batches` to stash updates before
11//! they are written into batches.
12
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17use std::rc::Rc;
18
19use differential_dataflow::consolidation::{consolidate, consolidate_updates};
20use differential_dataflow::logging::{BatchEvent, DropEvent};
21use itertools::Itertools;
22use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
23use mz_dyncfg::ConfigSet;
24use mz_ore::iter::IteratorExt;
25use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
26use mz_repr::{Diff, Timestamp};
27use timely::PartialOrder;
28use timely::progress::Antichain;
29
30use crate::logging::compute::{
31    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
32    ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
33    Logger as ComputeLogger,
34};
35use crate::sink::correction_v2::{CorrectionV2, Data};
36
37/// A data structure suitable for storing updates in a self-correcting persist sink.
38///
39/// Selects one of two correction buffer implementations. `V1` is the original simple
40/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
41/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
42/// `V1` in a pinch. The plan is to remove `V1` eventually.
43pub(super) enum Correction<D: Data> {
44    V1(CorrectionV1<D>),
45    V2(CorrectionV2<D>),
46}
47
48impl<D: Data> Correction<D> {
49    /// Construct a new `Correction` instance.
50    pub fn new(
51        metrics: SinkMetrics,
52        worker_metrics: SinkWorkerMetrics,
53        logging: Option<Logging>,
54        config: &ConfigSet,
55    ) -> Self {
56        if ENABLE_CORRECTION_V2.get(config) {
57            Self::V2(CorrectionV2::new(metrics, worker_metrics, logging))
58        } else {
59            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
60            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
61        }
62    }
63
64    /// Insert a batch of updates.
65    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
66        match self {
67            Self::V1(c) => c.insert(updates),
68            Self::V2(c) => c.insert(updates),
69        }
70    }
71
72    /// Insert a batch of updates, after negating their diffs.
73    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
74        match self {
75            Self::V1(c) => c.insert_negated(updates),
76            Self::V2(c) => c.insert_negated(updates),
77        }
78    }
79
80    /// Consolidate and return updates before the given `upper`.
81    pub fn updates_before(
82        &mut self,
83        upper: &Antichain<Timestamp>,
84    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
85        match self {
86            Self::V1(c) => Box::new(c.updates_before(upper)),
87            Self::V2(c) => Box::new(c.updates_before(upper)),
88        }
89    }
90
91    /// Advance the since frontier.
92    ///
93    /// # Panics
94    ///
95    /// Panics if the given `since` is less than the current since frontier.
96    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
97        match self {
98            Self::V1(c) => c.advance_since(since),
99            Self::V2(c) => c.advance_since(since),
100        }
101    }
102
103    /// Consolidate all updates at the current `since`.
104    pub fn consolidate_at_since(&mut self) {
105        match self {
106            Self::V1(c) => c.consolidate_at_since(),
107            Self::V2(c) => c.consolidate_at_since(),
108        }
109    }
110}
111
/// A collection holding `persist_sink` updates.
///
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
///  * It stores updates by time, to enable efficient separation between updates that should
///    be written to a batch and updates whose time has not yet arrived.
///  * It eschews an interface for directly removing previously inserted updates. Instead, updates
///    are removed by inserting them again, with negated diffs. Stored updates are continuously
///    consolidated to give them opportunity to cancel each other out.
///  * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct CorrectionV1<D> {
    /// Stashed updates by time.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all update times are advanced.
    since: Antichain<Timestamp>,

    /// Total length and capacity of vectors in `updates`.
    ///
    /// Tracked to maintain metrics. Invariant: every method that mutates `updates` keeps this in
    /// sync with the actual vector sizes by calling `update_metrics` with the new totals.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
    growth_dampener: usize,
}
140
141impl<D> CorrectionV1<D> {
142    /// Construct a new `CorrectionV1` instance.
143    pub fn new(
144        metrics: SinkMetrics,
145        worker_metrics: SinkWorkerMetrics,
146        growth_dampener: usize,
147    ) -> Self {
148        Self {
149            updates: Default::default(),
150            since: Antichain::from_elem(Timestamp::MIN),
151            total_size: Default::default(),
152            metrics,
153            worker_metrics,
154            growth_dampener,
155        }
156    }
157
158    /// Update persist sink metrics to the given new length and capacity.
159    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
160        let old_size = self.total_size;
161        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
162        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
163        self.metrics
164            .report_correction_update_deltas(len_delta, cap_delta);
165        self.worker_metrics
166            .report_correction_update_totals(new_size.length, new_size.capacity);
167
168        self.total_size = new_size;
169    }
170}
171
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates.
    ///
    /// Update times are first advanced to the current `since` frontier, as required by
    /// `insert_inner`. The contents of `updates` are drained by `insert_inner`.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    ///
    /// Like `insert`, but flips the sign of each diff, which is how previously inserted updates
    /// are retracted. Note that here the vector is cleared even on the empty-since path, so the
    /// caller never observes the negated diffs.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// The given `updates` must all have been advanced by `self.since`.
    ///
    /// Drains `updates`, groups them into runs of equal time, and merges each run into the
    /// per-time `ConsolidatingVec`, keeping the `total_size` bookkeeping in sync.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        // Consolidate first (sorts by (data, time) and cancels opposite diffs), then re-sort by
        // time only, so equal times form contiguous runs below.
        consolidate_updates(updates);
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // All updates sharing the current `time`, stripped down to (data, diff) pairs.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    // New time: collect into a fresh vector and apply the configured growth
                    // dampener (`FromIterator` defaults it to 0).
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    // Existing time: extend in place. Subtract the old size before and add the
                    // new size after, since `extend` may consolidate and resize.
                    let vec = entry.get_mut();
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate and return updates within the given bounds.
    ///
    /// # Panics
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        // Translate the frontiers into `BTreeMap::range` bounds. An empty `lower` yields
        // `Excluded(Timestamp::MAX)`, i.e. an empty range, since no key can exceed MAX.
        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate first so the returned updates are consolidated and we know their exact
        // count, which backs the `ExactSizeIterator` implementation below.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Consolidate and return updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate the updates at the times in the given range.
    ///
    /// Returns the number of updates remaining in the range afterwards.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        // Consolidation can change both length and capacity of each vector, so account for both
        // before and after.
        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance all contained updates by the given frontier.
    ///
    /// If the given frontier is empty, all remaining updates are discarded.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // Pop entries from the front of the map until reaching one at or beyond the frontier,
        // folding each popped entry into the one at `target_ts`.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // We have advanced all updates that can advance.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // The popped vector is moved wholesale, so the totals are unchanged.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    // Merging consumes `data` (its allocation is dropped) and may resize `vec`.
                    let vec = entry.get_mut();
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        // Build a range covering exactly the `since` time; if stepping forward overflows, fall
        // back to an unbounded end (the range still only contains `since_ts` and beyond-MAX).
        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
364
365impl<D> Drop for CorrectionV1<D> {
366    fn drop(&mut self) {
367        self.update_metrics(Default::default());
368    }
369}
370
371/// Helper type for convenient tracking of length and capacity together.
372#[derive(Clone, Copy, Debug, Default)]
373pub(super) struct LengthAndCapacity {
374    pub length: usize,
375    pub capacity: usize,
376}
377
378impl AddAssign<Self> for LengthAndCapacity {
379    fn add_assign(&mut self, size: Self) {
380        self.length += size.length;
381        self.capacity += size.capacity;
382    }
383}
384
385impl AddAssign<(usize, usize)> for LengthAndCapacity {
386    fn add_assign(&mut self, (len, cap): (usize, usize)) {
387        self.length += len;
388        self.capacity += cap;
389    }
390}
391
392impl SubAssign<(usize, usize)> for LengthAndCapacity {
393    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
394        self.length -= len;
395        self.capacity -= cap;
396    }
397}
398
/// A vector that consolidates its contents.
///
/// The vector is filled with updates until it reaches capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the vector's capacity, at which point the capacity is doubled.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The stored `(data, diff)` pairs; consolidated on demand by `consolidate`.
    data: Vec<(D, Diff)>,
    /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
    /// might start smaller than this.
    min_capacity: usize,
    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
    ///
    /// If consolidation didn't free enough space (at least a linear fraction of the capacity),
    /// the capacity is increased by the factor `1+1/(n+1)`.
    /// Setting this to 0 results in doubling whenever the list is at least half full.
    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
    growth_dampener: usize,
}
417
418impl<D: Ord> ConsolidatingVec<D> {
419    /// Creates a new instance from the necessary configuration arguments.
420    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
421        ConsolidatingVec {
422            data: Vec::new(),
423            min_capacity,
424            growth_dampener,
425        }
426    }
427
428    /// Return the length of the vector.
429    pub fn len(&self) -> usize {
430        self.data.len()
431    }
432
433    /// Return the capacity of the vector.
434    pub fn capacity(&self) -> usize {
435        self.data.capacity()
436    }
437
438    /// Pushes `item` into the vector.
439    ///
440    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
441    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
442    ///
443    /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
444    /// but amortizes to O(log n).
445    pub fn push(&mut self, item: (D, Diff)) {
446        let capacity = self.data.capacity();
447        if self.data.len() == capacity {
448            // The vector is full. First, consolidate to try to recover some space.
449            self.consolidate();
450
451            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
452            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
453            let length = self.data.len();
454            let dampener = self.growth_dampener;
455            if capacity < length + length / (dampener + 1) {
456                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
457                // determining the target capacity, and then reserving an amount that achieves this
458                // while working around the existing length.
459                let new_cap = capacity + capacity / (dampener + 1);
460                self.data.reserve_exact(new_cap - length);
461            }
462        }
463
464        self.data.push(item);
465    }
466
467    /// Consolidate the contents.
468    pub fn consolidate(&mut self) {
469        consolidate(&mut self.data);
470
471        // We may have the opportunity to reclaim allocated memory.
472        // Given that `push` will at most double the capacity when the vector is more than half full, and
473        // we want to avoid entering into a resizing cycle, we choose to only shrink if the
474        // vector's length is less than one fourth of its capacity.
475        if self.data.len() < self.data.capacity() / 4 {
476            self.data.shrink_to(self.min_capacity);
477        }
478    }
479
480    /// Return an iterator over the borrowed items.
481    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
482        self.data.iter()
483    }
484
485    /// Returns mutable access to the underlying items.
486    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
487        self.data.iter_mut()
488    }
489}
490
491impl<D> IntoIterator for ConsolidatingVec<D> {
492    type Item = (D, Diff);
493    type IntoIter = std::vec::IntoIter<(D, Diff)>;
494
495    fn into_iter(self) -> Self::IntoIter {
496        self.data.into_iter()
497    }
498}
499
500impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
501    fn from_iter<I>(iter: I) -> Self
502    where
503        I: IntoIterator<Item = (D, Diff)>,
504    {
505        Self {
506            data: Vec::from_iter(iter),
507            min_capacity: 0,
508            growth_dampener: 0,
509        }
510    }
511}
512
513impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
514    fn extend<I>(&mut self, iter: I)
515    where
516        I: IntoIterator<Item = (D, Diff)>,
517    {
518        for item in iter {
519            self.push(item);
520        }
521    }
522}
523
524/// Helper type for convenient tracking of various size metrics together.
525#[derive(Clone, Copy, Debug, Default)]
526pub(super) struct SizeMetrics {
527    pub size: usize,
528    pub capacity: usize,
529    pub allocations: usize,
530}
531
532impl AddAssign<Self> for SizeMetrics {
533    fn add_assign(&mut self, other: Self) {
534        self.size += other.size;
535        self.capacity += other.capacity;
536        self.allocations += other.allocations;
537    }
538}
539
540/// Helper for correction buffer logging.
541///
542// TODO: Correction buffer logging currently reuses the arrangement batch and size logging. This
543// isn't strictly correct as a correction buffer is not an arrangement. Consider refactoring this
544// to be about "operator sizes" instead.
545#[derive(Clone, Debug)]
546pub(super) struct Logging(Rc<RefCell<LoggingInner>>);
547
548impl Logging {
549    pub fn new(
550        compute_logger: ComputeLogger,
551        differential_logger: differential_dataflow::logging::Logger,
552        operator_id: usize,
553        address: Vec<usize>,
554    ) -> Self {
555        let inner = LoggingInner::new(compute_logger, differential_logger, operator_id, address);
556        Self(Rc::new(RefCell::new(inner)))
557    }
558
559    /// A new chain was created.
560    pub fn chain_created(&self, updates: usize) {
561        self.0.borrow_mut().chain_created(updates);
562    }
563
564    /// A chain was dropped.
565    pub fn chain_dropped(&self, updates: usize) {
566        self.0.borrow_mut().chain_dropped(updates);
567    }
568
569    /// Report a heap size diff.
570    pub fn report_size_diff(&self, diff: isize) {
571        self.0.borrow_mut().report_size_diff(diff);
572    }
573
574    /// Report a heap capacity diff.
575    pub fn report_capacity_diff(&self, diff: isize) {
576        self.0.borrow_mut().report_capacity_diff(diff);
577    }
578
579    /// Report a heap allocations diff.
580    pub fn report_allocations_diff(&self, diff: isize) {
581        self.0.borrow_mut().report_allocations_diff(diff);
582    }
583}
584
/// Inner state for correction buffer logging.
///
/// This is separate from `Logging` and shared via an `Rc`, to ensure we only emit
/// `ArrangementHeapSizeOperator{Drop}` events once, even though we pass the logger to both the Ok
/// and the Err correction buffer.
struct LoggingInner {
    /// Logger for compute events (heap size/capacity/allocations).
    compute_logger: ComputeLogger,
    /// Logger for differential events (batch creation/drop).
    differential_logger: differential_dataflow::logging::Logger,
    /// ID of the operator this logging state belongs to; attached to every emitted event.
    operator_id: usize,
}
595
impl fmt::Debug for LoggingInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only `operator_id` is shown; the logger handles are omitted via
        // `finish_non_exhaustive`, which renders a trailing `..`.
        f.debug_struct("LoggingInner")
            .field("operator_id", &self.operator_id)
            .finish_non_exhaustive()
    }
}
603
604impl LoggingInner {
605    fn new(
606        compute_logger: ComputeLogger,
607        differential_logger: differential_dataflow::logging::Logger,
608        operator_id: usize,
609        address: Vec<usize>,
610    ) -> Self {
611        compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
612            ArrangementHeapSizeOperator {
613                operator_id,
614                address,
615            },
616        ));
617
618        Self {
619            compute_logger,
620            differential_logger,
621            operator_id,
622        }
623    }
624
625    fn chain_created(&self, updates: usize) {
626        self.differential_logger.log(BatchEvent {
627            operator: self.operator_id,
628            length: updates,
629        });
630    }
631
632    fn chain_dropped(&self, updates: usize) {
633        self.differential_logger.log(DropEvent {
634            operator: self.operator_id,
635            length: updates,
636        });
637    }
638
639    fn report_size_diff(&self, diff: isize) {
640        if diff != 0 {
641            self.compute_logger
642                .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
643                    operator_id: self.operator_id,
644                    delta_size: diff,
645                }));
646        }
647    }
648
649    fn report_capacity_diff(&self, diff: isize) {
650        if diff != 0 {
651            self.compute_logger
652                .log(&ComputeEvent::ArrangementHeapCapacity(
653                    ArrangementHeapCapacity {
654                        operator_id: self.operator_id,
655                        delta_capacity: diff,
656                    },
657                ));
658        }
659    }
660
661    fn report_allocations_diff(&self, diff: isize) {
662        if diff != 0 {
663            self.compute_logger
664                .log(&ComputeEvent::ArrangementHeapAllocations(
665                    ArrangementHeapAllocations {
666                        operator_id: self.operator_id,
667                        delta_allocations: diff,
668                    },
669                ));
670        }
671    }
672}
673
674impl Drop for LoggingInner {
675    fn drop(&mut self) {
676        self.compute_logger
677            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
678                ArrangementHeapSizeOperatorDrop {
679                    operator_id: self.operator_id,
680                },
681            ));
682    }
683}